summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
commit570455a381e6620f8ddfcca448559d3fa545ba38 (patch)
treefe3fa45e6a643b473d9370babff6224b1a9d4dcb /java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
parentFixed ICE-5726: provide deprecated public StringConverterPlugin (diff)
downloadice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2
ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz
ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'java/src/IceInternal/CommunicatorBatchOutgoingAsync.java')
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java233
1 files changed, 0 insertions, 233 deletions
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
deleted file mode 100644
index ab1854cbce1..00000000000
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ /dev/null
@@ -1,233 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-
-import Ice.CommunicatorDestroyedException;
-
-public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
-{
- public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
- CallbackBase callback)
- {
- super(communicator, instance, operation, callback);
-
- //
- // _useCount is initialized to 1 to prevent premature callbacks.
- // The caller must invoke ready() after all flush requests have
- // been initiated.
- //
- _useCount = 1;
-
- //
- // Assume all connections are flushed synchronously.
- //
- _sentSynchronously = true;
-
- //
- // Attach observer
- //
- _observer = ObserverHelper.get(instance, operation);
- }
-
- public void flushConnection(final Ice.ConnectionI con)
- {
- class BatchOutgoingAsyncI extends BatchOutgoingAsync
- {
- public
- BatchOutgoingAsyncI()
- {
- super(CommunicatorBatchOutgoingAsync.this._communicator,
- CommunicatorBatchOutgoingAsync.this._instance,
- CommunicatorBatchOutgoingAsync.this._operation,
- null);
- }
-
- @Override
- public boolean
- sent()
- {
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- doCheck(false);
- return false;
- }
-
- // TODO: MJN: This is missing a test.
- @Override
- public void
- finished(Ice.Exception ex)
- {
- if(_childObserver != null)
- {
- _childObserver.failed(ex.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- doCheck(false);
- }
-
- @Override
- public void
- attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(CommunicatorBatchOutgoingAsync.this._observer != null)
- {
- _childObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt,
- requestId, size);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- @Override
- protected void cancelRequest()
- {
- }
- }
-
- synchronized(_monitor)
- {
- ++_useCount;
- }
-
- try
- {
- int status;
- if(_instance.queueRequests())
- {
- Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
- {
- @Override
- public Integer call() throws RetryException
- {
- return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
- }
- });
-
- boolean interrupted = false;
- while(true)
- {
- try
- {
- status = future.get();
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- break;
- }
- catch(InterruptedException ex)
- {
- interrupted = true;
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
- }
- }
- }
- }
- else
- {
- status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
- }
- if((status & AsyncStatus.Sent) > 0)
- {
- _sentSynchronously = false;
- }
- }
- catch(Ice.LocalException ex)
- {
- doCheck(false);
- throw ex;
- }
- }
-
- public void ready()
- {
- doCheck(true);
- }
-
- private void doCheck(boolean userThread)
- {
- synchronized(_monitor)
- {
- assert(_useCount > 0);
- if(--_useCount > 0)
- {
- return;
- }
- _state |= StateDone | StateOK | StateSent;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _monitor.notifyAll();
- }
-
- if(_callback == null || !_callback.__hasSentCallback())
- {
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- }
- else
- {
- //
- // sentSynchronously_ is immutable here.
- //
- if(!_sentSynchronously || !userThread)
- {
- invokeSentAsync();
- }
- else
- {
- invokeSentInternal();
- }
- }
- }
-
- @Override
- public void
- processRetry()
- {
- assert(false); // Retries are never scheduled
- }
-
- @Override
- protected void cancelRequest()
- {
- }
-
- private int _useCount;
-}