diff options
Diffstat (limited to 'java/src/IceInternal')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/GetConnectionOutgoingAsync.java | 108 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 32 | ||||
-rw-r--r-- | java/src/IceInternal/QueueRequestHandler.java | 6 |
4 files changed, 128 insertions, 20 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 25d9859b997..60881c35c6d 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -201,7 +201,7 @@ public class ConnectRequestHandler return _connection; } } - + @Override synchronized public ConnectionI waitForConnection() diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java new file mode 100644 index 00000000000..95878b90709 --- /dev/null +++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java @@ -0,0 +1,108 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync +{ + public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback) + { + super(prx, operation, callback); + _observer = ObserverHelper.get(prx, operation); + } + + public void __invoke() + { + while(true) + { + try + { + _handler = _proxy.__getRequestHandler(); + _handler.sendAsyncRequest(this); + } + catch(RetryException ex) + { + _proxy.__setRequestHandler(_handler, null); + } + catch(Ice.Exception ex) + { + handleException(ex); + } + break; + } + } + + @Override + public int send(Ice.ConnectionI conection, boolean compress, boolean response) + throws RetryException + { + sent(); + return 0; + } + + @Override + public int invokeCollocated(CollocatedRequestHandler handler) + { + sent(); + return 0; + } + + @Override + public boolean sent() + { + synchronized(_monitor) + { + _state |= StateDone; + _monitor.notifyAll(); + } + invokeCompleted(); + return false; + } + + @Override + public void invokeSent() + { + // No sent callback + } + + @Override + public void finished(Ice.Exception exc) + { + try + { + handleException(exc); + } + catch(Ice.Exception ex) + { + invokeExceptionAsync(ex); + } + } + + private void handleException(Ice.Exception exc) + { + try + { + Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); + _cnt = _proxy.__handleException(exc, _handler, Ice.OperationMode.Idempotent, false, interval, _cnt); + if(_observer != null) + { + _observer.retried(); // Invocation is being retried + } + _instance.retryQueue().add(this, interval.value); + } + catch(Ice.Exception ex) + { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + throw ex; + } + } +} diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 84bfc477768..992b9ca6265 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -17,7 +17,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, IceInternal.BasicStream os) { @@ -26,8 +26,8 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - - public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, + + public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, boolean explicitCtx, boolean synchronous) { _handler = null; @@ -200,7 +200,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _monitor.notifyAll(); // Don't call the sent call is already sent. - return !alreadySent && _callback != null && _callback.__hasSentCallback(); + return !alreadySent && _callback != null && _callback.__hasSentCallback(); } } @@ -264,7 +264,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC // with the connection locked. Therefore, it must not invoke // any user callbacks. // - + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; @@ -279,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _childObserver.detach(); _childObserver = null; } - + if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - + // _is can already be initialized if the invocation is retried if(_is == null) { @@ -294,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: @@ -422,7 +422,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _state |= StateDone; _monitor.notifyAll(); - + if(_callback == null) { if(_observer != null) @@ -579,7 +579,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _os.writeEncaps(encaps); } } - + public void cacheMessageBuffers() { if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) @@ -597,11 +597,11 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _is.reset(); } _os.reset(); - + _proxy.cacheMessageBuffers(_is, _os); } } - + @Override public void invokeExceptionAsync(final Ice.Exception ex) { @@ -621,7 +621,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC super.invokeExceptionAsync(ex); } - + private void handleException(Ice.Exception exc) { try @@ -632,7 +632,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC { _observer.retried(); // Invocation is being retried. } - + // // Schedule the retry. Note that we always schedule the retry // on the retry queue even if the invocation can be retried @@ -654,10 +654,10 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } protected Ice.ObjectPrxHelperBase _proxy; + protected RequestHandler _handler; + protected int _cnt; - private RequestHandler _handler; private Ice.EncodingVersion _encoding; - private int _cnt; private Ice.OperationMode _mode; private boolean _sent; // diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index f9d90e3e72e..6709c2f7ab3 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -21,7 +21,7 @@ import Ice.ConnectionI; public class QueueRequestHandler implements RequestHandler { public - QueueRequestHandler(Instance instance, RequestHandler delegate) + QueueRequestHandler(Instance instance, RequestHandler delegate) { _executor = instance.getQueueExecutor(); assert(delegate != null); @@ -34,7 +34,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + Future<Void> future = _executor.submit(new Callable<Void>() { @Override public Void call() throws RetryException @@ -81,7 +81,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + Future<Void> future = _executor.submit(new Callable<Void>() { @Override public Void call() |