diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
commit | 710a9221852d6c92b1727a429a33b38f1f949352 (patch) | |
tree | 6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /java/src | |
parent | - Fix for ICE-5578 - Python build failure (diff) | |
download | ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2 ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz ice-710a9221852d6c92b1727a429a33b38f1f949352.zip |
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/AsyncResult.java | 24 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 169 | ||||
-rw-r--r-- | java/src/IceInternal/BatchOutgoing.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/BatchOutgoingAsync.java | 38 | ||||
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 97 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 25 | ||||
-rw-r--r-- | java/src/IceInternal/DispatchWorkItem.java | 36 | ||||
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 20 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 30 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsyncMessageCallback.java | 15 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingMessageCallback.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 32 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPoolCurrent.java | 6 |
13 files changed, 247 insertions, 249 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index d3146d1533f..8667a1afb88 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -252,7 +252,7 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection) + _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) { public void run() @@ -370,7 +370,7 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection) + _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) { public void run() @@ -488,25 +488,7 @@ public class AsyncResult if(handler != null) { - final IceInternal.RequestHandler h = handler; - Ice.Connection connection = null; - try - { - connection = handler.getConnection(false); - } - catch(Ice.LocalException e) - { - // Ignore. - } - _instance.clientThreadPool().execute( - new IceInternal.DispatchWorkItem(connection) - { - public void - run() - { - h.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)AsyncResult.this); - } - }); + handler.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)this); } } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 15290246234..70fed8c7ec7 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -949,12 +949,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - boolean isSent = o.timedOut(); + o.timedOut(); if(o != _sendStreams.getFirst()) { it.remove(); } - out.finished(new InvocationTimeoutException(), isSent); + out.finished(new InvocationTimeoutException()); return; // We're done. } } @@ -967,7 +967,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(it2.next() == o) { - o.finished(new InvocationTimeoutException(), true); + o.finished(new InvocationTimeoutException()); it2.remove(); return; // We're done. } @@ -978,60 +978,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync) { - boolean isSent = false; - boolean finished = false; - - synchronized(this) + java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); + while(it.hasNext()) { - java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); - while(it.hasNext()) + OutgoingMessage o = it.next(); + if(o.outAsync == outAsync) { - OutgoingMessage o = it.next(); - if(o.outAsync == outAsync) + if(o.requestId > 0) { - if(o.requestId > 0) - { - _asyncRequests.remove(o.requestId); - } + _asyncRequests.remove(o.requestId); + } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - isSent = o.timedOut(); - if(o != _sendStreams.getFirst()) - { - it.remove(); - } - finished = true; - break; // We're done. + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + o.timedOut(); + if(o != _sendStreams.getFirst()) + { + it.remove(); } + outAsync.__dispatchInvocationTimeout(_threadPool, this); + return; // We're done } + } - if(!finished) + if(outAsync instanceof IceInternal.OutgoingAsync) + { + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; + java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); + while(it2.hasNext()) { - if(outAsync instanceof IceInternal.OutgoingAsync) + if(it2.next() == o) { - IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; - java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); - while(it2.hasNext()) - { - if(it2.next() == o) - { - it2.remove(); - finished = true; - isSent = true; - break; // We're done. - } - } + it2.remove(); + outAsync.__dispatchInvocationTimeout(_threadPool, this); + return; // We're done. } } } - - if(finished) - { - outAsync.__finished(new InvocationTimeoutException(), isSent); - } } @@ -1418,7 +1403,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne current.ioCompleted(); } - if(_dispatcher != null) + if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. + { + dispatch(startCB, sentCBs, info); + } + else { if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback { @@ -1435,28 +1424,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne final StartCallback finalStartCB = startCB; final java.util.List<OutgoingMessage> finalSentCBs = sentCBs; final MessageInfo finalInfo = info; - try - { - _dispatcher.dispatch(new Runnable() - { - public void - run() - { - dispatch(finalStartCB, finalSentCBs, finalInfo); - } - }, this); - } - catch(java.lang.Exception ex) - { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + _threadPool.dispatchFromThisThread( + new IceInternal.DispatchWorkItem(this) { - warning("dispatch exception", ex); - } - } - } - else - { - dispatch(startCB, sentCBs, info); + public void + run() + { + dispatch(finalStartCB, finalSentCBs, finalInfo); + } + }); } } @@ -1586,32 +1562,22 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - if(_dispatcher == null) + current.ioCompleted(); + if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher. { - current.ioCompleted(); finish(); } else { - try - { - _dispatcher.dispatch(new Runnable() - { - public void - run() - { - finish(); - } - }, - this); - } - catch(java.lang.Exception ex) - { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + _threadPool.dispatchFromThisThread( + new IceInternal.DispatchWorkItem(this) { - warning("dispatch exception", ex); - } - } + public void + run() + { + finish(); + } + }); } } @@ -1656,13 +1622,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne for(IceInternal.Outgoing p : _requests.values()) { - p.finished(_exception, true); + p.finished(_exception); } _requests.clear(); for(IceInternal.OutgoingAsync p : _asyncRequests.values()) { - p.__finished(_exception, true); + p.__finished(_exception); } _asyncRequests.clear(); @@ -1796,7 +1762,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _endpoint = endpoint; _adapter = adapter; final Ice.InitializationData initData = instance.initializationData(); - _dispatcher = initData.dispatcher; // Cached for better performance. + _dispatcher = initData.dispatcher != null; // Cached for better performance. _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); @@ -3093,7 +3059,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.stream = stream; this.compress = compress; this.adopt = adopt; - this.isSent = false; this.requestId = 0; } @@ -3104,7 +3069,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.compress = compress; this.out = out; this.requestId = requestId; - this.isSent = false; } OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress, @@ -3114,16 +3078,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.compress = compress; this.outAsync = out; this.requestId = requestId; - this.isSent = false; } - public boolean + public void timedOut() { - assert((out != null || outAsync != null) && !isSent); + assert((out != null || outAsync != null)); out = null; outAsync = null; - return isSent; } public void @@ -3142,21 +3104,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public boolean sent() { - isSent = true; // The message is sent. - if(out != null) { out.sent(); - return false; } else if(outAsync != null) { return outAsync.__sent(); } - else - { - return false; - } + return false; } public void @@ -3164,11 +3120,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(out != null) { - out.finished(ex, isSent); + out.finished(ex); } else if(outAsync != null) { - outAsync.__finished(ex, isSent); + outAsync.__finished(ex); } } @@ -3179,7 +3135,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public int requestId; boolean adopt; boolean prepared; - boolean isSent; } private Communicator _communicator; @@ -3194,7 +3149,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private ObjectAdapter _adapter; private IceInternal.ServantManager _servantManager; - private final Dispatcher _dispatcher; + private final boolean _dispatcher; private final Logger _logger; private final IceInternal.TraceLevels _traceLevels; private final IceInternal.ThreadPool _threadPool; diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index 8647e89ccde..acee3cdd6de 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -182,7 +182,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback } public synchronized void - finished(Ice.Exception ex, boolean sent) + finished(Ice.Exception ex) { if(_childObserver != null) { diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 2b08cb9aa04..81dfe8a5a4c 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -58,26 +58,42 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync } public void - __finished(Ice.Exception exc, boolean sent) + __finished(Ice.Exception exc) { - if(_childObserver != null) - { - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - if(_timeoutRequestHandler != null) + synchronized(_monitor) { - _instance.timer().cancel(this); - _timeoutRequestHandler = null; + if(_childObserver != null) + { + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); + _childObserver = null; + } + if(_timeoutRequestHandler != null) + { + _instance.timer().cancel(this); + _timeoutRequestHandler = null; + } } __invokeException(exc); } public void + __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection) + { + threadPool.dispatch( + new DispatchWorkItem(connection) + { + public void + run() + { + BatchOutgoingAsync.this.__finished(new Ice.InvocationTimeoutException()); + } + }); + } + + public void runTimerTask() { __runTimerTask(); } - } diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 245d714a64b..d857b443475 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -80,6 +80,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter) { _reference = ref; + _dispatcher = ref.getInstance().initializationData().dispatcher != null; _response = _reference.getMode() == Reference.ModeTwoway; _adapter = (Ice.ObjectAdapterI)adapter; @@ -150,7 +151,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _batchAutoFlush); stream.swap(_batchStream); - _adapter.getThreadPool().execute( + _adapter.getThreadPool().dispatch( new DispatchWorkItem() { public void @@ -235,7 +236,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _requests.remove(requestId); } - out.finished(new Ice.InvocationTimeoutException(), false); + out.finished(new Ice.InvocationTimeoutException()); _sendRequests.remove(out); } else if(out instanceof Outgoing) @@ -246,7 +247,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { if(e.getValue() == o) { - o.finished(new Ice.InvocationTimeoutException(), true); + out.finished(new Ice.InvocationTimeoutException()); _requests.remove(e.getKey()); return; // We're done. } @@ -254,45 +255,35 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - public void + synchronized public void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) { - OutgoingAsyncMessageCallback out = null; - boolean sent = false; - synchronized(this) + Integer requestId = _sendAsyncRequests.get(outAsync); + if(requestId != null) { - Integer requestId = _sendAsyncRequests.get(outAsync); - if(requestId != null) + if(requestId > 0) { - if(requestId > 0) - { - _asyncRequests.remove(requestId); - } - out = outAsync; - sent = false; - _sendAsyncRequests.remove(outAsync); + _asyncRequests.remove(requestId); } - else if(outAsync instanceof OutgoingAsync) + _sendAsyncRequests.remove(outAsync); + outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); + return; // We're done + } + + if(outAsync instanceof OutgoingAsync) + { + OutgoingAsync o = (OutgoingAsync)outAsync; + assert(o != null); + for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet()) { - OutgoingAsync o = (OutgoingAsync)outAsync; - assert(o != null); - for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet()) + if(e.getValue() == o) { - if(e.getValue() == o) - { - out = o; - sent = true; - _asyncRequests.remove(e.getKey()); - break; - } + _asyncRequests.remove(e.getKey()); + outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); + return; // We're done } } } - - if(out != null) - { - out.__finished(new Ice.InvocationTimeoutException(), sent); - } } public void @@ -319,9 +310,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(_reference.getInvocationTimeout() > 0) { - _adapter.getThreadPool().execute(new InvokeAll(out, out.os(), requestId, 1, false)); + // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. + _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false)); } - else + else if(_dispatcher) + { + _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. { out.sent(); invokeAll(out.os(), requestId, 1, false); @@ -350,7 +346,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler outAsync.__attachCollocatedObserver(_adapter, requestId); - _adapter.getThreadPool().execute(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false)); + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false)); return AsyncStatus.Queued; } @@ -401,9 +397,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { if(_reference.getInvocationTimeout() > 0) { - _adapter.getThreadPool().execute(new InvokeAll(out, out.os(), 0, invokeNum, true)); + _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true)); } - else + else if(_dispatcher) + { + _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. { out.sent(); invokeAll(out.os(), 0, invokeNum, true); @@ -458,7 +458,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(invokeNum > 0) { - _adapter.getThreadPool().execute(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true)); + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true)); return AsyncStatus.Queued; } else if(outAsync.__sent()) @@ -527,7 +527,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler Outgoing out = _requests.remove(requestId); if(out != null) { - out.finished(ex, true); + out.finished(ex); } else { @@ -536,7 +536,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } if(outAsync != null) { - outAsync.__finished(ex, true); + outAsync.__finished(ex); } } _adapter.decDirectCount(); @@ -565,13 +565,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { return false; // The request timed-out. } - out.sent(); } } - else - { - out.sent(); - } + out.sent(); return true; } @@ -632,7 +628,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } catch(Ice.ObjectAdapterDeactivatedException ex) { - handleException(requestId, ex, false); + handleException(requestId, ex); return; } @@ -644,7 +640,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } catch(Ice.SystemException ex) { - handleException(requestId, ex, true); + handleException(requestId, ex); _adapter.decDirectCount(); } --invokeNum; @@ -679,7 +675,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } void - handleException(int requestId, Ice.Exception ex, boolean sent) + handleException(int requestId, Ice.Exception ex) { if(requestId == 0) { @@ -692,7 +688,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler Outgoing out = _requests.get(requestId); if(out != null) { - out.finished(ex, sent); + out.finished(ex); _requests.remove(requestId); } else @@ -707,11 +703,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(outAsync != null) { - outAsync.__finished(ex, sent); + outAsync.__finished(ex); } } private final Reference _reference; + private final boolean _dispatcher; private final boolean _response; private final Ice.ObjectAdapterI _adapter; private final Ice.Logger _logger; diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 0505688cdda..6c08f7dc261 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -203,7 +203,7 @@ public class ConnectRequestHandler Request request = it.next(); if(request.out == out) { - out.finished(new Ice.InvocationTimeoutException(), false); + out.finished(new Ice.InvocationTimeoutException()); it.remove(); return; } @@ -217,7 +217,6 @@ public class ConnectRequestHandler public void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) { - boolean timedOut = false; synchronized(this) { if(_exception != null) @@ -234,17 +233,13 @@ public class ConnectRequestHandler if(request.outAsync == outAsync) { it.remove(); - timedOut = true; - break; + outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); + return; // We're done } } + assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - if(timedOut) - { - outAsync.__finished(new Ice.InvocationTimeoutException(), false); - return; - } _connection.asyncRequestTimedOut(outAsync); } @@ -333,7 +328,7 @@ public class ConnectRequestHandler // if(!_requests.isEmpty()) { - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) + _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) { public void run() @@ -487,7 +482,7 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex.get(); - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) + _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) { public void run() @@ -503,7 +498,7 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex; - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) + _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) { public void run() @@ -516,7 +511,7 @@ public class ConnectRequestHandler if(!sentCallbacks.isEmpty()) { - _reference.getInstance().clientThreadPool().execute( + _reference.getInstance().clientThreadPool().dispatch( new DispatchWorkItem(_connection) { public void @@ -564,11 +559,11 @@ public class ConnectRequestHandler { if(request.out != null) { - request.out.finished(_exception, false); + request.out.finished(_exception); } else if(request.outAsync != null) { - request.outAsync.__finished(_exception, false); + request.outAsync.__finished(_exception); } } _requests.clear(); diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java index a7c5ef7fa57..94fea26ada5 100644 --- a/java/src/IceInternal/DispatchWorkItem.java +++ b/java/src/IceInternal/DispatchWorkItem.java @@ -26,33 +26,17 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable _connection = connection; } - final public void execute(ThreadPoolCurrent current) + final public void + execute(ThreadPoolCurrent current) { - Instance instance = current.stream.instance(); - Ice.Dispatcher dispatcher = instance.initializationData().dispatcher; - if(dispatcher != null) - { - try - { - dispatcher.dispatch(this, _connection); - } - catch(java.lang.Exception ex) - { - if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString()); - } - } - } - else - { - current.ioCompleted(); // Promote a follower. - this.run(); - } + current.ioCompleted(); // Promote a follower + current.dispatchFromThisThread(this); + } + + public Ice.Connection + getConnection() + { + return _connection; } private Ice.Connection _connection; diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index 3a14e835110..69d7b3d6cad 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -415,9 +415,24 @@ public final class Outgoing implements OutgoingMessageCallback } public synchronized void - finished(Ice.Exception ex, boolean sent) + finished(Ice.Exception ex) { - assert(_state <= StateInProgress); + //assert(_state <= StateInProgress); + if(_state > StateInProgress) + { + // + // Response was already received but message + // didn't get removed first from the connection + // send message queue so it's possible we can be + // notified of failures. In this case, ignore the + // failure and assume the outgoing has been sent. + // + assert(_state != StateFailed); + _sent = true; + notify(); + return; + } + if(_childObserver != null) { _childObserver.failed(ex.ice_name()); @@ -426,7 +441,6 @@ public final class Outgoing implements OutgoingMessageCallback } _state = StateFailed; _exception = ex; - _sent = sent; notify(); } diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 38e2eea29d7..e5e143a7e71 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -23,6 +23,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { _handler = null; _cnt = 0; + _sent = false; _mode = mode; _sentSynchronously = false; @@ -120,6 +121,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { boolean alreadySent = (_state & Sent) != 0; _state |= Sent; + _sent = true; assert((_state & Done) == 0); @@ -150,7 +152,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } public void - __finished(Ice.Exception exc, boolean sent) + __finished(Ice.Exception exc) { synchronized(_monitor) { @@ -174,7 +176,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa // try { - if(!handleException(exc, sent)) + if(!handleException(exc)) { return; // Can't be retried immediately. } @@ -187,6 +189,20 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } + public void + __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection) + { + threadPool.dispatch( + new DispatchWorkItem(connection) + { + public void + run() + { + OutgoingAsync.this.__finished(new Ice.InvocationTimeoutException()); + } + }); + } + public final void __finished(BasicStream is) { @@ -350,7 +366,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } catch(Ice.LocalException ex) { - __finished(ex, true); + __finished(ex); return; } @@ -365,6 +381,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { try { + _sent = false; _handler = _proxy.__getRequestHandler(true); int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) @@ -409,7 +426,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } catch(Ice.Exception ex) { - if(!handleException(ex, false)) // This will throw if the invocation can't be retried. + if(!handleException(ex)) // This will throw if the invocation can't be retried. { break; // Can't be retried immediately. } @@ -463,12 +480,12 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } private boolean - handleException(Ice.Exception exc, boolean sent) + handleException(Ice.Exception exc) { try { Ice.IntHolder interval = new Ice.IntHolder(); - _cnt = _proxy.__handleException(exc, _handler, _mode, sent, interval, _cnt); + _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt); if(_observer != null) { _observer.retried(); // Invocation is being retried. @@ -499,6 +516,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa private Ice.EncodingVersion _encoding; private int _cnt; private Ice.OperationMode _mode; + private boolean _sent; private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); } diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index 8d830987428..4b6b73a8b06 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -30,17 +30,20 @@ public interface OutgoingAsyncMessageCallback // connection will call the __invokeSent() method bellow (which in turn should call the // sent callback). // - public abstract boolean __sent(); + boolean __sent(); // // Called by the connection to call the user sent callback. // - public abstract void __invokeSent(); + void __invokeSent(); // - // Called by the connection when the request failed. The boolean indicates whether or - // not the message was possibly sent (this is useful for retry to figure out whether - // or not the request can't be retried without breaking at-most-once semantics.) + // Called by the connection when the request failed. // - public abstract void __finished(Ice.Exception ex, boolean sent); + void __finished(Ice.Exception ex); + + // + // Helper to dispatch invocation timeout. + // + void __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection); } diff --git a/java/src/IceInternal/OutgoingMessageCallback.java b/java/src/IceInternal/OutgoingMessageCallback.java index 18b18a09575..1a8426b2e19 100644 --- a/java/src/IceInternal/OutgoingMessageCallback.java +++ b/java/src/IceInternal/OutgoingMessageCallback.java @@ -18,5 +18,5 @@ public interface OutgoingMessageCallback void sent(); - void finished(Ice.Exception ex, boolean sent); + void finished(Ice.Exception ex); } diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 8ab584e9623..8d350b1e19c 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -82,6 +82,7 @@ public final class ThreadPool ThreadPool(Instance instance, String prefix, int timeout) { _instance = instance; + _dispatcher = instance.initializationData().dispatcher; _destroyed = false; _prefix = prefix; _selector = new Selector(instance); @@ -278,7 +279,6 @@ public final class ThreadPool { return; } - _selector.update(handler, remove, add); if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value && @@ -311,7 +311,34 @@ public final class ThreadPool } public void - execute(ThreadPoolWorkItem workItem) + dispatchFromThisThread(DispatchWorkItem workItem) + { + if(_dispatcher != null) + { + try + { + _dispatcher.dispatch(workItem, workItem.getConnection()); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + _instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString()); + } + } + } + else + { + workItem.run(); + } + } + + public void + dispatch(DispatchWorkItem workItem) { _workQueue.queue(workItem); } @@ -698,6 +725,7 @@ public final class ThreadPool } private final Instance _instance; + private final Ice.Dispatcher _dispatcher; private final ThreadPoolWorkQueue _workQueue; private boolean _destroyed; private final String _prefix; diff --git a/java/src/IceInternal/ThreadPoolCurrent.java b/java/src/IceInternal/ThreadPoolCurrent.java index e278fcb917b..7b81127016c 100644 --- a/java/src/IceInternal/ThreadPoolCurrent.java +++ b/java/src/IceInternal/ThreadPoolCurrent.java @@ -31,6 +31,12 @@ public final class ThreadPoolCurrent _threadPool.ioCompleted(this); } + public void + dispatchFromThisThread(DispatchWorkItem workItem) + { + _threadPool.dispatchFromThisThread(workItem); + } + final ThreadPool _threadPool; final ThreadPool.EventHandlerThread _thread; EventHandler _handler; |