diff options
author | Marc Laukien <marc@zeroc.com> | 2003-10-09 19:03:41 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-10-09 19:03:41 +0000 |
commit | a771e0d874afb1672110f66864ec64ef1ba69fa5 (patch) | |
tree | e16ff207cb6cd8844f3aaecbe5ebf862e14fad5a /cpp/src/Ice/Connection.cpp | |
parent | fixed connection establishment deadlock (diff) | |
download | ice-a771e0d874afb1672110f66864ec64ef1ba69fa5.tar.bz2 ice-a771e0d874afb1672110f66864ec64ef1ba69fa5.tar.xz ice-a771e0d874afb1672110f66864ec64ef1ba69fa5.zip |
fixed connection establishment deadlock
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 324 |
1 files changed, 173 insertions, 151 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 9c1a28ff1da..74554edddb8 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -37,150 +37,6 @@ void IceInternal::incRef(Connection* p) { p->__incRef(); } void IceInternal::decRef(Connection* p) { p->__decRef(); } void -IceInternal::Connection::activate() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - setState(StateActive); -} - -void -IceInternal::Connection::hold() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - setState(StateHolding); -} - -void -IceInternal::Connection::destroy(DestructionReason reason) -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; - } - } -} - -bool -IceInternal::Connection::isDestroyed() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - return _state >= StateClosing; -} - -bool -IceInternal::Connection::isFinished() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - return _transceiver == 0 && _dispatchCount == 0; -} - -void -IceInternal::Connection::waitUntilHolding() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - while(_state < StateHolding || _dispatchCount > 0) - { - wait(); - } -} - -void -IceInternal::Connection::waitUntilFinished() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - // - // We wait indefinitely until all outstanding requests are - // completed. Otherwise we couldn't guarantee that there are no - // outstanding calls when deactivate() is called on the servant - // locators. - // - while(_dispatchCount > 0) - { - wait(); - } - - // - // Now we must wait for connection closure. If there is a timeout, - // we force the connection closure. - // - while(_transceiver) - { - if(_endpoint->timeout() >= 0) - { - if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout()))) - { - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - // No return here, we must still wait until _transceiver becomes null. - } - } - else - { - wait(); - } - } - - assert(_state == StateClosed); -} - -void -IceInternal::Connection::monitor() -{ - - IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); - if(!sync.acquired()) - { - return; - } - - if(_state != StateActive) - { - return; - } - - // - // Check for timed out async requests. - // - for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) - { - if(p->second->__timedOut()) - { - setState(StateClosed, TimeoutException(__FILE__, __LINE__)); - return; - } - } - - // - // Active connection management for idle connections. - // - // TODO: Hack: ACM for incoming connections doesn't work right - // with AMI. - // - if(_acmTimeout > 0 && closeOK() && !_adapter) - { - if(IceUtil::Time::now() >= _acmAbsoluteTimeout) - { - setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); - return; - } - } -} - -void IceInternal::Connection::validate() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -340,13 +196,166 @@ IceInternal::Connection::validate() setState(StateHolding); } +void +IceInternal::Connection::activate() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateActive); +} + +void +IceInternal::Connection::hold() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateHolding); +} + +void +IceInternal::Connection::destroy(DestructionReason reason) +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + switch(reason) + { + case ObjectAdapterDeactivated: + { + setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } + + case CommunicatorDestroyed: + { + setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } + } +} + bool IceInternal::Connection::isValidated() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + return _state > StateNotValidated; } +bool +IceInternal::Connection::isDestroyed() const +{ + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + + return _state >= StateClosing; +} + +bool +IceInternal::Connection::isFinished() const +{ + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + + // + // If _transceiver is 0, then _dispatchCount must also be 0; + // + assert(!(_transceiver == 0 && _dispatchCount != 0)); + + return _transceiver == 0; +} + +void +IceInternal::Connection::waitUntilHolding() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + while(_state < StateHolding || _dispatchCount > 0) + { + wait(); + } +} + +void +IceInternal::Connection::waitUntilFinished() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + // + // We wait indefinitely until all outstanding requests are + // completed. Otherwise we couldn't guarantee that there are no + // outstanding calls when deactivate() is called on the servant + // locators. + // + while(_dispatchCount > 0) + { + wait(); + } + + // + // Now we must wait for connection closure. If there is a timeout, + // we force the connection closure. + // + while(_transceiver) + { + if(_endpoint->timeout() >= 0) + { + if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout()))) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + // No return here, we must still wait until _transceiver becomes null. + } + } + else + { + wait(); + } + } + + assert(_state == StateClosed); +} + +void +IceInternal::Connection::monitor() +{ + + IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); + if(!sync.acquired()) + { + return; + } + + if(_state != StateActive) + { + return; + } + + // + // Check for timed out async requests. + // + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second->__timedOut()) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + return; + } + } + + // + // Active connection management for idle connections. + // + // TODO: Hack: ACM for incoming connections doesn't work right + // with AMI. + // + if(_acmTimeout > 0 && closingOK() && !_adapter) + { + if(IceUtil::Time::now() >= _acmAbsoluteTimeout) + { + setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + return; + } + } +} + void IceInternal::Connection::incProxyCount() { @@ -362,7 +371,7 @@ IceInternal::Connection::decProxyCount() assert(_proxyCount > 0); --_proxyCount; - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -749,7 +758,7 @@ IceInternal::Connection::flushBatchRequest() } } - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -1149,7 +1158,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa _asyncRequests.erase(q); } - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -1279,7 +1288,11 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) else if(_state == StateClosed && _transceiver) { _transceiver->close(); - _transceiver = 0; + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + _transceiver = 0; + } _threadPool = 0; // We don't need the thread pool anymore. notifyAll(); } @@ -1529,6 +1542,10 @@ IceInternal::Connection::setState(State state) { assert(!_registeredWithPool); _transceiver->close(); + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + } _transceiver = 0; } else @@ -1540,7 +1557,12 @@ IceInternal::Connection::setState(State state) } } - _state = state; + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + _state = state; + } + notifyAll(); if(_state == StateClosing && _dispatchCount == 0) @@ -1763,7 +1785,7 @@ IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& unco } bool -IceInternal::Connection::closeOK() const +IceInternal::Connection::closingOK() const { return _requests.empty() && |