diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 205 |
1 files changed, 138 insertions, 67 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index ff635001516..9c29f05c431 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -40,10 +40,61 @@ void IceInternal::decRef(OutgoingConnectionFactory* p) { p->__decRef(); } void IceInternal::incRef(IncomingConnectionFactory* p) { p->__incRef(); } void IceInternal::decRef(IncomingConnectionFactory* p) { p->__decRef(); } +void +IceInternal::OutgoingConnectionFactory::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(!_instance) + { + return; + } + +#ifdef _STLP_BEGIN_NAMESPACE + // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h + for_each(_connections.begin(), _connections.end(), + voidbind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason> + (&Connection::destroy), Connection::CommunicatorDestroyed)); +#else + for_each(_connections.begin(), _connections.end(), + bind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason> + (&Connection::destroy), Connection::CommunicatorDestroyed)); +#endif + + _instance = 0; + notifyAll(); +} + +void +IceInternal::OutgoingConnectionFactory::waitUntilFinished() +{ + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. + // + while(_instance) + { + wait(); + } + + // + // Now we wait for until the destruction of each connection is + // finished. + // + for_each(_connections.begin(), _connections.end(), + Ice::secondConstVoidMemFun<EndpointPtr, Connection>(&Connection::waitUntilFinished)); + + // + // We're done, now we can throw away all connections. + // + _connections.clear(); +} + ConnectionPtr IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoints) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(!_instance) { @@ -53,12 +104,12 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi assert(!endpoints.empty()); // - // Reap destroyed connections. + // Reap connections for which destruction has completed. // std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin(); while(p != _connections.end()) { - if(p->second->destroyed()) + if(p->second->isFinished()) { _connections.erase(p++); } @@ -84,7 +135,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint); if(r != _connections.end()) { - return r->second; + // + // Don't return connections for which destruction has been + // initiated. + // + if(!r->second->isDestroyed()) + { + return r->second; + } } } @@ -156,7 +214,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoi void IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(!_instance) { @@ -196,7 +254,7 @@ IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router) void IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(!_instance) { @@ -220,45 +278,73 @@ IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const Instance IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory() { assert(!_instance); + assert(_connections.empty()); } void -IceInternal::OutgoingConnectionFactory::destroy() +IceInternal::IncomingConnectionFactory::activate() { - IceUtil::Mutex::Lock sync(*this); - - if(!_instance) - { - return; - } - -#ifdef _STLP_BEGIN_NAMESPACE - // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h - for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason> - (&Connection::destroy), Connection::CommunicatorDestroyed)); -#else - for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::secondVoidMemFun1<EndpointPtr, Connection, Connection::DestructionReason> - (&Connection::destroy), Connection::CommunicatorDestroyed)); -#endif - - _connections.clear(); - _instance = 0; + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + setState(StateActive); } void IceInternal::IncomingConnectionFactory::hold() { - ::IceUtil::Mutex::Lock sync(*this); + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); setState(StateHolding); } void -IceInternal::IncomingConnectionFactory::activate() +IceInternal::IncomingConnectionFactory::destroy() { - ::IceUtil::Mutex::Lock sync(*this); - setState(StateActive); + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed); +} + +void +IceInternal::IncomingConnectionFactory::waitUntilHolding() const +{ + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the connection factory itself is in holding + // state. + // + while(_state < StateHolding) + { + wait(); + } + + // + // Now we wait until each connection is in holding state. + // + for_each(_connections.begin(), _connections.end(), Ice::constVoidMemFun(&Connection::waitUntilHolding)); +} + +void +IceInternal::IncomingConnectionFactory::waitUntilFinished() +{ + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + // + // First we wait until the factory is destroyed. + // + while(_acceptor) + { + wait(); + } + + // + // Now we wait for until the destruction of each connection is + // finished. + // + for_each(_connections.begin(), _connections.end(), Ice::constVoidMemFun(&Connection::waitUntilFinished)); + + // + // We're done, now we can throw away all connections. + // + _connections.clear(); } EndpointPtr @@ -283,16 +369,17 @@ IceInternal::IncomingConnectionFactory::equivalent(const EndpointPtr& endp) cons list<ConnectionPtr> IceInternal::IncomingConnectionFactory::connections() const { - ::IceUtil::Mutex::Lock sync(*this); + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + list<ConnectionPtr> result; // - // Reap destroyed connections. + // Only copy connections which have not been destroyed. // - list<ConnectionPtr>& connections = const_cast<list<ConnectionPtr>& >(_connections); - connections.erase(remove_if(connections.begin(), connections.end(), ::Ice::constMemFun(&Connection::destroyed)), - connections.end()); + remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), + ::Ice::constMemFun(&Connection::isDestroyed)); - return _connections; + return result; } bool @@ -310,7 +397,7 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&) void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - ::IceUtil::Mutex::Lock sync(*this); + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); if(_state != StateActive) { @@ -320,10 +407,11 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt } // - // Reap destroyed connections. + // Reap connections for which destruction has completed. // - _connections.erase(remove_if(_connections.begin(), _connections.end(), ::Ice::constMemFun(&Connection::destroyed)), - _connections.end()); + _connections.erase(remove_if(_connections.begin(), _connections.end(), + ::Ice::constMemFun(&Connection::isFinished)), + _connections.end()); // // Now accept a new connection. @@ -394,7 +482,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt void IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool) { - ::IceUtil::Mutex::Lock sync(*this); + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); threadPool->promoteFollower(); @@ -405,12 +493,8 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool else if(_state == StateClosed) { _acceptor->close(); - - // - // We don't need the adapter anymore after we closed the - // acceptor. - // - _adapter = 0; + _acceptor = 0; + notifyAll(); } } @@ -438,9 +522,9 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance EventHandler(instance), _endpoint(endpoint), _adapter(adapter), + _registeredWithPool(false), _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), - _state(StateHolding), - _registeredWithPool(false) + _state(StateHolding) { DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); if(defaultsAndOverrides->overrideTimeout) @@ -456,16 +540,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance ConnectionPtr connection = new Connection(_instance, _transceiver, _endpoint, _adapter); connection->validate(); _connections.push_back(connection); - - // - // We don't need an adapter anymore if we don't use an - // acceptor. - // - _adapter = 0; } else { - const_cast<AcceptorPtr&>(_acceptor) = _endpoint->acceptor(const_cast<EndpointPtr&>(_endpoint)); + _acceptor = _endpoint->acceptor(const_cast<EndpointPtr&>(_endpoint)); assert(_acceptor); _acceptor->listen(); } @@ -480,14 +558,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() { assert(_state == StateClosed); - assert(!_adapter); -} - -void -IceInternal::IncomingConnectionFactory::destroy() -{ - ::IceUtil::Mutex::Lock sync(*this); - setState(StateClosed); + assert(!_acceptor); + assert(_connections.empty()); } void @@ -542,13 +614,12 @@ IceInternal::IncomingConnectionFactory::setState(State state) for_each(_connections.begin(), _connections.end(), bind2nd(Ice::voidMemFun1(&Connection::destroy), Connection::ObjectAdapterDeactivated)); #endif - _connections.clear(); - break; } } _state = state; + notifyAll(); } void |