diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-26 06:11:45 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-26 06:11:45 +0000 |
commit | 8f7c4e80f2ddcf66dda08ffbc484596ea616168c (patch) | |
tree | 7ce9ed6a51159f25c681762a1d867c7c986c2278 /cpp/src/Ice/Connection.cpp | |
parent | fix (diff) | |
download | ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.bz2 ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.xz ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.zip |
fix
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 141 |
1 files changed, 41 insertions, 100 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index c7f331f81e1..8ee44c819f6 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -1073,100 +1073,28 @@ IceInternal::Connection::read(BasicStream& stream) void IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { - Byte messageType; + OutgoingAsyncPtr outAsync; + Int invoke = 0; + Int requestId = 0; Byte compress; - // - // Read the message type, and uncompress the message if necessary. - // - try { - assert(stream.i == stream.b.end()); - + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // - // We don't need to check magic and version here. This has - // already been done by the ThreadPool, which provides us the - // stream. + // We must promote within the synchronization, otherwise + // there could be various race conditions with close + // connection messages and other messages. // - - stream.i = stream.b.begin() + 8; - stream.read(messageType); - stream.read(compress); - if(compress == 2) - { - BasicStream ustream(_instance.get()); - doUncompress(stream, ustream); - stream.b.swap(ustream.b); - } - - stream.i = stream.b.begin() + headerSize; - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); threadPool->promoteFollower(); - return; - } - - // - // We need a special handling for close connection messages. If we - // get a close connection message, we must *first* set the state - // to closed, and *then* promote a follower thread. Otherwise we get - // lots of bogus warnings about connections being lost. - // - if(messageType == closeConnectionMsg) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); if(_state == StateClosed) { - threadPool->promoteFollower(); return; } - try - { - traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "ignoring close connection message for datagram connection:\n" << _desc; - } - } - else - { - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); - } - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } - - threadPool->promoteFollower(); - return; - } - - OutgoingAsyncPtr outAsync; - Int invoke = 0; - Int requestId = 0; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(_state > StateNotValidated); - - if(_state == StateClosed) - { - threadPool->promoteFollower(); - return; - } - if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); @@ -1174,8 +1102,41 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa try { + // + // We don't need to check magic and version here. This + // has already been done by the ThreadPool, which + // provides us the stream. + // + assert(stream.i == stream.b.end()); + stream.i = stream.b.begin() + 8; + Byte messageType; + stream.read(messageType); + stream.read(compress); + if(compress == 2) + { + BasicStream ustream(_instance.get()); + doUncompress(stream, ustream); + stream.b.swap(ustream.b); + } + stream.i = stream.b.begin() + headerSize; + switch(messageType) { + case closeConnectionMsg: + { + traceHeader("received close connection", stream, _logger, _traceLevels); + if(_endpoint->datagram() && _warn) + { + Warning out(_logger); + out << "ignoring close connection message for datagram connection:\n" << _desc; + } + else + { + setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + } + break; + } + case requestMsg: { if(_state == StateClosing) @@ -1303,12 +1264,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa break; } - case closeConnectionMsg: - { - assert(false); // Message has special handling above. - break; - } - default: { traceHeader("received unknown message\n" @@ -1322,25 +1277,11 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa catch(const LocalException& ex) { setState(StateClosed, ex); - threadPool->promoteFollower(); return; } } // - // For all messages other than close connection (see comment - // above), we can promote a follower thread without holding the - // mutex lock. However, this must be done after requests have been - // removed from the request maps (due to reply messages, see code - // above). Otherwise there is a race condition with a close - // connection message that is received after reply messages. The - // close connection message might be processed before the reply - // messages are procsessed, meaning that requests would see a - // close connection instead of the response they already received. - // - threadPool->promoteFollower(); - - // // Asynchronous replies must be handled outside the thread // synchronization, so that nested calls are possible. // |