diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 97 |
1 files changed, 67 insertions, 30 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 103296f1fe3..98cd946cf83 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -287,16 +287,17 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpts connection = new ConnectionI(_instance, transceiver, endpoint, 0); // - // In thread per acceptor mode, the thread per connection (in - // ConnectionI) will take care of connection validation. + // In thread per connection mode, the thread per + // connection (in ConnectionI) will take care of + // connection validation. // - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection()) { - connection->validate(); + connection->waitUntilValidated(); } else { - connection->waitUntilValidated(); + connection->validate(); } if(_instance->defaultsAndOverrides()->overrideCompress) @@ -542,6 +543,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const void IceInternal::IncomingConnectionFactory::waitUntilFinished() { + IceUtil::ThreadPtr threadPerAcceptor; list<ConnectionIPtr> connections; { @@ -555,6 +557,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() wait(); } + assert(_state == StateClosed); + + threadPerAcceptor = _threadPerAcceptor; + _threadPerAcceptor = 0; + // // We want to wait until all connections are finished outside the // thread synchronization. @@ -562,6 +569,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() connections.swap(_connections); } + if(threadPerAcceptor) + { + threadPerAcceptor->getThreadControl().join(); + } + for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); } @@ -722,22 +734,24 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt } return; } - + + assert(transceiver); + // // Create a connection object for the connection. // - assert(transceiver); connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); _connections.push_back(connection); } - + + assert(connection); + // // We validate outside the thread synchronization, to not block // the factory. // try { - assert(connection); connection->validate(); } catch(const LocalException&) @@ -819,18 +833,18 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance if(_transceiver) { ConnectionIPtr connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); - + // - // In thread per acceptor mode, the thread per connection (in - // ConnectionI) will take care of connection validation. + // In thread per connection mode, the thread per connection + // (in ConnectionI) will take care of connection validation. // - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection()) { - connection->validate(); + connection->waitUntilValidated(); } else { - connection->waitUntilValidated(); + connection->validate(); } _connections.push_back(connection); @@ -858,18 +872,20 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance // thread per acceptor, that accepts new connections on this // endpoint. // - IceUtil::ThreadPtr thread = new ThreadPerAcceptor(this); - thread->start(); - thread->getThreadControl().detach(); + _threadPerAcceptor = new ThreadPerAcceptor(this); + _threadPerAcceptor->start(); } } } IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed); assert(!_acceptor); assert(_connections.empty()); + assert(!_threadPerAcceptor); } void @@ -914,10 +930,11 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_instance->threadPerConnection()) { - // TODO: This is not correct! We cannot close the - // acceptor before our thread per acceptor terminates. - _acceptor->close(); - _acceptor = 0; + // + // Connect to our own acceptor, which unblocks our + // thread per acceptor stuch in accept(). + // + _acceptor->connectToSelf(); } else { @@ -994,12 +1011,10 @@ IceInternal::IncomingConnectionFactory::run() catch(const SocketException&) { // Ignore socket exceptions. - continue; } catch(const TimeoutException&) { // Ignore timeouts. - continue; } catch(const LocalException& ex) { @@ -1009,11 +1024,10 @@ IceInternal::IncomingConnectionFactory::run() Warning out(_instance->logger()); out << "connection exception:\n" << ex << '\n' << acceptor->toString(); } - continue; } ConnectionIPtr connection; - + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1024,9 +1038,32 @@ IceInternal::IncomingConnectionFactory::run() if(_state == StateClosed) { - assert(transceiver); - transceiver->close(); - break; + if(transceiver) + { + try + { + transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor->close(); + } + catch(const LocalException& ex) + { + _acceptor = 0; + notifyAll(); + ex.ice_throw(); + } + + _acceptor = 0; + notifyAll(); + return; } assert(_state == StateActive); @@ -1037,7 +1074,7 @@ IceInternal::IncomingConnectionFactory::run() _connections.erase(remove_if(_connections.begin(), _connections.end(), Ice::constMemFun(&ConnectionI::isFinished)), _connections.end()); - + // // Create a connection object for the connection. // |