summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
authorZeroC Staff <git@zeroc.com>2007-12-12 21:01:52 -0500
committerZeroC Staff <git@zeroc.com>2007-12-12 21:01:52 -0500
commit883a047970693d63716aade9bd94f38c75012c7c (patch)
tree28f414393cc199bd852d618bcaf4702dd380759b /cpp/src/Ice/ConnectionFactory.cpp
parentFixed VC build (diff)
parentFixed bug 2592 (diff)
downloadice-883a047970693d63716aade9bd94f38c75012c7c.tar.bz2
ice-883a047970693d63716aade9bd94f38c75012c7c.tar.xz
ice-883a047970693d63716aade9bd94f38c75012c7c.zip
Merge branch 'master' of bernard@cvs.zeroc.com:/home/git/ice
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp184
1 files changed, 101 insertions, 83 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 12fd5e15374..835c75c6912 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -102,7 +102,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
// until there are no pending connections anymore. Only then
// we can be sure the _connections contains all connections.
//
- while(!_destroyed || !_pending.empty() || !_pendingEndpoints.empty())
+ while(!_destroyed || !_pending.empty() || _pendingConnectCount > 0)
{
wait();
}
@@ -120,6 +120,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
_connections.clear();
+ _connectionsByEndpoint.clear();
}
}
@@ -279,16 +280,24 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
//
// Try to find a connection to one of the given endpoints.
//
- bool compress;
- Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress);
- if(connection)
+ try
{
- callback->setConnection(connection, compress);
+ bool compress;
+ Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress);
+ if(connection)
+ {
+ callback->setConnection(connection, compress);
+ return;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ callback->setException(ex);
return;
}
-
+
ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
- cb->getConnection();
+ cb->getConnectors();
}
void
@@ -412,7 +421,8 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests()
IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) :
_instance(instance),
- _destroyed(false)
+ _destroyed(false),
+ _pendingConnectCount(0)
{
}
@@ -420,6 +430,9 @@ IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory()
{
assert(_destroyed);
assert(_connections.empty());
+ assert(_connectionsByEndpoint.empty());
+ assert(_pending.empty());
+ assert(_pendingConnectCount == 0);
}
vector<EndpointIPtr>
@@ -444,6 +457,10 @@ ConnectionIPtr
IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
assert(!endpoints.empty());
@@ -475,6 +492,8 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr
ConnectionIPtr
IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress)
{
+ // This must be called with the mutex locked.
+
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
@@ -512,27 +531,31 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf
}
void
-IceInternal::OutgoingConnectionFactory::addPendingEndpoints(const vector<EndpointIPtr>& endpoints)
+IceInternal::OutgoingConnectionFactory::incPendingConnectCount()
{
+ //
+ // Keep track of the number of pending connects. The outgoing connection factory
+ // waitUntilFinished() method waits for all the pending connects to terminate before
+ // to return. This ensures that the communicator client thread pool isn't destroyed
+ // too soon and will still be available to execute the ice_exception() callbacks for
+ // the asynchronous requests waiting on a connection to be established.
+ //
+
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_destroyed)
{
throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
}
- _pendingEndpoints.insert(endpoints.begin(), endpoints.end());
+ ++_pendingConnectCount;
}
void
-IceInternal::OutgoingConnectionFactory::removePendingEndpoints(const vector<EndpointIPtr>& endpoints)
+IceInternal::OutgoingConnectionFactory::decPendingConnectCount()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- assert(_pendingEndpoints.find(*p) != _pendingEndpoints.end());
- _pendingEndpoints.erase(_pendingEndpoints.find(*p));
- }
-
- if(_destroyed)
+ --_pendingConnectCount;
+ assert(_pendingConnectCount >= 0);
+ if(_destroyed && _pendingConnectCount == 0)
{
notifyAll();
}
@@ -890,8 +913,6 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O
void
IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection)
{
- assert(!_exception.get() && connection == _connection);
-
bool compress;
DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides();
if(defaultsAndOverrides->overrideCompress)
@@ -904,18 +925,33 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartComplete
}
_factory->finishGetConnection(_connectors, this, connection);
- _factory->removePendingEndpoints(_endpoints);
_callback->setConnection(connection, compress);
+ _factory->decPendingConnectCount(); // Must be called last.
}
void
IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& connection,
const LocalException& ex)
{
- assert(!_exception.get() && connection == _connection);
+ assert(_iter != _connectors.end());
- _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- handleException();
+ _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1);
+ if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue.
+ {
+ _factory->finishGetConnection(_connectors, this, 0);
+ _callback->setException(ex);
+ _factory->decPendingConnectCount(); // Must be called last.
+ }
+ else if(++_iter != _connectors.end()) // Try the next connector.
+ {
+ nextConnector();
+ }
+ else
+ {
+ _factory->finishGetConnection(_connectors, this, 0);
+ _callback->setException(ex);
+ _factory->decPendingConnectCount(); // Must be called last.
+ }
}
//
@@ -938,7 +974,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector
if(++_endpointsIter != _endpoints.end())
{
- (*_endpointsIter)->connectors_async(this);
+ nextEndpoint();
}
else
{
@@ -959,7 +995,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::Lo
_factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1);
if(++_endpointsIter != _endpoints.end())
{
- (*_endpointsIter)->connectors_async(this);
+ nextEndpoint();
}
else if(!_connectors.empty())
{
@@ -972,43 +1008,49 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::Lo
}
else
{
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- _factory->_instance->clientThreadPool()->execute(this);
+ _callback->setException(ex);
+ _factory->decPendingConnectCount(); // Must be called last.
}
}
-//
-// Methods from ThreadPoolWorkItem
-//
void
-IceInternal::OutgoingConnectionFactory::ConnectCallback::execute(const ThreadPoolPtr& threadPool)
+IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnectors()
{
- threadPool->promoteFollower();
- assert(_exception.get());
- _factory->removePendingEndpoints(_endpoints);
- _callback->setException(*_exception.get());
+ try
+ {
+ //
+ // Notify the factory that there's an async connect pending. This is necessary
+ // to prevent the outgoing connection factory to be destroyed before all the
+ // pending asynchronous connects are finished.
+ //
+ _factory->incPendingConnectCount();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _callback->setException(ex);
+ return;
+ }
+
+ nextEndpoint();
}
void
-IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
+IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint()
{
- //
- // First, get the connectors for all the endpoints.
- //
- if(_endpointsIter != _endpoints.end())
+ try
{
- try
- {
- _factory->addPendingEndpoints(_endpoints);
- (*_endpointsIter)->connectors_async(this);
- }
- catch(const Ice::LocalException& ex)
- {
- _callback->setException(ex);
- }
- return;
+ assert(_endpointsIter != _endpoints.end());
+ (*_endpointsIter)->connectors_async(this);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
}
+}
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
+{
try
{
//
@@ -1028,53 +1070,29 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
return;
}
- _factory->removePendingEndpoints(_endpoints);
_callback->setConnection(connection, compress);
+ _factory->decPendingConnectCount(); // Must be called last.
}
catch(const Ice::LocalException& ex)
{
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- _factory->_instance->clientThreadPool()->execute(this);
+ _callback->setException(ex);
+ _factory->decPendingConnectCount(); // Must be called last.
}
}
void
IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
{
+ Ice::ConnectionIPtr connection;
try
{
- _exception.reset(0);
- _connection = _factory->createConnection(_iter->connector->connect(0), *_iter);
- _connection->start(this);
+ assert(_iter != _connectors.end());
+ connection = _factory->createConnection(_iter->connector->connect(0), *_iter);
+ connection->start(this);
}
catch(const Ice::LocalException& ex)
{
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- handleException();
- }
-}
-
-void
-IceInternal::OutgoingConnectionFactory::ConnectCallback::handleException()
-{
- assert(_iter != _connectors.end() && _exception.get());
-
- _factory->handleException(*_exception.get(), *_iter, _connection, _hasMore || _iter != _connectors.end() - 1);
- if(dynamic_cast<Ice::CommunicatorDestroyedException*>(_exception.get())) // No need to continue.
- {
- _factory->finishGetConnection(_connectors, this, 0);
- _factory->removePendingEndpoints(_endpoints);
- _callback->setException(*_exception.get());
- }
- else if(++_iter != _connectors.end()) // Try the next connector.
- {
- nextConnector();
- }
- else
- {
- _factory->finishGetConnection(_connectors, this, 0);
- _factory->removePendingEndpoints(_endpoints);
- _callback->setException(*_exception.get());
+ connectionStartFailed(connection, ex);
}
}