diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/Connection.java | 160 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 38 |
2 files changed, 127 insertions, 71 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index c0344de79b1..9d7bc899cd1 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -837,19 +837,101 @@ public final class Connection extends EventHandler public void message(BasicStream stream, ThreadPool threadPool) { - OutgoingAsync outAsync = null; + byte messageType; + byte compress; + + // + // Read the message type, and uncompress the message if + // necessary. + // + try + { + assert(stream.pos() == stream.size()); + + // + // We don't need to check magic and version here. This has + // already been done by the ThreadPool, which provides us the + // stream. + // + + stream.pos(8); + messageType = stream.readByte(); + compress = stream.readByte(); + if(compress == (byte)2) + { + throw new Ice.CompressionNotSupportedException(); + } + + stream.pos(Protocol.headerSize); + } + catch(Ice.LocalException ex) + { + synchronized(this) + { + threadPool.promoteFollower(); + setState(StateClosed, ex); + return; + } + } + // + // We need a special handling for close connection + // messages. If we get a close connection message, we must + // *first* set the state to closed, and *then* promote a + // follower thread. Otherwise we get lots of bogus warnings + // about connections being lost. + // + if(messageType == Protocol.closeConnectionMsg) + { + synchronized(this) + { + if(_state == StateClosed) + { + threadPool.promoteFollower(); + return; + } + + try + { + TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning("ignoring close connection message for datagram connection:\n" + + _transceiver.toString()); + } + } + else + { + setState(StateClosed, new Ice.CloseConnectionException()); + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } + + threadPool.promoteFollower(); + return; + } + } + + // + // For all other messages, we can promote a follower right + // away, without setting the state first, or holding the mutex + // lock. + // + threadPool.promoteFollower(); + + OutgoingAsync outAsync = null; int invoke = 0; int requestId = 0; - byte compress = 0; synchronized(this) { - threadPool.promoteFollower(); - if(_state == StateClosed) { - Thread.yield(); return; } @@ -860,56 +942,6 @@ public final class Connection extends EventHandler try { - assert(stream.pos() == stream.size()); - stream.pos(0); - - byte[] m = new byte[4]; - m[0] = stream.readByte(); - m[1] = stream.readByte(); - m[2] = stream.readByte(); - m[3] = stream.readByte(); - if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] - || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) - { - Ice.BadMagicException ex = new Ice.BadMagicException(); - ex.badMagic = m; - throw ex; - } - - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor) - { - Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = Protocol.protocolMajor; - e.minor = Protocol.protocolMinor; - throw e; - } - - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor) - { - Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = Protocol.encodingMajor; - e.minor = Protocol.encodingMinor; - throw e; - } - - byte messageType = stream.readByte(); - compress = stream.readByte(); - - if(compress == (byte)2) - { - throw new Ice.CompressionNotSupportedException(); - } - - stream.pos(Protocol.headerSize); - switch(messageType) { case Protocol.requestMsg: @@ -989,19 +1021,7 @@ public final class Connection extends EventHandler case Protocol.closeConnectionMsg: { - TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("ignoring close connection message for datagram connection:\n" + - _transceiver.toString()); - } - } - else - { - throw new Ice.CloseConnectionException(); - } + assert(false); // Message has special handling above. break; } @@ -1110,6 +1130,8 @@ public final class Connection extends EventHandler public void finished(ThreadPool threadPool) { + threadPool.promoteFollower(); + Ice.LocalException closeException = null; IntMap requests = null; @@ -1117,8 +1139,6 @@ public final class Connection extends EventHandler synchronized(this) { - threadPool.promoteFollower(); - if(_state == StateActive || _state == StateClosing) { registerWithPool(); diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 813c6ac8816..0256c015b0f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -627,6 +627,12 @@ public final class ThreadPool promoteFollower(); factory.shutdown(); + + // + // No "continue", because we want shutdown to be done in + // its own thread from this pool. Therefore we called + // promoteFollower(). + // } else { @@ -650,6 +656,13 @@ public final class ThreadPool sw.toString() + "\n" + handler.toString(); _instance.logger().error(s); } + + // + // No "continue", because we want finished() to be + // called in its own thread from this pool. Note + // that this means that finished() must call + // promoteFollower(). + // } else { @@ -690,7 +703,30 @@ public final class ThreadPool assert(stream.pos() == stream.size()); } - handler.message(stream, this); + // + // Provide a new mesage to the handler. + // + try + { + handler.message(stream, this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "' while calling finished():\n" + + sw.toString() + "\n" + handler.toString(); + _instance.logger().error(s); + } + + // + // No "continue", because we want message() to + // be called in its own thread from this + // pool. Note that this means that message() + // must call promoteFollower(). + // } finally { |