summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Reference.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Reference.cpp')
-rw-r--r--cpp/src/Ice/Reference.cpp626
1 files changed, 486 insertions, 140 deletions
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp
index b9e1d534836..3fe7eee422d 100644
--- a/cpp/src/Ice/Reference.cpp
+++ b/cpp/src/Ice/Reference.cpp
@@ -625,6 +625,20 @@ IceInternal::FixedReference::getConnection(bool& compress) const
return connection;
}
+void
+IceInternal::FixedReference::getConnection(const GetConnectionCallbackPtr& callback) const
+{
+ try
+ {
+ bool compress;
+ callback->setConnection(getConnection(compress), compress);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ callback->setException(ex);
+ }
+}
+
bool
IceInternal::FixedReference::operator==(const Reference& r) const
{
@@ -743,20 +757,6 @@ IceInternal::FixedReference::filterConnections(const vector<ConnectionIPtr>& all
IceUtil::Shared* IceInternal::upCast(IceInternal::RoutableReference* p) { return p; }
-vector<EndpointIPtr>
-IceInternal::RoutableReference::getRoutedEndpoints() const
-{
- if(_routerInfo)
- {
- //
- // If we route, we send everything to the router's client
- // proxy endpoints.
- //
- return _routerInfo->getClientEndpoints();
- }
- return vector<EndpointIPtr>();
-}
-
bool
IceInternal::RoutableReference::getSecure() const
{
@@ -1093,6 +1093,232 @@ IceInternal::RoutableReference::operator<(const Reference& r) const
return false;
}
+ConnectionIPtr
+IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints, bool& comp) const
+{
+ vector<EndpointIPtr> endpoints = filterEndpoints(allEndpoints);
+ if(endpoints.empty())
+ {
+ throw Ice::NoEndpointException(__FILE__, __LINE__, toString());
+ }
+
+ OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory();
+ Ice::ConnectionIPtr connection;
+ if(getCacheConnection() || endpoints.size() == 1)
+ {
+ //
+ // Get an existing connection or create one if there's no
+ // existing connection to one of the given endpoints.
+ //
+ connection = factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), comp);
+ }
+ else
+ {
+ //
+ // Go through the list of endpoints and try to create the
+ // connection until it succeeds. This is different from just
+ // calling create() with the given endpoints since this might
+ // create a new connection even if there's an existing
+ // connection for one of the endpoints.
+ //
+
+ auto_ptr<LocalException> exception;
+ vector<EndpointIPtr> endpoint;
+ endpoint.push_back(0);
+
+ for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ try
+ {
+ endpoint.back() = *p;
+ connection = factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection,
+ getEndpointSelection(), comp);
+ break;
+ }
+ catch(const LocalException& ex)
+ {
+ exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ }
+ }
+
+ if(!connection)
+ {
+ assert(exception.get());
+ exception->ice_throw();
+ }
+ }
+
+ assert(connection);
+
+ //
+ // If we have a router, set the object adapter for this router
+ // (if any) to the new connection, so that callbacks from the
+ // router can be received over this new connection.
+ //
+ if(_routerInfo)
+ {
+ connection->setAdapter(_routerInfo->getAdapter());
+ }
+
+ return connection;
+}
+
+void
+IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints,
+ const GetConnectionCallbackPtr& callback) const
+{
+ vector<EndpointIPtr> endpoints = filterEndpoints(allEndpoints);
+ if(endpoints.empty())
+ {
+ callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, toString()));
+ return;
+ }
+
+ //
+ // Finally, create the connection.
+ //
+ OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory();
+ if(getCacheConnection() || endpoints.size() == 1)
+ {
+ class CB1 : public OutgoingConnectionFactory::CreateConnectionCallback
+ {
+ public:
+
+ virtual void
+ setConnection(const Ice::ConnectionIPtr& connection, bool compress)
+ {
+ //
+ // If we have a router, set the object adapter for this router
+ // (if any) to the new connection, so that callbacks from the
+ // router can be received over this new connection.
+ //
+ if(_routerInfo)
+ {
+ connection->setAdapter(_routerInfo->getAdapter());
+ }
+ _callback->setConnection(connection, compress);
+ }
+
+ virtual void
+ setException(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ }
+
+ CB1(const RouterInfoPtr& routerInfo, const GetConnectionCallbackPtr& callback) :
+ _routerInfo(routerInfo), _callback(callback)
+ {
+ }
+
+ private:
+
+ const RouterInfoPtr _routerInfo;
+ const GetConnectionCallbackPtr _callback;
+ };
+
+ //
+ // Get an existing connection or create one if there's no
+ // existing connection to one of the given endpoints.
+ //
+ factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), new CB1(_routerInfo, callback));
+ return;
+ }
+ else
+ {
+ class CB2 : public OutgoingConnectionFactory::CreateConnectionCallback
+ {
+ public:
+
+ virtual void
+ setConnection(const Ice::ConnectionIPtr& connection, bool compress)
+ {
+ //
+ // If we have a router, set the object adapter for this router
+ // (if any) to the new connection, so that callbacks from the
+ // router can be received over this new connection.
+ //
+ if(_reference->getRouterInfo())
+ {
+ connection->setAdapter(_reference->getRouterInfo()->getAdapter());
+ }
+ _callback->setConnection(connection, compress);
+ }
+
+ virtual void
+ setException(const Ice::LocalException& ex)
+ {
+ if(!_exception.get())
+ {
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ }
+
+ if(++_i == _endpoints.size())
+ {
+ _callback->setException(*_exception.get());
+ return;
+ }
+
+ bool more = _i != _endpoints.size() - 1;
+ vector<EndpointIPtr> endpoint;
+ endpoint.push_back(_endpoints[_i]);
+
+ OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory();
+ bool threadPerConnection = _reference->getThreadPerConnection();
+ EndpointSelectionType sel = _reference->getEndpointSelection();
+ factory->create(endpoint, more, threadPerConnection, sel, this);
+ }
+
+ CB2(const RoutableReferencePtr& reference, const vector<EndpointIPtr>& endpoints,
+ const GetConnectionCallbackPtr& callback) :
+ _reference(reference),
+ _endpoints(endpoints),
+ _callback(callback),
+ _i(0)
+ {
+ }
+
+ private:
+
+ const RoutableReferencePtr _reference;
+ const vector<EndpointIPtr> _endpoints;
+ const GetConnectionCallbackPtr _callback;
+ size_t _i;
+ std::auto_ptr<Ice::LocalException> _exception;
+ };
+
+ //
+ // Go through the list of endpoints and try to create the
+ // connection until it succeeds. This is different from just
+ // calling create() with the given endpoints since this might
+ // create a new connection even if there's an existing
+ // connection for one of the endpoints.
+ //
+
+ vector<EndpointIPtr> endpt;
+ endpt.push_back(endpoints[0]);
+ RoutableReference* self = const_cast<RoutableReference*>(this);
+ factory->create(endpt, true, _threadPerConnection, getEndpointSelection(), new CB2(self, endpoints, callback));
+ return;
+ }
+}
+
+void
+IceInternal::RoutableReference::applyOverrides(vector<EndpointIPtr>& endpoints) const
+{
+ for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ *p = (*p)->connectionId(_connectionId);
+ if(_overrideCompress)
+ {
+ *p = (*p)->compress(_compress);
+ }
+ if(_overrideTimeout)
+ {
+ *p = (*p)->timeout(_timeout);
+ }
+ }
+}
+
IceInternal::RoutableReference::RoutableReference(const InstancePtr& inst, const CommunicatorPtr& com,
const Identity& ident, const SharedContextPtr& ctx, const string& fs,
Mode md, bool sec, bool prefSec, const RouterInfoPtr& rtrInfo,
@@ -1130,8 +1356,8 @@ IceInternal::RoutableReference::RoutableReference(const RoutableReference& r) :
{
}
-ConnectionIPtr
-IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& allEndpoints, bool& comp) const
+vector<EndpointIPtr>
+IceInternal::RoutableReference::filterEndpoints(const vector<EndpointIPtr>& allEndpoints) const
{
vector<EndpointIPtr> endpoints = allEndpoints;
@@ -1224,75 +1450,8 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all
//
stable_partition(endpoints.begin(), endpoints.end(), not1(Ice::constMemFun(&EndpointI::secure)));
}
-
- if(endpoints.empty())
- {
- NoEndpointException ex(__FILE__, __LINE__);
- ex.proxy = toString();
- throw ex;
- }
-
- //
- // Finally, create the connection.
- //
- OutgoingConnectionFactoryPtr factory = getInstance()->outgoingConnectionFactory();
- if(getCacheConnection() || endpoints.size() == 1)
- {
- //
- // Get an existing connection or create one if there's no
- // existing connection to one of the given endpoints.
- //
- return factory->create(endpoints, false, _threadPerConnection, getEndpointSelection(), comp);
- }
- else
- {
- //
- // Go through the list of endpoints and try to create the
- // connection until it succeeds. This is different from just
- // calling create() with the given endpoints since this might
- // create a new connection even if there's an existing
- // connection for one of the endpoints.
- //
-
- auto_ptr<LocalException> exception;
- vector<EndpointIPtr> endpoint;
- endpoint.push_back(0);
-
- for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- try
- {
- endpoint.back() = *p;
- return factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection,
- getEndpointSelection(), comp);
- }
- catch(const LocalException& ex)
- {
- exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- }
- }
-
- assert(exception.get());
- exception->ice_throw();
- return 0; // Keeps the compiler happy.
- }
-}
-
-void
-IceInternal::RoutableReference::applyOverrides(vector<EndpointIPtr>& endpoints) const
-{
- for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- *p = (*p)->connectionId(_connectionId);
- if(_overrideCompress)
- {
- *p = (*p)->compress(_compress);
- }
- if(_overrideTimeout)
- {
- *p = (*p)->timeout(_timeout);
- }
- }
+
+ return endpoints;
}
IceUtil::Shared* IceInternal::upCast(IceInternal::DirectReference* p) { return p; }
@@ -1403,7 +1562,8 @@ IceInternal::DirectReference::changeAdapterId(const string& newAdapterId) const
getSecure(), getPreferSecure(), newAdapterId, getRouterInfo(),
locatorInfo, getCollocationOptimization(),
getCacheConnection(), getEndpointSelection(),
- getThreadPerConnection(), getLocatorCacheTimeout());
+ getThreadPerConnection(),
+ getLocatorCacheTimeout());
}
else
{
@@ -1472,28 +1632,68 @@ IceInternal::DirectReference::toString() const
ConnectionIPtr
IceInternal::DirectReference::getConnection(bool& comp) const
{
- vector<EndpointIPtr> endpts = RoutableReference::getRoutedEndpoints();
- applyOverrides(endpts);
-
- if(endpts.empty())
+ if(getRouterInfo())
{
- endpts = _endpoints; // Endpoint overrides are already applied on these endpoints.
+ vector<EndpointIPtr> endpts = getRouterInfo()->getClientEndpoints();
+ if(!endpts.empty())
+ {
+ applyOverrides(endpts);
+ return createConnection(endpts, comp);
+ }
}
- ConnectionIPtr connection = createConnection(endpts, comp);
+ return createConnection(_endpoints, comp);
+}
+
+void
+IceInternal::DirectReference::getConnection(const GetConnectionCallbackPtr& callback) const
+{
+ class Callback : public RouterInfo::GetClientEndpointsCallback
+ {
+ public:
+
+ virtual void
+ setEndpoints(const vector<EndpointIPtr>& endpoints)
+ {
+ vector<EndpointIPtr> endpts = endpoints;
+ if(!endpts.empty())
+ {
+ _reference->applyOverrides(endpts);
+ _reference->createConnection(endpts, _callback);
+ return;
+ }
+
+ _reference->createConnection(_reference->getEndpoints(), _callback);
+ }
+
+ virtual void
+ setException(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ }
+
+ Callback(const DirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) :
+ _reference(reference), _callback(callback)
+ {
+ }
+
+ private:
+
+ const DirectReferencePtr _reference;
+ const GetConnectionCallbackPtr _callback;
+ };
- //
- // If we have a router, set the object adapter for this router
- // (if any) to the new connection, so that callbacks from the
- // router can be received over this new connection.
- //
if(getRouterInfo())
{
- connection->setAdapter(getRouterInfo()->getAdapter());
+ //
+ // If we route, we send everything to the router's client
+ // proxy endpoints.
+ //
+ getRouterInfo()->getClientEndpoints(new Callback(const_cast<DirectReference*>(this), callback));
+ return;
}
- assert(connection);
- return connection;
+ createConnection(_endpoints, callback);
}
bool
@@ -1699,77 +1899,223 @@ IceInternal::IndirectReference::toString() const
ConnectionIPtr
IceInternal::IndirectReference::getConnection(bool& comp) const
{
- ConnectionIPtr connection;
+ if(getRouterInfo())
+ {
+ //
+ // If we route, we send everything to the router's client
+ // proxy endpoints.
+ //
+ vector<EndpointIPtr> endpts = getRouterInfo()->getClientEndpoints();
+ if(!endpts.empty())
+ {
+ applyOverrides(endpts);
+ return createConnection(endpts, comp);
+ }
+ }
while(true)
{
- vector<EndpointIPtr> endpts = RoutableReference::getRoutedEndpoints();
bool cached = false;
- if(endpts.empty() && _locatorInfo)
+ vector<EndpointIPtr> endpts;
+ if(_locatorInfo)
{
- const IndirectReferencePtr self = const_cast<IndirectReference*>(this);
- endpts = _locatorInfo->getEndpoints(self, _locatorCacheTimeout, cached);
+ endpts = _locatorInfo->getEndpoints(const_cast<IndirectReference*>(this), _locatorCacheTimeout, cached);
+ applyOverrides(endpts);
}
- applyOverrides(endpts);
+ if(endpts.empty())
+ {
+ throw Ice::NoEndpointException(__FILE__, __LINE__, toString());
+ }
try
{
- connection = createConnection(endpts, comp);
- assert(connection);
+ return createConnection(endpts, comp);
}
- catch(const NoEndpointException& ex)
+ catch(const NoEndpointException&)
{
- throw ex; // No need to retry if there's no endpoints.
+ throw; // No need to retry if there's no endpoints.
}
catch(const LocalException& ex)
{
- if(!getRouterInfo())
+ assert(_locatorInfo);
+ _locatorInfo->clearCache(const_cast<IndirectReference*>(this));
+
+ if(cached)
{
- assert(_locatorInfo);
-
- // COMPILERFIX: Braces needed to prevent BCB from causing Reference refCount from
+ // COMPILERFIX: Braces needed to prevent BCB from causing TraceLevels refCount from
// being decremented twice when loop continues.
{
- const IndirectReferencePtr self = const_cast<IndirectReference*>(this);
- _locatorInfo->clearCache(self);
+ TraceLevelsPtr traceLevels = getInstance()->traceLevels();
+ if(traceLevels->retry >= 2)
+ {
+ Trace out(getInstance()->initializationData().logger, traceLevels->retryCat);
+ out << "connection to cached endpoints failed\n"
+ << "removing endpoints from cache and trying one more time\n" << ex;
+ }
}
+ continue;
+ }
+ throw;
+ }
+ }
+
+ assert(false);
+ return 0;
+}
+
+void
+IceInternal::IndirectReference::getConnection(const GetConnectionCallbackPtr& callback) const
+{
+ class Callback : public RouterInfo::GetClientEndpointsCallback
+ {
+ public:
+
+ virtual void
+ setEndpoints(const vector<EndpointIPtr>& endpoints)
+ {
+ vector<EndpointIPtr> endpts = endpoints;
+ if(!endpts.empty())
+ {
+ _reference->applyOverrides(endpts);
+ _reference->createConnection(endpts, _callback);
+ return;
+ }
+
+ _reference->getConnectionNoRouterInfo(_callback);
+ }
+
+ virtual void
+ setException(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ }
+
+ Callback(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) :
+ _reference(reference), _callback(callback)
+ {
+ }
+
+ private:
+
+ const IndirectReferencePtr _reference;
+ const GetConnectionCallbackPtr _callback;
+ };
- if(cached)
+ if(getRouterInfo())
+ {
+ //
+ // If we route, we send everything to the router's client
+ // proxy endpoints.
+ //
+ getRouterInfo()->getClientEndpoints(new Callback(const_cast<IndirectReference*>(this), callback));
+ return;
+ }
+
+ getConnectionNoRouterInfo(callback);
+}
+
+void
+IceInternal::IndirectReference::getConnectionNoRouterInfo(const GetConnectionCallbackPtr& callback) const
+{
+ class Callback2 : public Reference::GetConnectionCallback
+ {
+ public:
+
+ virtual void
+ setConnection(const Ice::ConnectionIPtr& connection, bool compress)
+ {
+ _callback->setConnection(connection, compress);
+ }
+
+ virtual void
+ setException(const Ice::LocalException& exc)
+ {
+ try
+ {
+ exc.ice_throw();
+ }
+ catch(const Ice::NoEndpointException& ex)
+ {
+ _callback->setException(ex); // No need to retry if there's no endpoints.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ LocatorInfoPtr locatorInfo = _reference->getLocatorInfo();
+ assert(locatorInfo);
+ locatorInfo->clearCache(_reference);
+ if(_cached)
{
- // COMPILERFIX: Braces needed to prevent BCB from causing TraceLevels refCount from
- // being decremented twice when loop continues.
+ TraceLevelsPtr traceLvls = _reference->getInstance()->traceLevels();
+ if(traceLvls->retry >= 2)
{
- TraceLevelsPtr traceLevels = getInstance()->traceLevels();
- if(traceLevels->retry >= 2)
- {
- Trace out(getInstance()->initializationData().logger, traceLevels->retryCat);
- out << "connection to cached endpoints failed\n"
- << "removing endpoints from cache and trying one more time\n" << ex;
- }
+ Trace out(_reference->getInstance()->initializationData().logger, traceLvls->retryCat);
+ out << "connection to cached endpoints failed\n"
+ << "removing endpoints from cache and trying one more time\n" << ex;
}
- continue;
+ _reference->getConnectionNoRouterInfo(_callback); // Retry.
+ return;
}
+ _callback->setException(ex);
}
+ }
- throw;
+ Callback2(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback, bool cached):
+ _reference(reference), _callback(callback), _cached(cached)
+ {
}
+
+ private:
+
+ const IndirectReferencePtr _reference;
+ const GetConnectionCallbackPtr _callback;
+ const bool _cached;
+ };
+
+ class Callback : public LocatorInfo::GetEndpointsCallback
+ {
+ public:
- break;
- }
+ virtual void
+ setEndpoints(const vector<EndpointIPtr>& endpoints, bool cached)
+ {
+ if(endpoints.empty())
+ {
+ _callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, _reference->toString()));
+ return;
+ }
+
+ vector<EndpointIPtr> endpts = endpoints;
+ _reference->applyOverrides(endpts);
+ _reference->createConnection(endpts, new Callback2(_reference, _callback, cached));
+ }
+
+ virtual void
+ setException(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ }
- //
- // If we have a router, set the object adapter for this router
- // (if any) to the new connection, so that callbacks from the
- // router can be received over this new connection.
- //
- if(getRouterInfo())
+ Callback(const IndirectReferencePtr& reference, const GetConnectionCallbackPtr& callback) :
+ _reference(reference), _callback(callback)
+ {
+ }
+
+ private:
+
+ const IndirectReferencePtr _reference;
+ const GetConnectionCallbackPtr _callback;
+ };
+
+ if(_locatorInfo)
{
- connection->setAdapter(getRouterInfo()->getAdapter());
+ IndirectReference* self = const_cast<IndirectReference*>(this);
+ _locatorInfo->getEndpoints(self, _locatorCacheTimeout, new Callback(self, callback));
+ }
+ else
+ {
+ callback->setException(Ice::NoEndpointException(__FILE__, __LINE__, toString()));
}
-
- assert(connection);
- return connection;
}
int