summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java93
1 files changed, 51 insertions, 42 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 00645ce8dc9..5cc8188ef09 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -953,7 +953,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
StartCallback startCB = null;
java.util.List<OutgoingMessage> sentCBs = null;
MessageInfo info = null;
-
+ int dispatchCount = 0;
+
synchronized(this)
{
if(_state >= StateClosed)
@@ -1114,7 +1115,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_startCallback = null;
if(startCB != null)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1131,6 +1132,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Optimization: use the thread's stream.
info = new MessageInfo(current.stream);
newOp |= parseMessage(info);
+ dispatchCount += info.messageDispatchCount;
}
if((readyOp & IceInternal.SocketOperation.Write) != 0)
@@ -1139,7 +1141,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
newOp |= sendNextMessage(sentCBs);
if(!sentCBs.isEmpty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
else
{
@@ -1155,9 +1157,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(readyOp == 0)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ current.ioCompleted();
}
catch(DatagramLimitException ex) // Expected.
{
@@ -1194,13 +1210,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
return;
}
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- current.ioCompleted();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no
@@ -1239,7 +1248,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1248,7 +1257,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(startCB != null)
{
startCB.connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1260,7 +1269,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
msg.outAsync.invokeSent();
}
- ++count;
+ ++dispatchedCount;
}
if(info != null)
@@ -1271,8 +1280,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(info.outAsync != null)
{
- info.outAsync.finished(info.stream);
- ++count;
+ info.outAsync.invokeCompleted();
+ ++dispatchedCount;
}
if(info.heartbeatCallback != null)
@@ -1285,7 +1294,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1298,7 +1307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
//
- // Don't increase count, the dispatch count is
+ // Don't increase dispatchedCount, the dispatch count is
// decreased when the incoming reply is sent.
//
}
@@ -1307,11 +1316,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
synchronized(this)
{
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2194,28 +2203,26 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishWrite(_writeStream.getBuffer());
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we
+ // schedule the close timeout to wait for the peer to close the
+ // connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ }
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- return IceInternal.SocketOperation.None;
}
-
- //
- // If all the messages were sent and we are in the closing state, we
- // schedule the close timeout to wait for the peer to close the
- // connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- return op;
- }
- }
-
return IceInternal.SocketOperation.None;
}
@@ -2362,6 +2369,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ObjectAdapter adapter;
IceInternal.OutgoingAsync outAsync;
ConnectionCallback heartbeatCallback;
+ int messageDispatchCount;
}
private int parseMessage(MessageInfo info)
@@ -2453,7 +2461,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
info.invokeNum = 1;
info.servantManager = _servantManager;
info.adapter = _adapter;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
@@ -2477,7 +2485,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
info.servantManager = _servantManager;
info.adapter = _adapter;
- _dispatchCount += info.invokeNum;
+ info.messageDispatchCount += info.invokeNum;
}
break;
}
@@ -2486,10 +2494,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- info.outAsync = _asyncRequests.remove(info.requestId);
- if(info.outAsync != null)
+ IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
+ if(outAsync != null && outAsync.finished(info.stream))
{
- ++_dispatchCount;
+ info.outAsync = outAsync;
+ ++info.messageDispatchCount;
}
notifyAll(); // Notify threads blocked in close(false)
break;
@@ -2501,7 +2510,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_callback != null)
{
info.heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}