summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-26 06:11:45 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-26 06:11:45 +0000
commit8f7c4e80f2ddcf66dda08ffbc484596ea616168c (patch)
tree7ce9ed6a51159f25c681762a1d867c7c986c2278 /cpp/src
parentfix (diff)
downloadice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.bz2
ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.xz
ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.zip
fix
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/Connection.cpp141
1 files changed, 41 insertions, 100 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index c7f331f81e1..8ee44c819f6 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -1073,100 +1073,28 @@ IceInternal::Connection::read(BasicStream& stream)
void
IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
- Byte messageType;
+ OutgoingAsyncPtr outAsync;
+ Int invoke = 0;
+ Int requestId = 0;
Byte compress;
- //
- // Read the message type, and uncompress the message if necessary.
- //
- try
{
- assert(stream.i == stream.b.end());
-
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
//
- // We don't need to check magic and version here. This has
- // already been done by the ThreadPool, which provides us the
- // stream.
+ // We must promote within the synchronization, otherwise
+ // there could be various race conditions with close
+ // connection messages and other messages.
//
-
- stream.i = stream.b.begin() + 8;
- stream.read(messageType);
- stream.read(compress);
- if(compress == 2)
- {
- BasicStream ustream(_instance.get());
- doUncompress(stream, ustream);
- stream.b.swap(ustream.b);
- }
-
- stream.i = stream.b.begin() + headerSize;
- }
- catch(const LocalException& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
threadPool->promoteFollower();
- return;
- }
-
- //
- // We need a special handling for close connection messages. If we
- // get a close connection message, we must *first* set the state
- // to closed, and *then* promote a follower thread. Otherwise we get
- // lots of bogus warnings about connections being lost.
- //
- if(messageType == closeConnectionMsg)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
if(_state == StateClosed)
{
- threadPool->promoteFollower();
return;
}
- try
- {
- traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring close connection message for datagram connection:\n" << _desc;
- }
- }
- else
- {
- setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
- }
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- }
-
- threadPool->promoteFollower();
- return;
- }
-
- OutgoingAsyncPtr outAsync;
- Int invoke = 0;
- Int requestId = 0;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(_state > StateNotValidated);
-
- if(_state == StateClosed)
- {
- threadPool->promoteFollower();
- return;
- }
-
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
@@ -1174,8 +1102,41 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
try
{
+ //
+ // We don't need to check magic and version here. This
+ // has already been done by the ThreadPool, which
+ // provides us the stream.
+ //
+ assert(stream.i == stream.b.end());
+ stream.i = stream.b.begin() + 8;
+ Byte messageType;
+ stream.read(messageType);
+ stream.read(compress);
+ if(compress == 2)
+ {
+ BasicStream ustream(_instance.get());
+ doUncompress(stream, ustream);
+ stream.b.swap(ustream.b);
+ }
+ stream.i = stream.b.begin() + headerSize;
+
switch(messageType)
{
+ case closeConnectionMsg:
+ {
+ traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint->datagram() && _warn)
+ {
+ Warning out(_logger);
+ out << "ignoring close connection message for datagram connection:\n" << _desc;
+ }
+ else
+ {
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ }
+ break;
+ }
+
case requestMsg:
{
if(_state == StateClosing)
@@ -1303,12 +1264,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
break;
}
- case closeConnectionMsg:
- {
- assert(false); // Message has special handling above.
- break;
- }
-
default:
{
traceHeader("received unknown message\n"
@@ -1322,25 +1277,11 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- threadPool->promoteFollower();
return;
}
}
//
- // For all messages other than close connection (see comment
- // above), we can promote a follower thread without holding the
- // mutex lock. However, this must be done after requests have been
- // removed from the request maps (due to reply messages, see code
- // above). Otherwise there is a race condition with a close
- // connection message that is received after reply messages. The
- // close connection message might be processed before the reply
- // messages are procsessed, meaning that requests would see a
- // close connection instead of the response they already received.
- //
- threadPool->promoteFollower();
-
- //
// Asynchronous replies must be handled outside the thread
// synchronization, so that nested calls are possible.
//