diff options
Diffstat (limited to 'cpp/src/Ice/Reference.cpp')
-rw-r--r-- | cpp/src/Ice/Reference.cpp | 626 |
1 files changed, 486 insertions, 140 deletions
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp index b9e1d534836..3fe7eee422d 100644 --- a/cpp/src/Ice/Reference.cpp +++ b/cpp/src/Ice/Reference.cpp @@ -625,6 +625,20 @@ IceInternal::FixedReference::getConnection(bool& compress) const return connection; } +void +IceInternal::FixedReference::getConnection(const GetConnectionCallbackPtr& callback) const +{ + try + { + bool compress; + callback->setConnection(getConnection(compress), compress); + } + catch(const Ice::LocalException& ex) + { + callback->setException(ex); + } +} + bool IceInternal::FixedReference::operator==(const Reference& r) const { @@ -743,20 +757,6 @@ IceInternal::FixedReference::filterConnections(const vector<ConnectionIPtr>& all IceUtil::Shared* IceInternal::upCast(IceInternal::RoutableReference* p) { return p; } -vector<EndpointIPtr> -IceInternal::RoutableReference::getRoutedEndpoints() const -{ - if(_routerInfo) - { - // - // If we route, we send everything to the router's client - // proxy endpoints. - // - return _routerInfo->getClientEndpoints(); - } - return vector<EndpointIPtr>(); -} - bool IceInternal::RoutableReference::getSecure() const { @@ -1093,6 +1093,232 @@ IceInternal::RoutableReference::operator<(const Reference& r) const return false; } +ConnectionIPtr +IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints, bool& comp) const +{ + vector<EndpointIPtr> endpoints = filterEndpoints(allEndpoints); + if(endpoints.empty()) + { + throw Ice::NoEndpointException(__FILE__, __LINE__, toString()); + } + + OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory(); + Ice::ConnectionIPtr connection; + if(getCacheConnection() || endpoints.size() == 1) + { + // + // Get an existing connection or create one if there's no + // existing connection to one of the given endpoints. + // + connection = factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), comp); + } + else + { + // + // Go through the list of endpoints and try to create the + // connection until it succeeds. This is different from just + // calling create() with the given endpoints since this might + // create a new connection even if there's an existing + // connection for one of the endpoints. + // + + auto_ptr<LocalException> exception; + vector<EndpointIPtr> endpoint; + endpoint.push_back(0); + + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + try + { + endpoint.back() = *p; + connection = factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection, + getEndpointSelection(), comp); + break; + } + catch(const LocalException& ex) + { + exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + } + } + + if(!connection) + { + assert(exception.get()); + exception->ice_throw(); + } + } + + assert(connection); + + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_routerInfo) + { + connection->setAdapter(_routerInfo->getAdapter()); + } + + return connection; +} + +void +IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints, + const GetConnectionCallbackPtr& callback) const +{ + vector<EndpointIPtr> endpoints = filterEndpoints(allEndpoints); + if(endpoints.empty()) + { + callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, toString())); + return; + } + + // + // Finally, create the connection. + // + OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory(); + if(getCacheConnection() || endpoints.size() == 1) + { + class CB1 : public OutgoingConnectionFactory::CreateConnectionCallback + { + public: + + virtual void + setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_routerInfo) + { + connection->setAdapter(_routerInfo->getAdapter()); + } + _callback->setConnection(connection, compress); + } + + virtual void + setException(const Ice::LocalException& ex) + { + _callback->setException(ex); + } + + CB1(const RouterInfoPtr& routerInfo, const GetConnectionCallbackPtr& callback) : + _routerInfo(routerInfo), _callback(callback) + { + } + + private: + + const RouterInfoPtr _routerInfo; + const GetConnectionCallbackPtr _callback; + }; + + // + // Get an existing connection or create one if there's no + // existing connection to one of the given endpoints. + // + factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), new CB1(_routerInfo, callback)); + return; + } + else + { + class CB2 : public OutgoingConnectionFactory::CreateConnectionCallback + { + public: + + virtual void + setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_reference->getRouterInfo()) + { + connection->setAdapter(_reference->getRouterInfo()->getAdapter()); + } + _callback->setConnection(connection, compress); + } + + virtual void + setException(const Ice::LocalException& ex) + { + if(!_exception.get()) + { + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + } + + if(++_i == _endpoints.size()) + { + _callback->setException(*_exception.get()); + return; + } + + bool more = _i != _endpoints.size() - 1; + vector<EndpointIPtr> endpoint; + endpoint.push_back(_endpoints[_i]); + + OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory(); + bool threadPerConnection = _reference->getThreadPerConnection(); + EndpointSelectionType sel = _reference->getEndpointSelection(); + factory->create(endpoint, more, threadPerConnection, sel, this); + } + + CB2(const RoutableReferencePtr& reference, const vector<EndpointIPtr>& endpoints, + const GetConnectionCallbackPtr& callback) : + _reference(reference), + _endpoints(endpoints), + _callback(callback), + _i(0) + { + } + + private: + + const RoutableReferencePtr _reference; + const vector<EndpointIPtr> _endpoints; + const GetConnectionCallbackPtr _callback; + size_t _i; + std::auto_ptr<Ice::LocalException> _exception; + }; + + // + // Go through the list of endpoints and try to create the + // connection until it succeeds. This is different from just + // calling create() with the given endpoints since this might + // create a new connection even if there's an existing + // connection for one of the endpoints. + // + + vector<EndpointIPtr> endpt; + endpt.push_back(endpoints[0]); + RoutableReference* self = const_cast<RoutableReference*>(this); + factory->create(endpt, true, _threadPerConnection, getEndpointSelection(), new CB2(self, endpoints, callback)); + return; + } +} + +void +IceInternal::RoutableReference::applyOverrides(vector<EndpointIPtr>& endpoints) const +{ + for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + *p = (*p)->connectionId(_connectionId); + if(_overrideCompress) + { + *p = (*p)->compress(_compress); + } + if(_overrideTimeout) + { + *p = (*p)->timeout(_timeout); + } + } +} + IceInternal::RoutableReference::RoutableReference(const InstancePtr& inst, const CommunicatorPtr& com, const Identity& ident, const SharedContextPtr& ctx, const string& fs, Mode md, bool sec, bool prefSec, const RouterInfoPtr& rtrInfo, @@ -1130,8 +1356,8 @@ IceInternal::RoutableReference::RoutableReference(const RoutableReference& r) : { } -ConnectionIPtr -IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints, bool& comp) const +vector<EndpointIPtr> +IceInternal::RoutableReference::filterEndpoints(const vector<EndpointIPtr>& allEndpoints) const { vector<EndpointIPtr> endpoints = allEndpoints; @@ -1224,75 +1450,8 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all // stable_partition(endpoints.begin(), endpoints.end(), not1(Ice::constMemFun(&EndpointI::secure))); } - - if(endpoints.empty()) - { - NoEndpointException ex(__FILE__, __LINE__); - ex.proxy = toString(); - throw ex; - } - - // - // Finally, create the connection. - // - OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory(); - if(getCacheConnection() || endpoints.size() == 1) - { - // - // Get an existing connection or create one if there's no - // existing connection to one of the given endpoints. - // - return factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), comp); - } - else - { - // - // Go through the list of endpoints and try to create the - // connection until it succeeds. This is different from just - // calling create() with the given endpoints since this might - // create a new connection even if there's an existing - // connection for one of the endpoints. - // - - auto_ptr<LocalException> exception; - vector<EndpointIPtr> endpoint; - endpoint.push_back(0); - - for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - try - { - endpoint.back() = *p; - return factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection, - getEndpointSelection(), comp); - } - catch(const LocalException& ex) - { - exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - } - } - - assert(exception.get()); - exception->ice_throw(); - return 0; // Keeps the compiler happy. - } -} - -void -IceInternal::RoutableReference::applyOverrides(vector<EndpointIPtr>& endpoints) const -{ - for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - *p = (*p)->connectionId(_connectionId); - if(_overrideCompress) - { - *p = (*p)->compress(_compress); - } - if(_overrideTimeout) - { - *p = (*p)->timeout(_timeout); - } - } + + return endpoints; } IceUtil::Shared* IceInternal::upCast(IceInternal::DirectReference* p) { return p; } @@ -1403,7 +1562,8 @@ IceInternal::DirectReference::changeAdapterId(const string& newAdapterId) const getSecure(), getPreferSecure(), newAdapterId, getRouterInfo(), locatorInfo, getCollocationOptimization(), getCacheConnection(), getEndpointSelection(), - getThreadPerConnection(), getLocatorCacheTimeout()); + getThreadPerConnection(), + getLocatorCacheTimeout()); } else { @@ -1472,28 +1632,68 @@ IceInternal::DirectReference::toString() const ConnectionIPtr IceInternal::DirectReference::getConnection(bool& comp) const { - vector<EndpointIPtr> endpts = RoutableReference::getRoutedEndpoints(); - applyOverrides(endpts); - - if(endpts.empty()) + if(getRouterInfo()) { - endpts = _endpoints; // Endpoint overrides are already applied on these endpoints. + vector<EndpointIPtr> endpts = getRouterInfo()->getClientEndpoints(); + if(!endpts.empty()) + { + applyOverrides(endpts); + return createConnection(endpts, comp); + } } - ConnectionIPtr connection = createConnection(endpts, comp); + return createConnection(_endpoints, comp); +} + +void +IceInternal::DirectReference::getConnection(const GetConnectionCallbackPtr& callback) const +{ + class Callback : public RouterInfo::GetClientEndpointsCallback + { + public: + + virtual void + setEndpoints(const vector<EndpointIPtr>& endpoints) + { + vector<EndpointIPtr> endpts = endpoints; + if(!endpts.empty()) + { + _reference->applyOverrides(endpts); + _reference->createConnection(endpts, _callback); + return; + } + + _reference->createConnection(_reference->getEndpoints(), _callback); + } + + virtual void + setException(const Ice::LocalException& ex) + { + _callback->setException(ex); + } + + Callback(const DirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) : + _reference(reference), _callback(callback) + { + } + + private: + + const DirectReferencePtr _reference; + const GetConnectionCallbackPtr _callback; + }; - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // if(getRouterInfo()) { - connection->setAdapter(getRouterInfo()->getAdapter()); + // + // If we route, we send everything to the router's client + // proxy endpoints. + // + getRouterInfo()->getClientEndpoints(new Callback(const_cast<DirectReference*>(this), callback)); + return; } - assert(connection); - return connection; + createConnection(_endpoints, callback); } bool @@ -1699,77 +1899,223 @@ IceInternal::IndirectReference::toString() const ConnectionIPtr IceInternal::IndirectReference::getConnection(bool& comp) const { - ConnectionIPtr connection; + if(getRouterInfo()) + { + // + // If we route, we send everything to the router's client + // proxy endpoints. + // + vector<EndpointIPtr> endpts = getRouterInfo()->getClientEndpoints(); + if(!endpts.empty()) + { + applyOverrides(endpts); + return createConnection(endpts, comp); + } + } while(true) { - vector<EndpointIPtr> endpts = RoutableReference::getRoutedEndpoints(); bool cached = false; - if(endpts.empty() && _locatorInfo) + vector<EndpointIPtr> endpts; + if(_locatorInfo) { - const IndirectReferencePtr self = const_cast<IndirectReference*>(this); - endpts = _locatorInfo->getEndpoints(self, _locatorCacheTimeout, cached); + endpts = _locatorInfo->getEndpoints(const_cast<IndirectReference*>(this), _locatorCacheTimeout, cached); + applyOverrides(endpts); } - applyOverrides(endpts); + if(endpts.empty()) + { + throw Ice::NoEndpointException(__FILE__, __LINE__, toString()); + } try { - connection = createConnection(endpts, comp); - assert(connection); + return createConnection(endpts, comp); } - catch(const NoEndpointException& ex) + catch(const NoEndpointException&) { - throw ex; // No need to retry if there's no endpoints. + throw; // No need to retry if there's no endpoints. } catch(const LocalException& ex) { - if(!getRouterInfo()) + assert(_locatorInfo); + _locatorInfo->clearCache(const_cast<IndirectReference*>(this)); + + if(cached) { - assert(_locatorInfo); - - // COMPILERFIX: Braces needed to prevent BCB from causing Reference refCount from + // COMPILERFIX: Braces needed to prevent BCB from causing TraceLevels refCount from // being decremented twice when loop continues. { - const IndirectReferencePtr self = const_cast<IndirectReference*>(this); - _locatorInfo->clearCache(self); + TraceLevelsPtr traceLevels = getInstance()->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(getInstance()->initializationData().logger, traceLevels->retryCat); + out << "connection to cached endpoints failed\n" + << "removing endpoints from cache and trying one more time\n" << ex; + } } + continue; + } + throw; + } + } + + assert(false); + return 0; +} + +void +IceInternal::IndirectReference::getConnection(const GetConnectionCallbackPtr& callback) const +{ + class Callback : public RouterInfo::GetClientEndpointsCallback + { + public: + + virtual void + setEndpoints(const vector<EndpointIPtr>& endpoints) + { + vector<EndpointIPtr> endpts = endpoints; + if(!endpts.empty()) + { + _reference->applyOverrides(endpts); + _reference->createConnection(endpts, _callback); + return; + } + + _reference->getConnectionNoRouterInfo(_callback); + } + + virtual void + setException(const Ice::LocalException& ex) + { + _callback->setException(ex); + } + + Callback(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) : + _reference(reference), _callback(callback) + { + } + + private: + + const IndirectReferencePtr _reference; + const GetConnectionCallbackPtr _callback; + }; - if(cached) + if(getRouterInfo()) + { + // + // If we route, we send everything to the router's client + // proxy endpoints. + // + getRouterInfo()->getClientEndpoints(new Callback(const_cast<IndirectReference*>(this), callback)); + return; + } + + getConnectionNoRouterInfo(callback); +} + +void +IceInternal::IndirectReference::getConnectionNoRouterInfo(const GetConnectionCallbackPtr& callback) const +{ + class Callback2 : public Reference::GetConnectionCallback + { + public: + + virtual void + setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + _callback->setConnection(connection, compress); + } + + virtual void + setException(const Ice::LocalException& exc) + { + try + { + exc.ice_throw(); + } + catch(const Ice::NoEndpointException& ex) + { + _callback->setException(ex); // No need to retry if there's no endpoints. + } + catch(const Ice::LocalException& ex) + { + LocatorInfoPtr locatorInfo = _reference->getLocatorInfo(); + assert(locatorInfo); + locatorInfo->clearCache(_reference); + if(_cached) { - // COMPILERFIX: Braces needed to prevent BCB from causing TraceLevels refCount from - // being decremented twice when loop continues. + TraceLevelsPtr traceLvls = _reference->getInstance()->traceLevels(); + if(traceLvls->retry >= 2) { - TraceLevelsPtr traceLevels = getInstance()->traceLevels(); - if(traceLevels->retry >= 2) - { - Trace out(getInstance()->initializationData().logger, traceLevels->retryCat); - out << "connection to cached endpoints failed\n" - << "removing endpoints from cache and trying one more time\n" << ex; - } + Trace out(_reference->getInstance()->initializationData().logger, traceLvls->retryCat); + out << "connection to cached endpoints failed\n" + << "removing endpoints from cache and trying one more time\n" << ex; } - continue; + _reference->getConnectionNoRouterInfo(_callback); // Retry. + return; } + _callback->setException(ex); } + } - throw; + Callback2(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback, bool cached): + _reference(reference), _callback(callback), _cached(cached) + { } + + private: + + const IndirectReferencePtr _reference; + const GetConnectionCallbackPtr _callback; + const bool _cached; + }; + + class Callback : public LocatorInfo::GetEndpointsCallback + { + public: - break; - } + virtual void + setEndpoints(const vector<EndpointIPtr>& endpoints, bool cached) + { + if(endpoints.empty()) + { + _callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, _reference->toString())); + return; + } + + vector<EndpointIPtr> endpts = endpoints; + _reference->applyOverrides(endpts); + _reference->createConnection(endpts, new Callback2(_reference, _callback, cached)); + } + + virtual void + setException(const Ice::LocalException& ex) + { + _callback->setException(ex); + } - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(getRouterInfo()) + Callback(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) : + _reference(reference), _callback(callback) + { + } + + private: + + const IndirectReferencePtr _reference; + const GetConnectionCallbackPtr _callback; + }; + + if(_locatorInfo) { - connection->setAdapter(getRouterInfo()->getAdapter()); + IndirectReference* self = const_cast<IndirectReference*>(this); + _locatorInfo->getEndpoints(self, _locatorCacheTimeout, new Callback(self, callback)); + } + else + { + callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, toString())); } - - assert(connection); - return connection; } int |