summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/Connection.java160
-rw-r--r--java/src/IceInternal/ThreadPool.java38
2 files changed, 127 insertions, 71 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index c0344de79b1..9d7bc899cd1 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -837,19 +837,101 @@ public final class Connection extends EventHandler
public void
message(BasicStream stream, ThreadPool threadPool)
{
- OutgoingAsync outAsync = null;
+ byte messageType;
+ byte compress;
+
+ //
+ // Read the message type, and uncompress the message if
+ // necessary.
+ //
+ try
+ {
+ assert(stream.pos() == stream.size());
+
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPool, which provides us the
+ // stream.
+ //
+
+ stream.pos(8);
+ messageType = stream.readByte();
+ compress = stream.readByte();
+ if(compress == (byte)2)
+ {
+ throw new Ice.CompressionNotSupportedException();
+ }
+
+ stream.pos(Protocol.headerSize);
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
+ {
+ threadPool.promoteFollower();
+ setState(StateClosed, ex);
+ return;
+ }
+ }
+ //
+ // We need a special handling for close connection
+ // messages. If we get a close connection message, we must
+ // *first* set the state to closed, and *then* promote a
+ // follower thread. Otherwise we get lots of bogus warnings
+ // about connections being lost.
+ //
+ if(messageType == Protocol.closeConnectionMsg)
+ {
+ synchronized(this)
+ {
+ if(_state == StateClosed)
+ {
+ threadPool.promoteFollower();
+ return;
+ }
+
+ try
+ {
+ TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ _logger.warning("ignoring close connection message for datagram connection:\n" +
+ _transceiver.toString());
+ }
+ }
+ else
+ {
+ setState(StateClosed, new Ice.CloseConnectionException());
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+
+ threadPool.promoteFollower();
+ return;
+ }
+ }
+
+ //
+ // For all other messages, we can promote a follower right
+ // away, without setting the state first, or holding the mutex
+ // lock.
+ //
+ threadPool.promoteFollower();
+
+ OutgoingAsync outAsync = null;
int invoke = 0;
int requestId = 0;
- byte compress = 0;
synchronized(this)
{
- threadPool.promoteFollower();
-
if(_state == StateClosed)
{
- Thread.yield();
return;
}
@@ -860,56 +942,6 @@ public final class Connection extends EventHandler
try
{
- assert(stream.pos() == stream.size());
- stream.pos(0);
-
- byte[] m = new byte[4];
- m[0] = stream.readByte();
- m[1] = stream.readByte();
- m[2] = stream.readByte();
- m[3] = stream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
- || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
-
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
- {
- Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = Protocol.protocolMajor;
- e.minor = Protocol.protocolMinor;
- throw e;
- }
-
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
- {
- Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = Protocol.encodingMajor;
- e.minor = Protocol.encodingMinor;
- throw e;
- }
-
- byte messageType = stream.readByte();
- compress = stream.readByte();
-
- if(compress == (byte)2)
- {
- throw new Ice.CompressionNotSupportedException();
- }
-
- stream.pos(Protocol.headerSize);
-
switch(messageType)
{
case Protocol.requestMsg:
@@ -989,19 +1021,7 @@ public final class Connection extends EventHandler
case Protocol.closeConnectionMsg:
{
- TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("ignoring close connection message for datagram connection:\n" +
- _transceiver.toString());
- }
- }
- else
- {
- throw new Ice.CloseConnectionException();
- }
+ assert(false); // Message has special handling above.
break;
}
@@ -1110,6 +1130,8 @@ public final class Connection extends EventHandler
public void
finished(ThreadPool threadPool)
{
+ threadPool.promoteFollower();
+
Ice.LocalException closeException = null;
IntMap requests = null;
@@ -1117,8 +1139,6 @@ public final class Connection extends EventHandler
synchronized(this)
{
- threadPool.promoteFollower();
-
if(_state == StateActive || _state == StateClosing)
{
registerWithPool();
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 813c6ac8816..0256c015b0f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -627,6 +627,12 @@ public final class ThreadPool
promoteFollower();
factory.shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
}
else
{
@@ -650,6 +656,13 @@ public final class ThreadPool
sw.toString() + "\n" + handler.toString();
_instance.logger().error(s);
}
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note
+ // that this means that finished() must call
+ // promoteFollower().
+ //
}
else
{
@@ -690,7 +703,30 @@ public final class ThreadPool
assert(stream.pos() == stream.size());
}
- handler.message(stream, this);
+ //
+ // Provide a new mesage to the handler.
+ //
+ try
+ {
+ handler.message(stream, this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling finished():\n" +
+ sw.toString() + "\n" + handler.toString();
+ _instance.logger().error(s);
+ }
+
+ //
+ // No "continue", because we want message() to
+ // be called in its own thread from this
+ // pool. Note that this means that message()
+ // must call promoteFollower().
+ //
}
finally
{