summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CommunicatorFlushBatch.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/CommunicatorFlushBatch.java')
-rw-r--r--java/src/IceInternal/CommunicatorFlushBatch.java194
1 files changed, 194 insertions, 0 deletions
diff --git a/java/src/IceInternal/CommunicatorFlushBatch.java b/java/src/IceInternal/CommunicatorFlushBatch.java
new file mode 100644
index 00000000000..19e55ecc91c
--- /dev/null
+++ b/java/src/IceInternal/CommunicatorFlushBatch.java
@@ -0,0 +1,194 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
+public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
+{
+ public static CommunicatorFlushBatch check(Ice.AsyncResult r, Ice.Communicator com, String operation)
+ {
+ check(r, operation);
+ if(!(r instanceof CommunicatorFlushBatch))
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ if(r.getCommunicator() != com)
+ {
+ throw new IllegalArgumentException("Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ return (CommunicatorFlushBatch)r;
+ }
+
+ public CommunicatorFlushBatch(Ice.Communicator communicator, Instance instance, String op, CallbackBase callback)
+ {
+ super(communicator, instance, op, callback);
+
+ _observer = ObserverHelper.get(instance, op);
+
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+ }
+
+ public void flushConnection(final Ice.ConnectionI con)
+ {
+ class FlushBatch extends OutgoingAsyncBase
+ {
+ public FlushBatch()
+ {
+ super(CommunicatorFlushBatch.this.getCommunicator(),
+ CommunicatorFlushBatch.this._instance,
+ CommunicatorFlushBatch.this.getOperation(),
+ null);
+ }
+
+ @Override
+ public boolean sent()
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.detach();
+ _childObserver = null;
+ }
+ doCheck(false);
+ return false;
+ }
+
+ // TODO: MJN: This is missing a test.
+ @Override
+ public boolean completed(Ice.Exception ex)
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+ doCheck(false);
+ return false;
+ }
+
+ @Override
+ protected Ice.Instrumentation.InvocationObserver getObserver()
+ {
+ return CommunicatorFlushBatch.this._observer;
+ }
+ }
+
+ synchronized(this)
+ {
+ ++_useCount;
+ }
+
+ try
+ {
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return con.flushAsyncBatchRequests(new FlushBatch());
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ con.flushAsyncBatchRequests(new FlushBatch());
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ doCheck(false);
+ throw ex;
+ }
+ }
+
+ public void ready()
+ {
+ doCheck(true);
+ }
+
+ private void doCheck(boolean userThread)
+ {
+ synchronized(this)
+ {
+ assert(_useCount > 0);
+ if(--_useCount > 0)
+ {
+ return;
+ }
+ }
+
+ if(sent(true))
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ invokeSent();
+ }
+ else
+ {
+ invokeSentAsync();
+ }
+ }
+ }
+
+ private int _useCount;
+}