diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 324 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.h | 20 |
2 files changed, 187 insertions, 157 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 9c1a28ff1da..74554edddb8 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -37,150 +37,6 @@ void IceInternal::incRef(Connection* p) { p->__incRef(); } void IceInternal::decRef(Connection* p) { p->__decRef(); } void -IceInternal::Connection::activate() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - setState(StateActive); -} - -void -IceInternal::Connection::hold() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - setState(StateHolding); -} - -void -IceInternal::Connection::destroy(DestructionReason reason) -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; - } - } -} - -bool -IceInternal::Connection::isDestroyed() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - return _state >= StateClosing; -} - -bool -IceInternal::Connection::isFinished() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - return _transceiver == 0 && _dispatchCount == 0; -} - -void -IceInternal::Connection::waitUntilHolding() const -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - while(_state < StateHolding || _dispatchCount > 0) - { - wait(); - } -} - -void -IceInternal::Connection::waitUntilFinished() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - - // - // We wait indefinitely until all outstanding requests are - // completed. Otherwise we couldn't guarantee that there are no - // outstanding calls when deactivate() is called on the servant - // locators. - // - while(_dispatchCount > 0) - { - wait(); - } - - // - // Now we must wait for connection closure. If there is a timeout, - // we force the connection closure. - // - while(_transceiver) - { - if(_endpoint->timeout() >= 0) - { - if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout()))) - { - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - // No return here, we must still wait until _transceiver becomes null. - } - } - else - { - wait(); - } - } - - assert(_state == StateClosed); -} - -void -IceInternal::Connection::monitor() -{ - - IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); - if(!sync.acquired()) - { - return; - } - - if(_state != StateActive) - { - return; - } - - // - // Check for timed out async requests. - // - for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) - { - if(p->second->__timedOut()) - { - setState(StateClosed, TimeoutException(__FILE__, __LINE__)); - return; - } - } - - // - // Active connection management for idle connections. - // - // TODO: Hack: ACM for incoming connections doesn't work right - // with AMI. - // - if(_acmTimeout > 0 && closeOK() && !_adapter) - { - if(IceUtil::Time::now() >= _acmAbsoluteTimeout) - { - setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); - return; - } - } -} - -void IceInternal::Connection::validate() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -340,13 +196,166 @@ IceInternal::Connection::validate() setState(StateHolding); } +void +IceInternal::Connection::activate() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateActive); +} + +void +IceInternal::Connection::hold() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateHolding); +} + +void +IceInternal::Connection::destroy(DestructionReason reason) +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + switch(reason) + { + case ObjectAdapterDeactivated: + { + setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } + + case CommunicatorDestroyed: + { + setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } + } +} + bool IceInternal::Connection::isValidated() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + return _state > StateNotValidated; } +bool +IceInternal::Connection::isDestroyed() const +{ + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + + return _state >= StateClosing; +} + +bool +IceInternal::Connection::isFinished() const +{ + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + + // + // If _transceiver is 0, then _dispatchCount must also be 0; + // + assert(!(_transceiver == 0 && _dispatchCount != 0)); + + return _transceiver == 0; +} + +void +IceInternal::Connection::waitUntilHolding() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + while(_state < StateHolding || _dispatchCount > 0) + { + wait(); + } +} + +void +IceInternal::Connection::waitUntilFinished() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + // + // We wait indefinitely until all outstanding requests are + // completed. Otherwise we couldn't guarantee that there are no + // outstanding calls when deactivate() is called on the servant + // locators. + // + while(_dispatchCount > 0) + { + wait(); + } + + // + // Now we must wait for connection closure. If there is a timeout, + // we force the connection closure. + // + while(_transceiver) + { + if(_endpoint->timeout() >= 0) + { + if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout()))) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + // No return here, we must still wait until _transceiver becomes null. + } + } + else + { + wait(); + } + } + + assert(_state == StateClosed); +} + +void +IceInternal::Connection::monitor() +{ + + IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); + if(!sync.acquired()) + { + return; + } + + if(_state != StateActive) + { + return; + } + + // + // Check for timed out async requests. + // + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second->__timedOut()) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + return; + } + } + + // + // Active connection management for idle connections. + // + // TODO: Hack: ACM for incoming connections doesn't work right + // with AMI. + // + if(_acmTimeout > 0 && closingOK() && !_adapter) + { + if(IceUtil::Time::now() >= _acmAbsoluteTimeout) + { + setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + return; + } + } +} + void IceInternal::Connection::incProxyCount() { @@ -362,7 +371,7 @@ IceInternal::Connection::decProxyCount() assert(_proxyCount > 0); --_proxyCount; - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -749,7 +758,7 @@ IceInternal::Connection::flushBatchRequest() } } - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -1149,7 +1158,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa _asyncRequests.erase(q); } - if(_proxyCount == 0 && !_adapter && closeOK()) + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -1279,7 +1288,11 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) else if(_state == StateClosed && _transceiver) { _transceiver->close(); - _transceiver = 0; + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + _transceiver = 0; + } _threadPool = 0; // We don't need the thread pool anymore. notifyAll(); } @@ -1529,6 +1542,10 @@ IceInternal::Connection::setState(State state) { assert(!_registeredWithPool); _transceiver->close(); + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + } _transceiver = 0; } else @@ -1540,7 +1557,12 @@ IceInternal::Connection::setState(State state) } } - _state = state; + { + // See _queryMutex comment in header file. + IceUtil::Mutex::Lock sync(_queryMutex); + _state = state; + } + notifyAll(); if(_state == StateClosing && _dispatchCount == 0) @@ -1763,7 +1785,7 @@ IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& unco } bool -IceInternal::Connection::closeOK() const +IceInternal::Connection::closingOK() const { return _requests.empty() && diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h index 72834bdcbb0..2198fafbb83 100644 --- a/cpp/src/Ice/Connection.h +++ b/cpp/src/Ice/Connection.h @@ -15,6 +15,7 @@ #ifndef ICE_CONNECTION_H #define ICE_CONNECTION_H +#include <IceUtil/Mutex.h> #include <IceUtil/RecMutex.h> #include <IceUtil/Monitor.h> #include <IceUtil/Time.h> @@ -46,15 +47,17 @@ class Connection : public EventHandler, public IceUtil::Monitor<IceUtil::RecMute { public: - void activate(); - void hold(); + void validate(); enum DestructionReason { ObjectAdapterDeactivated, CommunicatorDestroyed }; + void activate(); + void hold(); void destroy(DestructionReason); + bool isValidated() const; bool isDestroyed() const; bool isFinished() const; @@ -63,9 +66,6 @@ public: void monitor(); - void validate(); - bool isValidated() const; - void incProxyCount(); void decProxyCount(); @@ -132,7 +132,7 @@ private: static void doCompress(BasicStream&, BasicStream&); static void doUncompress(BasicStream&, BasicStream&); - bool closeOK() const; + bool closingOK() const; TransceiverPtr _transceiver; const EndpointPtr _endpoint; @@ -170,6 +170,14 @@ private: int _proxyCount; // The number of proxies using this connection. State _state; + + // + // We need a special mutex for the isDestroyed() and isFinished() + // functions, because these functions must not block. Using the + // default mutex is therefore not possible, as the default mutex + // might be locked for a longer period of time. + // + IceUtil::Mutex _queryMutex; }; } |