summaryrefslogtreecommitdiff
path: root/java/src/IceInternal
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal')
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java2
-rw-r--r--java/src/IceInternal/GetConnectionOutgoingAsync.java108
-rw-r--r--java/src/IceInternal/OutgoingAsync.java32
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java6
4 files changed, 128 insertions, 20 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 25d9859b997..60881c35c6d 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -201,7 +201,7 @@ public class ConnectRequestHandler
return _connection;
}
}
-
+
@Override
synchronized public
ConnectionI waitForConnection()
diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java
new file mode 100644
index 00000000000..95878b90709
--- /dev/null
+++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java
@@ -0,0 +1,108 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync
+{
+ public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback)
+ {
+ super(prx, operation, callback);
+ _observer = ObserverHelper.get(prx, operation);
+ }
+
+ public void __invoke()
+ {
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy.__getRequestHandler();
+ _handler.sendAsyncRequest(this);
+ }
+ catch(RetryException ex)
+ {
+ _proxy.__setRequestHandler(_handler, null);
+ }
+ catch(Ice.Exception ex)
+ {
+ handleException(ex);
+ }
+ break;
+ }
+ }
+
+ @Override
+ public int send(Ice.ConnectionI conection, boolean compress, boolean response)
+ throws RetryException
+ {
+ sent();
+ return 0;
+ }
+
+ @Override
+ public int invokeCollocated(CollocatedRequestHandler handler)
+ {
+ sent();
+ return 0;
+ }
+
+ @Override
+ public boolean sent()
+ {
+ synchronized(_monitor)
+ {
+ _state |= StateDone;
+ _monitor.notifyAll();
+ }
+ invokeCompleted();
+ return false;
+ }
+
+ @Override
+ public void invokeSent()
+ {
+ // No sent callback
+ }
+
+ @Override
+ public void finished(Ice.Exception exc)
+ {
+ try
+ {
+ handleException(exc);
+ }
+ catch(Ice.Exception ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+
+ private void handleException(Ice.Exception exc)
+ {
+ try
+ {
+ Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
+ _cnt = _proxy.__handleException(exc, _handler, Ice.OperationMode.Idempotent, false, interval, _cnt);
+ if(_observer != null)
+ {
+ _observer.retried(); // Invocation is being retried
+ }
+ _instance.retryQueue().add(this, interval.value);
+ }
+ catch(Ice.Exception ex)
+ {
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ throw ex;
+ }
+ }
+}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 84bfc477768..992b9ca6265 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -17,7 +17,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_proxy = (Ice.ObjectPrxHelperBase) prx;
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
}
-
+
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is,
IceInternal.BasicStream os)
{
@@ -26,8 +26,8 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_proxy = (Ice.ObjectPrxHelperBase) prx;
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
}
-
- public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
+
+ public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
boolean explicitCtx, boolean synchronous)
{
_handler = null;
@@ -200,7 +200,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_monitor.notifyAll();
// Don't call the sent call is already sent.
- return !alreadySent && _callback != null && _callback.__hasSentCallback();
+ return !alreadySent && _callback != null && _callback.__hasSentCallback();
}
}
@@ -264,7 +264,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
// with the connection locked. Therefore, it must not invoke
// any user callbacks.
//
-
+
assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
@@ -279,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_childObserver.detach();
_childObserver = null;
}
-
+
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
-
+
// _is can already be initialized if the invocation is retried
if(_is == null)
{
@@ -294,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
_is.swap(is);
replyStatus = _is.readByte();
-
+
switch(replyStatus)
{
case ReplyStatus.replyOK:
@@ -422,7 +422,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
_state |= StateDone;
_monitor.notifyAll();
-
+
if(_callback == null)
{
if(_observer != null)
@@ -579,7 +579,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_os.writeEncaps(encaps);
}
}
-
+
public void cacheMessageBuffers()
{
if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0)
@@ -597,11 +597,11 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_is.reset();
}
_os.reset();
-
+
_proxy.cacheMessageBuffers(_is, _os);
}
}
-
+
@Override
public void invokeExceptionAsync(final Ice.Exception ex)
{
@@ -621,7 +621,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
super.invokeExceptionAsync(ex);
}
-
+
private void handleException(Ice.Exception exc)
{
try
@@ -632,7 +632,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
{
_observer.retried(); // Invocation is being retried.
}
-
+
//
// Schedule the retry. Note that we always schedule the retry
// on the retry queue even if the invocation can be retried
@@ -654,10 +654,10 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
protected Ice.ObjectPrxHelperBase _proxy;
+ protected RequestHandler _handler;
+ protected int _cnt;
- private RequestHandler _handler;
private Ice.EncodingVersion _encoding;
- private int _cnt;
private Ice.OperationMode _mode;
private boolean _sent;
//
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index f9d90e3e72e..6709c2f7ab3 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -21,7 +21,7 @@ import Ice.ConnectionI;
public class QueueRequestHandler implements RequestHandler
{
public
- QueueRequestHandler(Instance instance, RequestHandler delegate)
+ QueueRequestHandler(Instance instance, RequestHandler delegate)
{
_executor = instance.getQueueExecutor();
assert(delegate != null);
@@ -34,7 +34,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ Future<Void> future = _executor.submit(new Callable<Void>()
{
@Override
public Void call() throws RetryException
@@ -81,7 +81,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ Future<Void> future = _executor.submit(new Callable<Void>()
{
@Override
public Void call()