summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/Connection.java
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-26 06:11:45 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-26 06:11:45 +0000
commit8f7c4e80f2ddcf66dda08ffbc484596ea616168c (patch)
tree7ce9ed6a51159f25c681762a1d867c7c986c2278 /java/src/IceInternal/Connection.java
parentfix (diff)
downloadice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.bz2
ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.tar.xz
ice-8f7c4e80f2ddcf66dda08ffbc484596ea616168c.zip
fix
Diffstat (limited to 'java/src/IceInternal/Connection.java')
-rw-r--r--java/src/IceInternal/Connection.java142
1 files changed, 39 insertions, 103 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index 31350361a80..989307d98be 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -932,96 +932,24 @@ public final class Connection extends EventHandler
public void
message(BasicStream stream, ThreadPool threadPool)
{
- byte messageType;
- byte compress;
-
- //
- // Read the message type, and uncompress the message if
- // necessary.
- //
- try
- {
- assert(stream.pos() == stream.size());
-
- //
- // We don't need to check magic and version here. This has
- // already been done by the ThreadPool, which provides us the
- // stream.
- //
-
- stream.pos(8);
- messageType = stream.readByte();
- compress = stream.readByte();
- if(compress == (byte)2)
- {
- throw new Ice.CompressionNotSupportedException();
- }
-
- stream.pos(Protocol.headerSize);
- }
- catch(Ice.LocalException ex)
- {
- synchronized(this)
- {
- setState(StateClosed, ex);
- threadPool.promoteFollower();
- return;
- }
- }
-
- //
- // We need a special handling for close connection
- // messages. If we get a close connection message, we must
- // *first* set the state to closed, and *then* promote a
- // follower thread. Otherwise we get lots of bogus warnings
- // about connections being lost.
- //
- if(messageType == Protocol.closeConnectionMsg)
- {
- synchronized(this)
- {
- if(_state == StateClosed)
- {
- threadPool.promoteFollower();
- return;
- }
-
- try
- {
- TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
- }
- }
- else
- {
- setState(StateClosed, new Ice.CloseConnectionException());
- }
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- }
-
- threadPool.promoteFollower();
- return;
- }
- }
-
OutgoingAsync outAsync = null;
int invoke = 0;
int requestId = 0;
+ byte compress = 0;
synchronized(this)
{
+ //
+ // We must promote within the synchronization, otherwise
+ // there could be various race conditions with close
+ // connection messages and other messages.
+ //
+ threadPool.promoteFollower();
+
assert(_state > StateNotValidated);
if(_state == StateClosed)
{
- threadPool.promoteFollower();
return;
}
@@ -1030,10 +958,39 @@ public final class Connection extends EventHandler
_acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
}
- try
- {
+ try
+ {
+ //
+ // We don't need to check magic and version here. This
+ // has already been done by the ThreadPool, which
+ // provides us the stream.
+ //
+ assert(stream.pos() == stream.size());
+ stream.pos(8);
+ byte messageType = stream.readByte();
+ compress = stream.readByte();
+ if(compress == (byte)2)
+ {
+ throw new Ice.CompressionNotSupportedException();
+ }
+ stream.pos(Protocol.headerSize);
+
switch(messageType)
{
+ case Protocol.closeConnectionMsg:
+ {
+ TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint.datagram() && _warn)
+ {
+ _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
+ }
+ else
+ {
+ setState(StateClosed, new Ice.CloseConnectionException());
+ }
+ break;
+ }
+
case Protocol.requestMsg:
{
if(_state == StateClosing)
@@ -1103,12 +1060,6 @@ public final class Connection extends EventHandler
break;
}
- case Protocol.closeConnectionMsg:
- {
- assert(false); // Message has special handling above.
- break;
- }
-
default:
{
TraceUtil.traceHeader("received unknown message\n" +
@@ -1121,26 +1072,11 @@ public final class Connection extends EventHandler
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- threadPool.promoteFollower();
return;
}
}
//
- // For all messages other than close connection (see comment
- // above), we can promote a follower thread without holding
- // the mutex lock. However, this must be done after requests
- // have been removed from the request maps (due to reply
- // messages, see code above). Otherwise there is a race
- // condition with a close connection message that is received
- // after reply messages. The close connection message might be
- // processed before the reply messages are procsessed, meaning
- // that requests would see a close connection instead of the
- // response they already received.
- //
- threadPool.promoteFollower();
-
- //
// Asynchronous replies must be handled outside the thread
// synchronization, so that nested calls are possible.
//