diff options
author | Marc Laukien <marc@zeroc.com> | 2004-01-24 14:39:21 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-01-24 14:39:21 +0000 |
commit | a22e2269d6b11b5f4113f5de17083ec3d77a5346 (patch) | |
tree | c4b8d01a0ececf74fb3f260d47563580eac9c568 /cpp/src/Ice/Connection.cpp | |
parent | reorganizing transform code (diff) | |
download | ice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.tar.bz2 ice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.tar.xz ice-a22e2269d6b11b5f4113f5de17083ec3d77a5346.zip |
fixes
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 156 |
1 files changed, 88 insertions, 68 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index d19493b9ec9..6a2d185c3b3 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -39,7 +39,7 @@ void IceInternal::decRef(Connection* p) { p->__decRef(); } void IceInternal::Connection::validate() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) { @@ -207,21 +207,21 @@ IceInternal::Connection::validate() void IceInternal::Connection::activate() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateActive); } void IceInternal::Connection::hold() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateHolding); } void IceInternal::Connection::destroy(DestructionReason reason) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); switch(reason) { @@ -242,28 +242,28 @@ IceInternal::Connection::destroy(DestructionReason reason) bool IceInternal::Connection::isValidated() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); return _state > StateNotValidated; } bool IceInternal::Connection::isDestroyed() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); return _state >= StateClosing; } bool IceInternal::Connection::isFinished() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); return _threadPool == 0 && _dispatchCount == 0; } void IceInternal::Connection::waitUntilHolding() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_state < StateHolding || _dispatchCount > 0) { @@ -274,7 +274,7 @@ IceInternal::Connection::waitUntilHolding() const void IceInternal::Connection::waitUntilFinished() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // // We wait indefinitely until connection closing has been @@ -337,7 +337,7 @@ void IceInternal::Connection::monitor() { - IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); if(!sync.acquired()) { return; @@ -379,7 +379,7 @@ IceInternal::Connection::monitor() void IceInternal::Connection::incProxyCount() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_proxyCount >= 0); ++_proxyCount; } @@ -387,7 +387,7 @@ IceInternal::Connection::incProxyCount() void IceInternal::Connection::decProxyCount() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_proxyCount > 0); --_proxyCount; @@ -409,7 +409,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) Int requestId; { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) { @@ -428,28 +428,30 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) return; } } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - requestId = _nextRequestId++; - if(requestId <= 0) + else { - _nextRequestId = 1; + assert(_state > StateNotValidated); + assert(_state < StateClosing); + requestId = _nextRequestId++; - } - - // - // Only add to the request map if this is a twoway call. - // - if(!_endpoint->datagram() && !oneway) - { - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Only add to the request map if this is a twoway call. + // + if(!_endpoint->datagram() && !oneway) + { + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } } @@ -526,7 +528,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); @@ -548,33 +550,49 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) assert(!_endpoint->datagram()); // Async requests are always twoway. Int requestId; + auto_ptr<LocalException> exception; { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) { - out->__finished(*_exception.get()); - return; + // + // __finished() is called outside the thread + // synchronization. See below. + // + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone())); } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - requestId = _nextRequestId++; - if(requestId <= 0) + else { - _nextRequestId = 1; + assert(_state > StateNotValidated); + assert(_state < StateClosing); + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), + pair<const Int, OutgoingAsyncPtr>(requestId, out)); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } + } - _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, OutgoingAsyncPtr>(requestId, out)); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } + // + // Exceptions for asynchronous messages must be handled outside + // the thread synchronization, so that nested calls are possible. + // + if(exception.get()) + { + out->__finished(*_exception.get()); + return; } try @@ -647,7 +665,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); } @@ -727,7 +745,7 @@ void IceInternal::Connection::flushBatchRequest() { { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // // Wait if flushing is currently in progress. @@ -836,7 +854,7 @@ IceInternal::Connection::flushBatchRequest() } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); @@ -858,7 +876,7 @@ IceInternal::Connection::flushBatchRequest() } { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // // Reset the batch stream, and notify that flushing is over. @@ -938,12 +956,12 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); } { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); try { @@ -972,7 +990,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) void IceInternal::Connection::sendNoResponse() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); try { @@ -1009,7 +1027,7 @@ IceInternal::Connection::endpoint() const void IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // // We never change the thread pool with which we were initially @@ -1030,7 +1048,7 @@ IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) ObjectAdapterPtr IceInternal::Connection::getAdapter() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); return _adapter; } @@ -1091,7 +1109,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); threadPool->promoteFollower(); setState(StateClosed, ex); return; @@ -1105,7 +1123,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa // if(messageType == closeConnectionMsg) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state == StateClosed) { @@ -1150,7 +1168,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa Int requestId = 0; { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state == StateClosed) { @@ -1377,7 +1395,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); } } @@ -1393,13 +1411,13 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) map<Int, OutgoingAsyncPtr> asyncRequests; { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state == StateActive || _state == StateClosing) { registerWithPool(); } - else if(_state == StateClosed && _transceiver) + else if(_state == StateClosed) { try { @@ -1410,6 +1428,7 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); } + assert(_threadPool); _threadPool = 0; // We don't need the thread pool anymore. notifyAll(); } @@ -1443,14 +1462,14 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) void IceInternal::Connection::exception(const LocalException& ex) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); } string IceInternal::Connection::toString() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); return _transceiver->toString(); } @@ -1714,6 +1733,7 @@ IceInternal::Connection::setState(State state) // Here we ignore any exceptions in close(). } + assert(_threadPool); _threadPool = 0; // We don't need the thread pool anymore. } else |