diff options
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 96 |
1 files changed, 52 insertions, 44 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 73880042340..9ee6ac10121 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -9,7 +9,8 @@ package Ice; -public final class ConnectionI extends IceInternal.EventHandler implements Connection, IceInternal.ResponseHandler +public final class ConnectionI extends IceInternal.EventHandler + implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler { public interface StartCallback { @@ -373,8 +374,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - - IceInternal.Protocol.headerSize - 4); + out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); int status; try @@ -388,6 +388,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException) _exception.fillInStackTrace(); } + if(response || (status & IceInternal.AsyncStatus.Queued) > 0) + { + out.cancelable(this); // Notify the request that it's cancelable + } + if(response) { // @@ -640,29 +645,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) { - IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this, - _communicator, _instance, __flushBatchRequests_name, cb); - try - { - result.__invoke(); - } - catch(LocalException __ex) - { - result.invokeExceptionAsync(__ex); - } - + IceInternal.ConnectionFlushBatch result = + new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb); + result.invoke(); return result; } @Override public void end_flushBatchRequests(AsyncResult ir) { - IceInternal.OutgoingAsyncBase r = (IceInternal.OutgoingAsyncBase) ir; - IceInternal.OutgoingAsyncBase.check(r, this, __flushBatchRequests_name); + IceInternal.ConnectionFlushBatch r = + IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name); r.__wait(); } - synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) + synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync) { waitBatchStreamInUse(); @@ -687,11 +684,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.pos(IceInternal.Protocol.headerSize); _batchStream.writeInt(_batchRequestNum); - outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() - - IceInternal.Protocol.headerSize - 4); - _batchStream.swap(outAsync.getOs()); + outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0); + // // Send the batch stream. // @@ -708,6 +704,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException) _exception.fillInStackTrace(); } + if((status & IceInternal.AsyncStatus.Queued) > 0) + { + outAsync.cancelable(this); // Notify the request that it's cancelable. + } + // // Reset the batch stream. // @@ -728,12 +729,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(callback != null) { - class CallbackWorkItem extends IceInternal.DispatchWorkItem + _threadPool.dispatch(new IceInternal.DispatchWorkItem(this) { - public CallbackWorkItem() - { - } - @Override public void run() { @@ -746,8 +743,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } } - }; - _threadPool.dispatch(new CallbackWorkItem()); + }); } } else @@ -793,9 +789,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, - Ice.LocalException ex) + @Override + synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex) { + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) { @@ -815,13 +816,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // it's fine if the OutgoingAsync output stream is released (and // as long as canceled requests cannot be retried). // - o.timedOut(); + o.canceled(); if(o != _sendStreams.getFirst()) { it.remove(); } - outAsync.dispatchInvocationCancel(ex, _threadPool, this); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } } @@ -834,12 +838,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(it2.next() == o) { it2.remove(); - outAsync.dispatchInvocationCancel(ex, _threadPool, this); - return true; // We're done. + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } } } - return false; } @Override @@ -1469,7 +1474,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne for(OutgoingMessage p : _sendStreams) { - p.finished(_exception); + p.completed(_exception); if(p.requestId > 0) // Make sure finished isn't called twice. { _asyncRequests.remove(p.requestId); @@ -1480,7 +1485,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne for(IceInternal.OutgoingAsync p : _asyncRequests.values()) { - p.finished(_exception); + if(p.completed(_exception)) + { + p.invokeCompleted(); + } } _asyncRequests.clear(); @@ -2580,7 +2588,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); - if(outAsync != null && outAsync.finished(info.stream)) + if(outAsync != null && outAsync.completed(info.stream)) { info.outAsync = outAsync; ++info.messageDispatchCount; @@ -2999,7 +3007,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = 0; } - OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress, + OutgoingMessage(IceInternal.OutgoingAsyncBase out, IceInternal.BasicStream stream, boolean compress, int requestId) { this.stream = stream; @@ -3008,7 +3016,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = requestId; } - public void timedOut() + public void canceled() { assert (outAsync != null); outAsync = null; @@ -3035,16 +3043,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return false; } - public void finished(Ice.LocalException ex) + public void completed(Ice.LocalException ex) { - if(outAsync != null) + if(outAsync != null && outAsync.completed(ex)) { - outAsync.finished(ex); + outAsync.invokeCompleted(); } } public IceInternal.BasicStream stream; - public IceInternal.OutgoingAsyncMessageCallback outAsync; + public IceInternal.OutgoingAsyncBase outAsync; public boolean compress; public int requestId; boolean adopt; |