summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-09-05 13:17:45 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-09-05 13:17:45 +0200
commit65d91832bd0f6bf55bfefd1244582cec2e5139dc (patch)
treee8aa359587a3605a5c6fa79f0842321554449c0b /java
parentJS minor fix, remove unused variables (diff)
downloadice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.bz2
ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.xz
ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.zip
Added back to optmization to not call connection dispatch if not necessary
Diffstat (limited to 'java')
-rw-r--r--java/src/Ice/ConnectionI.java93
-rw-r--r--java/src/IceInternal/AsyncResultI.java2
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java17
-rw-r--r--java/src/IceInternal/OutgoingAsync.java105
4 files changed, 125 insertions, 92 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 00645ce8dc9..5cc8188ef09 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -953,7 +953,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
StartCallback startCB = null;
java.util.List<OutgoingMessage> sentCBs = null;
MessageInfo info = null;
-
+ int dispatchCount = 0;
+
synchronized(this)
{
if(_state >= StateClosed)
@@ -1114,7 +1115,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_startCallback = null;
if(startCB != null)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1131,6 +1132,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Optimization: use the thread's stream.
info = new MessageInfo(current.stream);
newOp |= parseMessage(info);
+ dispatchCount += info.messageDispatchCount;
}
if((readyOp & IceInternal.SocketOperation.Write) != 0)
@@ -1139,7 +1141,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
newOp |= sendNextMessage(sentCBs);
if(!sentCBs.isEmpty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
else
{
@@ -1155,9 +1157,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(readyOp == 0)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ current.ioCompleted();
}
catch(DatagramLimitException ex) // Expected.
{
@@ -1194,13 +1210,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
return;
}
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- current.ioCompleted();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no
@@ -1239,7 +1248,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1248,7 +1257,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(startCB != null)
{
startCB.connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1260,7 +1269,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
msg.outAsync.invokeSent();
}
- ++count;
+ ++dispatchedCount;
}
if(info != null)
@@ -1271,8 +1280,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(info.outAsync != null)
{
- info.outAsync.finished(info.stream);
- ++count;
+ info.outAsync.invokeCompleted();
+ ++dispatchedCount;
}
if(info.heartbeatCallback != null)
@@ -1285,7 +1294,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1298,7 +1307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
//
- // Don't increase count, the dispatch count is
+ // Don't increase dispatchedCount, the dispatch count is
// decreased when the incoming reply is sent.
//
}
@@ -1307,11 +1316,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
synchronized(this)
{
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2194,28 +2203,26 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishWrite(_writeStream.getBuffer());
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we
+ // schedule the close timeout to wait for the peer to close the
+ // connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ }
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- return IceInternal.SocketOperation.None;
}
-
- //
- // If all the messages were sent and we are in the closing state, we
- // schedule the close timeout to wait for the peer to close the
- // connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- return op;
- }
- }
-
return IceInternal.SocketOperation.None;
}
@@ -2362,6 +2369,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ObjectAdapter adapter;
IceInternal.OutgoingAsync outAsync;
ConnectionCallback heartbeatCallback;
+ int messageDispatchCount;
}
private int parseMessage(MessageInfo info)
@@ -2453,7 +2461,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
info.invokeNum = 1;
info.servantManager = _servantManager;
info.adapter = _adapter;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
@@ -2477,7 +2485,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
info.servantManager = _servantManager;
info.adapter = _adapter;
- _dispatchCount += info.invokeNum;
+ info.messageDispatchCount += info.invokeNum;
}
break;
}
@@ -2486,10 +2494,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- info.outAsync = _asyncRequests.remove(info.requestId);
- if(info.outAsync != null)
+ IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
+ if(outAsync != null && outAsync.finished(info.stream))
{
- ++_dispatchCount;
+ info.outAsync = outAsync;
+ ++info.messageDispatchCount;
}
notifyAll(); // Notify threads blocked in close(false)
break;
@@ -2501,7 +2510,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_callback != null)
{
info.heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
index 9b211f0bb72..e2ca0b7c466 100644
--- a/java/src/IceInternal/AsyncResultI.java
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -456,7 +456,7 @@ public class AsyncResultI implements Ice.AsyncResult
{
}
- protected final void invokeCompleted()
+ public final void invokeCompleted()
{
//
// Note: no need to change the _state here, specializations are responsible for
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 1d97dc9a8d6..3c94e5d93a8 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -227,16 +227,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
TraceUtil.traceRecv(os, _logger, _traceLevels);
}
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
- if(outAsync != null)
+ if(outAsync != null && outAsync.finished(os))
{
- outAsync.finished(os);
+ outAsync.invokeCompleted();
}
_adapter.decDirectCount();
}
@@ -500,13 +496,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
-
if(outAsync != null)
{
outAsync.finished(ex);
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 0618ffb93e9..84bfc477768 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
//
try
{
- if(!handleException(exc))
- {
- return; // Can't be retried immediately.
- }
-
- invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(Ice.Exception ex)
{
@@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
});
}
- public final void finished(BasicStream is)
+ public final boolean finished(BasicStream is)
{
+ //
+ // NOTE: this method is called from ConnectionI.parseMessage
+ // with the connection locked. Therefore, it must not invoke
+ // any user callbacks.
+ //
+
assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
@@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_childObserver.detach();
_childObserver = null;
}
-
+
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
-
+
// _is can already be initialized if the invocation is retried
if(_is == null)
{
@@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
_is.swap(is);
replyStatus = _is.readByte();
-
+
switch(replyStatus)
{
case ReplyStatus.replyOK:
@@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
- _state |= StateDone;
- // Clear buffer now, instead of waiting for AsyncResult
- // deallocation
- // _os.resize(0, false);
if(replyStatus == ReplyStatus.replyOK)
{
_state |= StateOK;
}
+ _state |= StateDone;
_monitor.notifyAll();
+
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
}
}
- catch(Ice.LocalException ex)
+ catch(Ice.Exception exc)
{
- finished(ex);
- return;
- }
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return false;
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(_monitor)
+ {
+ _state |= StateDone;
+ _exception = ex;
+ _monitor.notifyAll();
- assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- invokeCompleted();
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
+ }
+ }
+ }
}
public final boolean invoke(boolean synchronous)
@@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
}
- break;
}
catch(RetryException ex)
{
// Clear request handler and retry.
_proxy.__setRequestHandler(_handler, null);
+ continue;
}
catch(Ice.Exception ex)
{
// This will throw if the invocation can't be retried.
- if(!handleException(ex))
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return _sentSynchronously;
}
@@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
super.invokeExceptionAsync(ex);
}
- private boolean handleException(Ice.Exception exc)
+ private void handleException(Ice.Exception exc)
{
try
{
@@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
{
_observer.retried(); // Invocation is being retried.
}
- if(interval.value > 0)
- {
- _instance.retryQueue().add(this, interval.value);
- return false; // Don't retry immediately, the retry queue will
- // take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ _instance.retryQueue().add(this, interval.value);
}
catch(Ice.Exception ex)
{