diff options
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 46 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 11 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 12 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 1 | ||||
-rw-r--r-- | cpp/test/Ice/hold/AllTests.cpp | 13 | ||||
-rw-r--r-- | cpp/test/Ice/hold/Client.cpp | 5 | ||||
-rw-r--r-- | cpp/test/Ice/hold/TestI.cpp | 12 |
7 files changed, 58 insertions, 42 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index b720f2b274c..6326501067f 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -789,11 +789,9 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool threadPool->promoteFollower(); - if(_state == StateActive) - { - registerWithPool(); - } - else if(_state == StateClosed) + --_finishedCount; + + if(_finishedCount == 0 && _state == StateClosed) { _acceptor->close(); _acceptor = 0; @@ -828,6 +826,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _endpoint(endpoint), _adapter(adapter), _registeredWithPool(false), + _finishedCount(0), _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), _state(StateHolding) { @@ -938,7 +937,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_instance->threadPerConnection() && _acceptor) { registerWithPool(); } @@ -952,7 +951,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_instance->threadPerConnection() && _acceptor) { unregisterWithPool(); } @@ -962,27 +961,23 @@ IceInternal::IncomingConnectionFactory::setState(State state) case StateClosed: { - if(_instance->threadPerConnection()) + if(_instance->threadPerConnection() && _acceptor) { - if(_acceptor) - { - // - // Connect to our own acceptor, which unblocks our - // thread per incoming connection factory stuck in accept(). - // - _acceptor->connectToSelf(); - } + // + // If we are in thread per connection mode, we connect + // to our own acceptor, which unblocks our thread per + // incoming connection factory stuck in accept(). + // + _acceptor->connectToSelf(); } else { // - // If we come from holding state, we first need to - // register again before we unregister. + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. // - if(_state == StateHolding) - { - registerWithPool(); - } + registerWithPool(); unregisterWithPool(); } @@ -1006,8 +1001,9 @@ void IceInternal::IncomingConnectionFactory::registerWithPool() { assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(_acceptor); // Not for datagram connections. - if(_acceptor && !_registeredWithPool) + if(!_registeredWithPool) { dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this); _registeredWithPool = true; @@ -1018,11 +1014,13 @@ void IceInternal::IncomingConnectionFactory::unregisterWithPool() { assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(_acceptor); // Not for datagram connections. - if(_acceptor && _registeredWithPool) + if(_registeredWithPool) { dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd()); _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. } } diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 9beccb40c31..f2645215edb 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -46,7 +46,7 @@ public: Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool&); void setRouterInfo(const RouterInfoPtr&); - void removeAdapter(const ::Ice::ObjectAdapterPtr&); + void removeAdapter(const Ice::ObjectAdapterPtr&); void flushBatchRequests(); private: @@ -85,14 +85,14 @@ public: virtual void read(BasicStream&); virtual void message(BasicStream&, const ThreadPoolPtr&); virtual void finished(const ThreadPoolPtr&); - virtual void exception(const ::Ice::LocalException&); + virtual void exception(const Ice::LocalException&); virtual std::string toString() const; private: - IncomingConnectionFactory(const InstancePtr&, const EndpointIPtr&, const ::Ice::ObjectAdapterPtr&); + IncomingConnectionFactory(const InstancePtr&, const EndpointIPtr&, const Ice::ObjectAdapterPtr&); virtual ~IncomingConnectionFactory(); - friend class ::Ice::ObjectAdapterI; + friend class Ice::ObjectAdapterI; enum State { @@ -125,9 +125,10 @@ private: const TransceiverPtr _transceiver; const EndpointIPtr _endpoint; - const ::Ice::ObjectAdapterPtr _adapter; + const Ice::ObjectAdapterPtr _adapter; bool _registeredWithPool; + int _finishedCount; const bool _warn; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 0e3bf717d02..69e0f7a672a 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -292,7 +292,7 @@ Ice::ConnectionI::isFinished() const return false; } - if(_transceiver != 0 || _dispatchCount != 0 || + if(_transceiver || _dispatchCount != 0 || (_threadPerConnection && _threadPerConnection->getThreadControl().isAlive())) { return false; @@ -1295,11 +1295,9 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state == StateActive || _state == StateClosing) - { - registerWithPool(); - } - else if(_state == StateClosed) + --_finishedCount; + + if(_finishedCount == 0 && _state == StateClosed) { // // We must make sure that nobody is sending when we close @@ -1384,6 +1382,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _logger(_instance->logger()), // Cached for better performance. _traceLevels(_instance->traceLevels()), // Cached for better performance. _registeredWithPool(false), + _finishedCount(0), _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), _acmTimeout(0), _requestHdr(headerSize + sizeof(Int), 0), @@ -1838,6 +1837,7 @@ Ice::ConnectionI::unregisterWithPool() { _threadPool->unregister(_transceiver->fd()); _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. } } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 046eb3404a6..8717fea2dd9 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -155,6 +155,7 @@ private: const IceInternal::TraceLevelsPtr _traceLevels; bool _registeredWithPool; + int _finishedCount; const IceInternal::ThreadPoolPtr _threadPool; const bool _warn; diff --git a/cpp/test/Ice/hold/AllTests.cpp b/cpp/test/Ice/hold/AllTests.cpp index 9a19d99a352..47aa8394274 100644 --- a/cpp/test/Ice/hold/AllTests.cpp +++ b/cpp/test/Ice/hold/AllTests.cpp @@ -14,7 +14,7 @@ using namespace std; using namespace Test; -HoldPrx +void allTests(const Ice::CommunicatorPtr& communicator) { cout << "testing stringToProxy... " << flush; @@ -29,5 +29,14 @@ allTests(const Ice::CommunicatorPtr& communicator) test(hold == base); cout << "ok" << endl; - return hold; + cout << "changing state between active and hold rapidly... "; + for(int i = 0; i < 100; ++i) + { + hold->putOnHold(0); + } + cout << "ok" << endl; + + cout << "changing state to hold and shutting down server... "; + hold->shutdown(); + cout << "ok" << endl; } diff --git a/cpp/test/Ice/hold/Client.cpp b/cpp/test/Ice/hold/Client.cpp index a9d5fb23ab9..3517d43ca34 100644 --- a/cpp/test/Ice/hold/Client.cpp +++ b/cpp/test/Ice/hold/Client.cpp @@ -17,9 +17,8 @@ using namespace Test; int run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) { - HoldPrx allTests(const Ice::CommunicatorPtr&); - HoldPrx hold = allTests(communicator); - hold->shutdown(); + void allTests(const Ice::CommunicatorPtr&); + allTests(communicator); return EXIT_SUCCESS; } diff --git a/cpp/test/Ice/hold/TestI.cpp b/cpp/test/Ice/hold/TestI.cpp index 00d3de083d9..964a05b7ff7 100644 --- a/cpp/test/Ice/hold/TestI.cpp +++ b/cpp/test/Ice/hold/TestI.cpp @@ -13,12 +13,20 @@ void HoldI::putOnHold(Ice::Int seconds, const Ice::Current& current) { - current.adapter->hold(); - current.adapter->activate(); + if(seconds <= 0) + { + current.adapter->hold(); + current.adapter->activate(); + } + else + { + assert(false); // TODO + } } void HoldI::shutdown(const Ice::Current& current) { + current.adapter->hold(); current.adapter->getCommunicator()->shutdown(); } |