diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-10 12:03:07 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-10 12:03:07 +0200 |
commit | 570455a381e6620f8ddfcca448559d3fa545ba38 (patch) | |
tree | fe3fa45e6a643b473d9370babff6224b1a9d4dcb /java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | |
parent | Fixed ICE-5726: provide deprecated public StringConverterPlugin (diff) | |
download | ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2 ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip |
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'java/src/IceInternal/CommunicatorBatchOutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | 233 |
1 files changed, 0 insertions, 233 deletions
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java deleted file mode 100644 index ab1854cbce1..00000000000 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ /dev/null @@ -1,233 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; - -import Ice.CommunicatorDestroyedException; - -public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase -{ - public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, - CallbackBase callback) - { - super(communicator, instance, operation, callback); - - // - // _useCount is initialized to 1 to prevent premature callbacks. - // The caller must invoke ready() after all flush requests have - // been initiated. - // - _useCount = 1; - - // - // Assume all connections are flushed synchronously. - // - _sentSynchronously = true; - - // - // Attach observer - // - _observer = ObserverHelper.get(instance, operation); - } - - public void flushConnection(final Ice.ConnectionI con) - { - class BatchOutgoingAsyncI extends BatchOutgoingAsync - { - public - BatchOutgoingAsyncI() - { - super(CommunicatorBatchOutgoingAsync.this._communicator, - CommunicatorBatchOutgoingAsync.this._instance, - CommunicatorBatchOutgoingAsync.this._operation, - null); - } - - @Override - public boolean - sent() - { - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - doCheck(false); - return false; - } - - // TODO: MJN: This is missing a test. - @Override - public void - finished(Ice.Exception ex) - { - if(_childObserver != null) - { - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - doCheck(false); - } - - @Override - public void - attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) - { - if(CommunicatorBatchOutgoingAsync.this._observer != null) - { - _childObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt, - requestId, size); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - @Override - protected void cancelRequest() - { - } - } - - synchronized(_monitor) - { - ++_useCount; - } - - try - { - int status; - if(_instance.queueRequests()) - { - Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() - { - @Override - public Integer call() throws RetryException - { - return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); - } - }); - - boolean interrupted = false; - while(true) - { - try - { - status = future.get(); - if(interrupted) - { - Thread.currentThread().interrupt(); - } - break; - } - catch(InterruptedException ex) - { - interrupted = true; - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) - { - assert(false); - } - } - } - } - else - { - status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); - } - if((status & AsyncStatus.Sent) > 0) - { - _sentSynchronously = false; - } - } - catch(Ice.LocalException ex) - { - doCheck(false); - throw ex; - } - } - - public void ready() - { - doCheck(true); - } - - private void doCheck(boolean userThread) - { - synchronized(_monitor) - { - assert(_useCount > 0); - if(--_useCount > 0) - { - return; - } - _state |= StateDone | StateOK | StateSent; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation - _monitor.notifyAll(); - } - - if(_callback == null || !_callback.__hasSentCallback()) - { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - } - else - { - // - // sentSynchronously_ is immutable here. - // - if(!_sentSynchronously || !userThread) - { - invokeSentAsync(); - } - else - { - invokeSentInternal(); - } - } - } - - @Override - public void - processRetry() - { - assert(false); // Retries are never scheduled - } - - @Override - protected void cancelRequest() - { - } - - private int _useCount; -} |