diff options
Diffstat (limited to 'java/src/IceInternal/ConnectionFlushBatch.java')
-rw-r--r-- | java/src/IceInternal/ConnectionFlushBatch.java | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/java/src/IceInternal/ConnectionFlushBatch.java b/java/src/IceInternal/ConnectionFlushBatch.java new file mode 100644 index 00000000000..4b3da0bcb5e --- /dev/null +++ b/java/src/IceInternal/ConnectionFlushBatch.java @@ -0,0 +1,128 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + +public class ConnectionFlushBatch extends OutgoingAsyncBase +{ + public static ConnectionFlushBatch check(Ice.AsyncResult r, Ice.Connection con, String operation) + { + check(r, operation); + if(!(r instanceof ConnectionFlushBatch)) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + if(r.getConnection() != con) + { + throw new IllegalArgumentException("Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } + return (ConnectionFlushBatch)r; + } + + public ConnectionFlushBatch(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, + String operation, CallbackBase callback) + { + super(communicator, instance, operation, callback); + _connection = con; + } + + @Override + public Ice.Connection getConnection() + { + return _connection; + } + + public void invoke() + { + try + { + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit( + new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return _connection.flushAsyncBatchRequests(ConnectionFlushBatch.this); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = _connection.flushAsyncBatchRequests(this); + } + + if((status & AsyncStatus.Sent) > 0) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + invokeSent(); + } + } + } + catch(Ice.Exception ex) + { + if(completed(ex)) + { + invokeCompletedAsync(); + } + } + } + + private Ice.ConnectionI _connection; +} |