diff options
author | ZeroC Staff <git@zeroc.com> | 2007-12-12 21:01:52 -0500 |
---|---|---|
committer | ZeroC Staff <git@zeroc.com> | 2007-12-12 21:01:52 -0500 |
commit | 883a047970693d63716aade9bd94f38c75012c7c (patch) | |
tree | 28f414393cc199bd852d618bcaf4702dd380759b /cpp/src/Ice/ConnectionFactory.cpp | |
parent | Fixed VC build (diff) | |
parent | Fixed bug 2592 (diff) | |
download | ice-883a047970693d63716aade9bd94f38c75012c7c.tar.bz2 ice-883a047970693d63716aade9bd94f38c75012c7c.tar.xz ice-883a047970693d63716aade9bd94f38c75012c7c.zip |
Merge branch 'master' of bernard@cvs.zeroc.com:/home/git/ice
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 184 |
1 files changed, 101 insertions, 83 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 12fd5e15374..835c75c6912 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -102,7 +102,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() // until there are no pending connections anymore. Only then // we can be sure the _connections contains all connections. // - while(!_destroyed || !_pending.empty() || !_pendingEndpoints.empty()) + while(!_destroyed || !_pending.empty() || _pendingConnectCount > 0) { wait(); } @@ -120,6 +120,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); _connections.clear(); + _connectionsByEndpoint.clear(); } } @@ -279,16 +280,24 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // // Try to find a connection to one of the given endpoints. // - bool compress; - Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress); - if(connection) + try { - callback->setConnection(connection, compress); + bool compress; + Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress); + if(connection) + { + callback->setConnection(connection, compress); + return; + } + } + catch(const Ice::LocalException& ex) + { + callback->setException(ex); return; } - + ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); - cb->getConnection(); + cb->getConnectors(); } void @@ -412,7 +421,8 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) : _instance(instance), - _destroyed(false) + _destroyed(false), + _pendingConnectCount(0) { } @@ -420,6 +430,9 @@ IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory() { assert(_destroyed); assert(_connections.empty()); + assert(_connectionsByEndpoint.empty()); + assert(_pending.empty()); + assert(_pendingConnectCount == 0); } vector<EndpointIPtr> @@ -444,6 +457,10 @@ ConnectionIPtr IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); assert(!endpoints.empty()); @@ -475,6 +492,8 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr ConnectionIPtr IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress) { + // This must be called with the mutex locked. + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { @@ -512,27 +531,31 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf } void -IceInternal::OutgoingConnectionFactory::addPendingEndpoints(const vector<EndpointIPtr>& endpoints) +IceInternal::OutgoingConnectionFactory::incPendingConnectCount() { + // + // Keep track of the number of pending connects. The outgoing connection factory + // waitUntilFinished() method waits for all the pending connects to terminate before + // to return. This ensures that the communicator client thread pool isn't destroyed + // too soon and will still be available to execute the ice_exception() callbacks for + // the asynchronous requests waiting on a connection to be established. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_destroyed) { throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); } - _pendingEndpoints.insert(endpoints.begin(), endpoints.end()); + ++_pendingConnectCount; } void -IceInternal::OutgoingConnectionFactory::removePendingEndpoints(const vector<EndpointIPtr>& endpoints) +IceInternal::OutgoingConnectionFactory::decPendingConnectCount() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - assert(_pendingEndpoints.find(*p) != _pendingEndpoints.end()); - _pendingEndpoints.erase(_pendingEndpoints.find(*p)); - } - - if(_destroyed) + --_pendingConnectCount; + assert(_pendingConnectCount >= 0); + if(_destroyed && _pendingConnectCount == 0) { notifyAll(); } @@ -890,8 +913,6 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O void IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) { - assert(!_exception.get() && connection == _connection); - bool compress; DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides(); if(defaultsAndOverrides->overrideCompress) @@ -904,18 +925,33 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartComplete } _factory->finishGetConnection(_connectors, this, connection); - _factory->removePendingEndpoints(_endpoints); _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. } void IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& connection, const LocalException& ex) { - assert(!_exception.get() && connection == _connection); + assert(_iter != _connectors.end()); - _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - handleException(); + _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1); + if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. + { + _factory->finishGetConnection(_connectors, this, 0); + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. + } + else if(++_iter != _connectors.end()) // Try the next connector. + { + nextConnector(); + } + else + { + _factory->finishGetConnection(_connectors, this, 0); + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. + } } // @@ -938,7 +974,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector if(++_endpointsIter != _endpoints.end()) { - (*_endpointsIter)->connectors_async(this); + nextEndpoint(); } else { @@ -959,7 +995,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::Lo _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1); if(++_endpointsIter != _endpoints.end()) { - (*_endpointsIter)->connectors_async(this); + nextEndpoint(); } else if(!_connectors.empty()) { @@ -972,43 +1008,49 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::Lo } else { - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - _factory->_instance->clientThreadPool()->execute(this); + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. } } -// -// Methods from ThreadPoolWorkItem -// void -IceInternal::OutgoingConnectionFactory::ConnectCallback::execute(const ThreadPoolPtr& threadPool) +IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnectors() { - threadPool->promoteFollower(); - assert(_exception.get()); - _factory->removePendingEndpoints(_endpoints); - _callback->setException(*_exception.get()); + try + { + // + // Notify the factory that there's an async connect pending. This is necessary + // to prevent the outgoing connection factory to be destroyed before all the + // pending asynchronous connects are finished. + // + _factory->incPendingConnectCount(); + } + catch(const Ice::LocalException& ex) + { + _callback->setException(ex); + return; + } + + nextEndpoint(); } void -IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() +IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint() { - // - // First, get the connectors for all the endpoints. - // - if(_endpointsIter != _endpoints.end()) + try { - try - { - _factory->addPendingEndpoints(_endpoints); - (*_endpointsIter)->connectors_async(this); - } - catch(const Ice::LocalException& ex) - { - _callback->setException(ex); - } - return; + assert(_endpointsIter != _endpoints.end()); + (*_endpointsIter)->connectors_async(this); + } + catch(const Ice::LocalException& ex) + { + exception(ex); } +} +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() +{ try { // @@ -1028,53 +1070,29 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() return; } - _factory->removePendingEndpoints(_endpoints); _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. } catch(const Ice::LocalException& ex) { - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - _factory->_instance->clientThreadPool()->execute(this); + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. } } void IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() { + Ice::ConnectionIPtr connection; try { - _exception.reset(0); - _connection = _factory->createConnection(_iter->connector->connect(0), *_iter); - _connection->start(this); + assert(_iter != _connectors.end()); + connection = _factory->createConnection(_iter->connector->connect(0), *_iter); + connection->start(this); } catch(const Ice::LocalException& ex) { - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - handleException(); - } -} - -void -IceInternal::OutgoingConnectionFactory::ConnectCallback::handleException() -{ - assert(_iter != _connectors.end() && _exception.get()); - - _factory->handleException(*_exception.get(), *_iter, _connection, _hasMore || _iter != _connectors.end() - 1); - if(dynamic_cast<Ice::CommunicatorDestroyedException*>(_exception.get())) // No need to continue. - { - _factory->finishGetConnection(_connectors, this, 0); - _factory->removePendingEndpoints(_endpoints); - _callback->setException(*_exception.get()); - } - else if(++_iter != _connectors.end()) // Try the next connector. - { - nextConnector(); - } - else - { - _factory->finishGetConnection(_connectors, this, 0); - _factory->removePendingEndpoints(_endpoints); - _callback->setException(*_exception.get()); + connectionStartFailed(connection, ex); } } |