summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp1212
1 files changed, 910 insertions, 302 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index af2b506193d..6d69a36cb84 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; }
namespace
{
+
struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t>
{
ptrdiff_t operator()(ptrdiff_t d)
@@ -45,6 +46,22 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t>
return IceUtil::random(static_cast<int>(d));
}
};
+
+}
+
+bool
+IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const
+{
+ if(!threadPerConnection && other.threadPerConnection)
+ {
+ return true;
+ }
+ else if(other.threadPerConnection < threadPerConnection)
+ {
+ return false;
+ }
+
+ return connector < other.connector;
}
void
@@ -60,11 +77,11 @@ IceInternal::OutgoingConnectionFactory::destroy()
#ifdef _STLP_BEGIN_NAMESPACE
// voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
for_each(_connections.begin(), _connections.end(),
- voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
+ voidbind2nd(Ice::secondVoidMemFun1<ConnectorInfo, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#else
for_each(_connections.begin(), _connections.end(),
- bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
+ bind2nd(Ice::secondVoidMemFun1<const ConnectorInfo, ConnectionI, ConnectionI::DestructionReason>
(&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
#endif
@@ -75,7 +92,7 @@ IceInternal::OutgoingConnectionFactory::destroy()
void
IceInternal::OutgoingConnectionFactory::waitUntilFinished()
{
- multimap<ConnectorPtr, ConnectionIPtr> connections;
+ multimap<ConnectorInfo, ConnectionIPtr> connections;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -85,7 +102,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
// until there are no pending connections anymore. Only then
// we can be sure the _connections contains all connections.
//
- while(!_destroyed || !_pending.empty())
+ while(!_destroyed || !_pending.empty() || !_pendingEndpoints.empty())
{
wait();
}
@@ -94,311 +111,187 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
// We want to wait until all connections are finished outside the
// thread synchronization.
//
- connections.swap(_connections);
+ connections = _connections;
}
for_each(connections.begin(), connections.end(),
- Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
+ Ice::secondVoidMemFun<const ConnectorInfo, ConnectionI>(&ConnectionI::waitUntilFinished));
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _connections.clear();
+ }
}
ConnectionIPtr
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts,
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
bool threadPerConnection, Ice::EndpointSelectionType selType,
bool& compress)
{
assert(!endpts.empty());
- vector<pair<ConnectorPtr, EndpointIPtr> > connectors;
+
+ //
+ // Apply the overrides.
+ //
+ vector<EndpointIPtr> endpoints = applyOverrides(endpts);
+ //
+ // Try to find a connection to one of the given endpoints.
+ //
+ Ice::ConnectionIPtr connection = findConnection(endpoints, threadPerConnection, compress);
+ if(connection)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ return connection;
+ }
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
+ auto_ptr<Ice::LocalException> exception;
+ //
+ // If we didn't find a connection with the endpoints, we create the connectors
+ // for the endpoints.
+ //
+ vector<ConnectorInfo> connectors;
+ for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
//
- // Reap connections for which destruction has completed.
+ // Create connectors for the endpoint.
//
- std::multimap<ConnectorPtr, ConnectionIPtr>::iterator p = _connections.begin();
- while(p != _connections.end())
- {
- if(p->second->isFinished())
- {
- _connections.erase(p++);
- }
- else
- {
- ++p;
- }
- }
-
- vector<EndpointIPtr> endpoints = endpts;
- vector<EndpointIPtr>::iterator q;
- for(q = endpoints.begin(); q != endpoints.end(); ++q)
+ try
{
- //
- // Modify endpoints with overrides.
- //
- if(_instance->defaultsAndOverrides()->overrideTimeout)
- {
- *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
- }
-
- //
- // Create connectors for the endpoints.
- //
- vector<ConnectorPtr> cons = (*q)->connectors();
- assert(cons.size() > 0);
-
- //
- // Shuffle connectors is endpoint selection type is Random.
- //
+ vector<ConnectorPtr> cons = (*p)->connectors();
+ assert(!cons.empty());
+
if(selType == Random)
{
RandomNumberGenerator rng;
random_shuffle(cons.begin(), cons.end(), rng);
}
-
- vector<ConnectorPtr>::const_iterator r;
- for(r = cons.begin(); r != cons.end(); ++r)
- {
- connectors.push_back(make_pair(*r, *q));
- }
- }
-
- //
- // Search for existing connections.
- //
- vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator r;
- for(r = connectors.begin(); r != connectors.end(); ++r)
- {
- pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator,
- multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first);
- while(pr.first != pr.second)
+ for(vector<ConnectorPtr>::const_iterator r = cons.begin(); r != cons.end(); ++r)
{
- //
- // Don't return connections for which destruction has
- // been initiated. The connection must also match the
- // requested thread-per-connection setting.
- //
- if(!pr.first->second->isDestroyed() &&
- pr.first->second->threadPerConnection() == threadPerConnection)
- {
- if(_instance->defaultsAndOverrides()->overrideCompress)
- {
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
- }
- else
- {
- compress = (*r).second->compress();
- }
-
- return pr.first->second;
- }
-
- ++pr.first;
+ assert(*r);
+ connectors.push_back(ConnectorInfo(*r, *p, threadPerConnection));
}
}
-
- //
- // If some other thread is currently trying to establish a
- // connection to any of our endpoints, we wait until this
- // thread is finished.
- //
- bool searchAgain = false;
- while(!_destroyed)
+ catch(const Ice::LocalException& ex)
{
- for(r = connectors.begin(); r != connectors.end(); ++r)
- {
- if(_pending.find((*r).first) != _pending.end())
- {
- break;
- }
- }
-
- if(r == connectors.end())
- {
- break;
- }
-
- searchAgain = true;
-
- wait();
- }
-
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
-
- //
- // Search for existing connections again if we waited above,
- // as new connections might have been added in the meantime.
- //
- if(searchAgain)
- {
- for(r = connectors.begin(); r != connectors.end(); ++r)
- {
- pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator,
- multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first);
-
- while(pr.first != pr.second)
- {
- //
- // Don't return connections for which destruction has
- // been initiated. The connection must also match the
- // requested thread-per-connection setting.
- //
- if(!pr.first->second->isDestroyed() &&
- pr.first->second->threadPerConnection() == threadPerConnection)
- {
- if(_instance->defaultsAndOverrides()->overrideCompress)
- {
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
- }
- else
- {
- compress = (*r).second->compress();
- }
-
- return pr.first->second;
- }
-
- ++pr.first;
- }
- }
- }
-
- //
- // No connection to any of our endpoints exists yet, so we
- // will try to create one. To avoid that other threads try to
- // create connections to the same endpoints, we add our
- // endpoints to _pending.
- //
- for(r = connectors.begin(); r != connectors.end(); ++r)
- {
- _pending.insert((*r).first);
+ exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ handleException(ex, hasMore || p != endpoints.end() - 1);
}
}
- ConnectorPtr connector;
- ConnectionIPtr connection;
- auto_ptr<LocalException> exception;
+ if(connectors.empty())
+ {
+ assert(exception.get());
+ exception->ice_throw();
+ }
- vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator q;
- for(q = connectors.begin(); q != connectors.end(); ++q)
+ //
+ // Try to get a connection to one of the connectors. A null result indicates that no
+ // connection was found and that we should try to establish the connection (and that
+ // the connectors were added to _pending to prevent other threads from establishing
+ // the connection).
+ //
+ connection = getConnection(connectors, 0, compress);
+ if(connection)
+ {
+ return connection;
+ }
+
+ //
+ // Try to establish the connection to the connectors.
+ //
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
- connector = (*q).first;
- EndpointIPtr endpoint = (*q).second;
-
try
{
-
int timeout;
- if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
+ if(defaultsAndOverrides->overrideConnectTimeout)
{
- timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
+ timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
}
- // It is not necessary to check for overrideTimeout,
- // the endpoint has already been modified with this
- // override, if set.
else
{
- timeout = endpoint->timeout();
+ //
+ // It is not necessary to check for overrideTimeout, the endpoint has already
+ // been modified with this override, if set.
+ //
+ timeout = p->endpoint->timeout();
}
- TransceiverPtr transceiver = connector->connect(timeout);
- assert(transceiver);
-
- connection = new ConnectionI(_instance, transceiver, endpoint->compress(false), 0, threadPerConnection,
- _instance->threadPerConnectionStackSize());
- connection->start();
- connection->validate();
+ connection = createConnection(p->connector->connect(timeout), *p);
+ connection->start(0);
- if(_instance->defaultsAndOverrides()->overrideCompress)
+ if(defaultsAndOverrides->overrideCompress)
{
- compress = _instance->defaultsAndOverrides()->overrideCompressValue;
+ compress = defaultsAndOverrides->overrideCompressValue;
}
else
{
- compress = endpoint->compress();
+ compress = p->endpoint->compress();
}
+
break;
}
- catch(const LocalException& ex)
+ catch(const Ice::CommunicatorDestroyedException& ex)
{
- exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
-
- //
- // If a connection object was constructed, then validate()
- // must have raised the exception.
- //
- if(connection)
- {
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- connection = 0;
- }
+ exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ handleException(*exception.get(), *p, connection, hasMore || p != connectors.end() - 1);
+ connection = 0;
+ break; // No need to continue
}
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->retry >= 2)
+ catch(const Ice::LocalException& ex)
{
- Trace out(_instance->initializationData().logger, traceLevels->retryCat);
-
- out << "connection to endpoint failed";
- if(moreEndpts || q + 1 != connectors.end())
- {
- out << ", trying next endpoint\n";
- }
- else
- {
- out << " and no more endpoints to try\n";
- }
- out << *exception.get();
+ exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ handleException(*exception.get(), *p, connection, hasMore || p != connectors.end() - 1);
+ connection = 0;
}
}
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Signal other threads that we are done with trying to
- // establish connections to our endpoints.
- //
- for(q = connectors.begin(); q != connectors.end(); ++q)
- {
- _pending.erase((*q).first);
- }
- notifyAll();
- if(!connection)
- {
- assert(exception.get());
- exception->ice_throw();
- }
- else
- {
- _connections.insert(_connections.end(), pair<const ConnectorPtr, ConnectionIPtr>(connector, connection));
+ //
+ // Finish creating the connection (this removes the connectors from the _pending
+ // list and notifies any waiting threads).
+ //
+ finishGetConnection(connectors, 0, connection);
- if(_destroyed)
- {
- connection->destroy(ConnectionI::CommunicatorDestroyed);
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
- else
- {
- connection->activate();
- }
- }
+ if(!connection)
+ {
+ assert(exception.get());
+ exception->ice_throw();
}
- assert(connection);
return connection;
}
void
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
+ bool tpc, Ice::EndpointSelectionType selType,
+ const CreateConnectionCallbackPtr& callback)
+{
+ assert(!endpts.empty());
+
+ //
+ // Apply the overrides.
+ //
+ vector<EndpointIPtr> endpoints = applyOverrides(endpts);
+
+ //
+ // Try to find a connection to one of the given endpoints.
+ //
+ bool compress;
+ Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress);
+ if(connection)
+ {
+ callback->setConnection(connection, compress);
+ return;
+ }
+
+ ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
+ cb->getConnection();
+}
+
+void
IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -442,14 +335,14 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route
//
endpoint = endpoint->compress(false);
- multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q;
+ multimap<ConnectorInfo, ConnectionIPtr>::const_iterator q;
for(q = _connections.begin(); q != _connections.end(); ++q)
{
- if((*q).second->endpoint() == endpoint)
+ if(q->second->endpoint() == endpoint)
{
try
{
- (*q).second->setAdapter(adapter);
+ q->second->setAdapter(adapter);
}
catch(const Ice::LocalException&)
{
@@ -472,7 +365,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
return;
}
- for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
+ for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
{
if(p->second->getAdapter() == adapter)
{
@@ -497,9 +390,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- for(std::multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin();
- p != _connections.end();
+ for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end();
++p)
{
c.push_back(p->second);
@@ -531,6 +422,668 @@ IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory()
assert(_connections.empty());
}
+vector<EndpointIPtr>
+IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr>& endpts)
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ vector<EndpointIPtr> endpoints = endpts;
+ for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ //
+ // Modify endpoints with overrides.
+ //
+ if(defaultsAndOverrides->overrideTimeout)
+ {
+ *p = (*p)->timeout(defaultsAndOverrides->overrideTimeoutValue);
+ }
+ }
+ return endpoints;
+}
+
+ConnectionIPtr
+IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ assert(!endpoints.empty());
+ for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p);
+
+ for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
+ {
+ if(q->second->isActiveOrHolding() &&
+ q->second->threadPerConnection() == tpc) // Don't return destroyed or un-validated connections
+ {
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
+ {
+ compress = (*p)->compress();
+ }
+ return q->second;
+ }
+ }
+ }
+ return 0;
+}
+
+ConnectionIPtr
+IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress)
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
+ multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p);
+
+ if(pr.first == pr.second)
+ {
+ continue;
+ }
+
+ for(multimap<ConnectorInfo, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
+ {
+ if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections
+ {
+ if(q->second->endpoint() != p->endpoint)
+ {
+ _connectionsByEndpoint.insert(make_pair(p->endpoint, q->second));
+ }
+
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
+ {
+ compress = p->endpoint->compress();
+ }
+ return q->second;
+ }
+ }
+ }
+
+ return 0;
+}
+
+void
+IceInternal::OutgoingConnectionFactory::addPendingEndpoints(const vector<EndpointIPtr>& endpoints)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_destroyed)
+ {
+ throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ _pendingEndpoints.insert(endpoints.begin(), endpoints.end());
+}
+
+void
+IceInternal::OutgoingConnectionFactory::removePendingEndpoints(const vector<EndpointIPtr>& endpoints)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ assert(_pendingEndpoints.find(*p) != _pendingEndpoints.end());
+ _pendingEndpoints.erase(_pendingEndpoints.find(*p));
+ }
+
+ if(_destroyed)
+ {
+ notifyAll();
+ }
+}
+
+ConnectionIPtr
+IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors,
+ const ConnectCallbackPtr& cb, bool& compress)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_destroyed)
+ {
+ throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ multimap<ConnectorInfo, ConnectionIPtr>::iterator p = _connections.begin();
+ while(p != _connections.end())
+ {
+ if(p->second->isFinished())
+ {
+ _connections.erase(p++);
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator q = _connectionsByEndpoint.begin();
+ while(q != _connectionsByEndpoint.end())
+ {
+ if(q->second->isFinished())
+ {
+ _connectionsByEndpoint.erase(q++);
+ }
+ else
+ {
+ ++q;
+ }
+ }
+
+ //
+ // Try to get the connection. We may need to wait for other threads to
+ // finish if one of them is currently establishing a connection to one
+ // of our connectors.
+ //
+ while(!_destroyed)
+ {
+ //
+ // Search for a matching connection. If we find one, we're done.
+ //
+ Ice::ConnectionIPtr connection = findConnection(connectors, compress);
+ if(connection)
+ {
+ if(cb)
+ {
+ //
+ // This might not be the first getConnection call for the callback. We need
+ // to ensure that the callback isn't registered with any other pending
+ // connectors since we just found a connection and therefore don't need to
+ // wait anymore for other pending connectors.
+ //
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ if(q != _pending.end())
+ {
+ q->second.erase(cb);
+ }
+ }
+ }
+ return connection;
+ }
+
+ //
+ // Determine whether another thread is currently attempting to connect to one of our endpoints;
+ // if so we wait until it's done.
+ //
+ bool found = false;
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ if(q != _pending.end())
+ {
+ found = true;
+ if(cb)
+ {
+ q->second.insert(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(!found)
+ {
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
+ }
+ else
+ {
+ //
+ // If a callback is not specified we wait until another thread notifies us about a
+ // change to the pending list. Otherwise, if a callback is provided we're done:
+ // when the pending list changes the callback will be notified and will try to
+ // get the connection again.
+ //
+ if(!cb)
+ {
+ wait();
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ }
+
+ if(_destroyed)
+ {
+ throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ //
+ // No connection to any of our endpoints exists yet; we add the given connectors to
+ // the _pending set to indicate that we're attempting connection establishment to
+ // these connectors.
+ //
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ assert(_pending.find(*p) == _pending.end());
+ _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*p, set<ConnectCallbackPtr>()));
+ }
+ }
+
+ //
+ // At this point, we're responsible for establishing the connection to one of
+ // the given connectors. If it's a non-blocking connect, calling nextConnector
+ // will start the connection establishment. Otherwise, we return null to get
+ // the caller to establish the connection.
+ //
+ if(cb)
+ {
+ cb->nextConnector();
+ }
+
+ return 0;
+}
+
+ConnectionIPtr
+IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_pending.find(ci) != _pending.end() && transceiver);
+
+ //
+ // Create and add the connection to the connection map. Adding the connection to the map
+ // is necessary to support the interruption of the connection initialization and validation
+ // in case the communicator is destroyed.
+ //
+ try
+ {
+ if(_destroyed)
+ {
+ throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false),
+ 0, ci.threadPerConnection,
+ _instance->threadPerConnectionStackSize());
+ _connections.insert(make_pair(ci, connection));
+ _connectionsByEndpoint.insert(make_pair(ci.endpoint, connection));
+ return connection;
+ }
+ catch(const Ice::LocalException&)
+ {
+ try
+ {
+ transceiver->close();
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore
+ }
+ throw;
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors,
+ const ConnectCallbackPtr& cb,
+ const ConnectionIPtr& connection)
+{
+ vector<ConnectCallbackPtr> callbacks;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // We're done trying to connect to the given connectors so we remove the
+ // connectors from the pending list and notify waiting threads. We also
+ // notify the pending connect callbacks (outside the synchronization).
+ //
+
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ assert(q != _pending.end());
+ callbacks.insert(callbacks.end(), q->second.begin(), q->second.end());
+ _pending.erase(q);
+ }
+ notifyAll();
+
+ //
+ // If the connect attempt succeeded and the communicator is not destroyed,
+ // activate the connection!
+ //
+ if(connection && !_destroyed)
+ {
+ connection->activate();
+ }
+ }
+
+ //
+ // Notify any waiting callbacks.
+ //
+ for(vector<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
+ {
+ (*p)->getConnection();
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, const ConnectorInfo& ci,
+ const ConnectionIPtr& connection, bool hasMore)
+{
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->retry >= 2)
+ {
+ Trace out(_instance->initializationData().logger, traceLevels->retryCat);
+
+ out << "connection to endpoint failed";
+ if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
+ {
+ out << "\n";
+ }
+ else
+ {
+ if(hasMore)
+ {
+ out << ", trying next endpoint\n";
+ }
+ else
+ {
+ out << " and no more endpoints to try\n";
+ }
+ }
+ out << ex;
+ }
+
+ if(connection && connection->isFinished())
+ {
+ //
+ // If the connection is finished, we remove it right away instead of
+ // waiting for the reaping.
+ //
+ // NOTE: it's possible for the connection to not be finished yet. That's
+ // for instance the case when using thread per connection and if it's the
+ // thread which is calling back the outgoing connection factory to notify
+ // it of the failure.
+ //
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
+ multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci);
+
+ for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p)
+ {
+ if(p->second == connection)
+ {
+ _connections.erase(p);
+ break;
+ }
+ }
+
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint);
+
+ for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q)
+ {
+ if(q->second == connection)
+ {
+ _connectionsByEndpoint.erase(q);
+ break;
+ }
+ }
+ }
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore)
+{
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->retry >= 2)
+ {
+ Trace out(_instance->initializationData().logger, traceLevels->retryCat);
+
+ out << "couldn't resolve endpoint host";
+ if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
+ {
+ out << "\n";
+ }
+ else
+ {
+ if(hasMore)
+ {
+ out << ", trying next endpoint\n";
+ }
+ else
+ {
+ out << " and no more endpoints to try\n";
+ }
+ }
+ out << ex;
+ }
+}
+
+IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const OutgoingConnectionFactoryPtr& factory,
+ const vector<EndpointIPtr>& endpoints,
+ bool hasMore,
+ const CreateConnectionCallbackPtr& cb,
+ Ice::EndpointSelectionType selType,
+ bool threadPerConnection) :
+ _factory(factory),
+ _selectorThread(_factory->_instance->selectorThread()),
+ _endpoints(endpoints),
+ _hasMore(hasMore),
+ _callback(cb),
+ _selType(selType),
+ _threadPerConnection(threadPerConnection)
+{
+ _endpointsIter = _endpoints.begin();
+}
+
+//
+// Methods from ConnectionI.StartCallback
+//
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection)
+{
+ assert(!_exception.get() && connection == _connection);
+
+ bool compress;
+ DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
+ {
+ compress = _iter->endpoint->compress();
+ }
+
+ _factory->finishGetConnection(_connectors, this, connection);
+ _factory->removePendingEndpoints(_endpoints);
+ _callback->setConnection(connection, compress);
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& connection,
+ const LocalException& ex)
+{
+ assert(!_exception.get() && connection == _connection);
+
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ handleException();
+}
+
+//
+// Methods from EndpointI_connectors
+//
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector<ConnectorPtr>& connectors)
+{
+ vector<ConnectorPtr> cons = connectors;
+ if(_selType == Random)
+ {
+ RandomNumberGenerator rng;
+ random_shuffle(cons.begin(), cons.end(), rng);
+ }
+
+ for(vector<ConnectorPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
+ {
+ _connectors.push_back(ConnectorInfo(*p, *_endpointsIter, _threadPerConnection));
+ }
+
+ if(++_endpointsIter != _endpoints.end())
+ {
+ (*_endpointsIter)->connectors_async(this);
+ }
+ else
+ {
+ assert(!_connectors.empty());
+
+ //
+ // We now have all the connectors for the given endpoints. We can try to obtain the
+ // connection.
+ //
+ _iter = _connectors.begin();
+ getConnection();
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::LocalException& ex)
+{
+ _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1);
+ if(++_endpointsIter != _endpoints.end())
+ {
+ (*_endpointsIter)->connectors_async(this);
+ }
+ else if(!_connectors.empty())
+ {
+ //
+ // We now have all the connectors for the given endpoints. We can try to obtain the
+ // connection.
+ //
+ _iter = _connectors.begin();
+ getConnection();
+ }
+ else
+ {
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ _factory->_instance->clientThreadPool()->execute(this);
+ }
+}
+
+//
+// Methods from ThreadPoolWorkItem
+//
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::execute(const ThreadPoolPtr& threadPool)
+{
+ threadPool->promoteFollower();
+ assert(_exception.get());
+ _factory->removePendingEndpoints(_endpoints);
+ _callback->setException(*_exception.get());
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
+{
+ //
+ // First, get the connectors for all the endpoints.
+ //
+ if(_endpointsIter != _endpoints.end())
+ {
+ try
+ {
+ _factory->addPendingEndpoints(_endpoints);
+ (*_endpointsIter)->connectors_async(this);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ }
+ return;
+ }
+
+ try
+ {
+ //
+ // If all the connectors have been created, we ask the factory to get a
+ // connection.
+ //
+ bool compress;
+ Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, this, compress);
+ if(!connection)
+ {
+ //
+ // A null return value from getConnection indicates that the connection
+ // is being established and that everthing has been done to ensure that
+ // the callback will be notified when the connection establishment is
+ // done.
+ //
+ return;
+ }
+
+ _factory->removePendingEndpoints(_endpoints);
+ _callback->setConnection(connection, compress);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ _factory->_instance->clientThreadPool()->execute(this);
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
+{
+ try
+ {
+ _exception.reset(0);
+ _connection = _factory->createConnection(_iter->connector->connect(0), *_iter);
+ _connection->start(this);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ handleException();
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::handleException()
+{
+ assert(_iter != _connectors.end() && _exception.get());
+
+ _factory->handleException(*_exception.get(), *_iter, _connection, _hasMore || _iter != _connectors.end() - 1);
+ if(dynamic_cast<Ice::CommunicatorDestroyedException*>(_exception.get())) // No need to continue.
+ {
+ _factory->finishGetConnection(_connectors, this, 0);
+ _factory->removePendingEndpoints(_endpoints);
+ _callback->setException(*_exception.get());
+ }
+ else if(++_iter != _connectors.end()) // Try the next connector.
+ {
+ nextConnector();
+ }
+ else
+ {
+ _factory->finishGetConnection(_connectors, this, 0);
+ _factory->removePendingEndpoints(_endpoints);
+ _callback->setException(*_exception.get());
+ }
+}
+
+bool
+IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const
+{
+ return this < &rhs;
+}
+
void
IceInternal::IncomingConnectionFactory::activate()
{
@@ -612,7 +1165,7 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
// We want to wait until all connections are finished outside the
// thread synchronization.
//
- connections.swap(_connections);
+ connections = _connections;
}
if(threadPerIncomingConnectionFactory)
@@ -621,6 +1174,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
}
for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _connections.clear();
+ }
}
EndpointIPtr
@@ -641,7 +1199,7 @@ IceInternal::IncomingConnectionFactory::connections() const
// Only copy connections which have not been destroyed.
//
remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result),
- Ice::constMemFun(&ConnectionI::isDestroyed));
+ not1(Ice::constMemFun(&ConnectionI::isActiveOrHolding)));
return result;
}
@@ -678,11 +1236,12 @@ IceInternal::IncomingConnectionFactory::readable() const
return false;
}
-void
+bool
IceInternal::IncomingConnectionFactory::read(BasicStream&)
{
assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(false); // Must not be called.
+ assert(false); // Must not be called, readable() returns false.
+ return false;
}
class PromoteFollower
@@ -772,10 +1331,23 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
{
assert(!_threadPerConnection);
connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, false, 0);
- connection->start();
}
- catch(const LocalException&)
+ catch(const LocalException& ex)
{
+ try
+ {
+ transceiver->close();
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
+ }
+
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
return;
}
@@ -784,23 +1356,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
assert(connection);
- //
- // We validate outside the thread synchronization, to not block
- // the factory.
- //
- try
- {
- connection->validate();
- }
- catch(const LocalException&)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
- _connections.remove(connection);
- return;
- }
-
- connection->activate();
+ connection->start(this);
}
void
@@ -844,6 +1400,48 @@ IceInternal::IncomingConnectionFactory::toString() const
return _acceptor->toString();
}
+void
+IceInternal::IncomingConnectionFactory::connectionStartCompleted(const Ice::ConnectionIPtr& connection)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Initialy, connections are in the holding state. If the factory is active
+ // we activate the connection.
+ //
+ if(_state == StateActive)
+ {
+ connection->activate();
+ }
+}
+
+void
+IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::ConnectionIPtr& connection,
+ const Ice::LocalException& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state == StateClosed)
+ {
+ return;
+ }
+
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
+
+ //
+ // If the connection is finished, remove it right away from
+ // the connection map. Otherwise, we keep it in the map, it
+ // will eventually be reaped.
+ //
+ if(connection->isFinished())
+ {
+ _connections.remove(connection);
+ }
+}
+
IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance,
const EndpointIPtr& endpoint,
const ObjectAdapterPtr& adapter,
@@ -881,23 +1479,22 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
{
connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
_threadPerConnectionStackSize);
- connection->start();
- connection->validate();
}
catch(const LocalException&)
{
- //
- // If a connection object was constructed, then validate()
- // must have raised the exception.
- //
- if(connection)
+ try
{
- connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup.
+ _transceiver->close();
}
-
- return;
+ catch(const Ice::LocalException&)
+ {
+ // Ignore
+ }
+ throw;
}
+ connection->start(0);
+
_connections.push_back(connection);
}
else
@@ -1148,33 +1745,44 @@ IceInternal::IncomingConnectionFactory::run()
//
// Create a connection object for the connection.
//
- if(transceiver)
+ if(!transceiver)
+ {
+ continue;
+ }
+
+ try
+ {
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
+ }
+ catch(const LocalException& ex)
{
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
- _threadPerConnectionStackSize);
- connection->start();
+ transceiver->close();
}
- catch(const LocalException&)
+ catch(const Ice::LocalException&)
{
- return;
}
- _connections.push_back(connection);
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
+ continue;
}
+
+ _connections.push_back(connection);
}
//
- // In thread per connection mode, the connection's thread will
- // take care of connection validation and activation (for
- // non-datagram connections). We don't want to block this
- // thread waiting until validation is complete, because in
- // contrast to thread pool mode, it is the only thread that
- // can accept connections with this factory's
- // acceptor. Therefore we don't call validate() and activate()
- // from the connection factory in thread per connection mode.
+ // In thread-per-connection mode and regardless of the background mode,
+ // start() doesn't block. The connection thread is started and takes
+ // care of the connection validation and notifies the factory through
+ // the callback when it's done.
//
+ connection->start(this);
}
}