summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java378
1 files changed, 236 insertions, 142 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index dbb6ac37ab9..0618ffb93e9 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,23 +9,33 @@
package IceInternal;
-public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable
+public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback
{
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, cb);
- _proxy = (Ice.ObjectPrxHelperBase)prx;
+ super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
+ _proxy = (Ice.ObjectPrxHelperBase) prx;
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
}
-
- public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
- boolean explicitCtx)
+
+ public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is,
+ IceInternal.BasicStream os)
+ {
+ super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb,
+ is, os);
+ _proxy = (Ice.ObjectPrxHelperBase) prx;
+ _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ }
+
+ public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
+ boolean explicitCtx, boolean synchronous)
{
_handler = null;
_cnt = 0;
_sent = false;
_mode = mode;
_sentSynchronously = false;
+ _synchronous = synchronous;
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
@@ -36,15 +46,47 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_observer = ObserverHelper.get(_proxy, operation, ctx);
- //
- // Can't call async via a batch proxy.
- //
- if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
+ switch(_proxy.__reference().getMode())
{
- throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
- }
+ case Reference.ModeTwoway:
+ case Reference.ModeOneway:
+ case Reference.ModeDatagram:
+ {
+ _os.writeBlob(IceInternal.Protocol.requestHdr);
+ break;
+ }
- _os.writeBlob(IceInternal.Protocol.requestHdr);
+ case Reference.ModeBatchOneway:
+ case Reference.ModeBatchDatagram:
+ {
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy.__getRequestHandler();
+ _handler.prepareBatchRequest(_os);
+ break;
+ }
+ catch(RetryException ex)
+ {
+ // Clear request handler and retry.
+ _proxy.__setRequestHandler(_handler, null);
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ // Clear request handler
+ _proxy.__setRequestHandler(_handler, null);
+ _handler = null;
+ throw ex;
+ }
+ }
+ break;
+ }
+ }
Reference ref = _proxy.__reference();
@@ -66,7 +108,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_os.writeString(operation);
- _os.writeByte((byte)mode.value());
+ _os.writeByte((byte) mode.value());
if(ctx != null)
{
@@ -94,39 +136,44 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
- @Override public Ice.ObjectPrx
- getProxy()
+ @Override
+ public Ice.ObjectPrx getProxy()
{
return _proxy;
}
@Override
- public int
- __send(Ice.ConnectionI connection, boolean compress, boolean response)
- throws RetryException
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
_cachedConnection = connection;
return connection.sendAsyncRequest(this, compress, response);
}
@Override
- public int
- __invokeCollocated(CollocatedRequestHandler handler)
+ public int invokeCollocated(CollocatedRequestHandler handler)
{
- return handler.invokeAsyncRequest(this);
+ // The BasicStream cannot be cached if background io is enabled,
+ // the proxy is not a twoway or there is an invocation timeout set.
+ if(_proxy.__reference().getInstance().queueRequests() || !_proxy.ice_isTwoway() ||
+ _proxy.__reference().getInvocationTimeout() > 0)
+ {
+ // Disable caching by marking the streams as cached!
+ _state |= StateCachedBuffers;
+ }
+ handler.invokeAsyncRequest(this, _synchronous);
+ return AsyncStatus.Queued;
}
@Override
- public boolean
- __sent()
+ public boolean sent()
{
synchronized(_monitor)
{
- boolean alreadySent = (_state & Sent) != 0;
- _state |= Sent;
+ boolean alreadySent = (_state & StateSent) != 0;
+ _state |= StateSent;
_sent = true;
- assert((_state & Done) == 0);
+ assert ((_state & StateDone) == 0);
if(!_proxy.ice_isTwoway())
{
@@ -135,34 +182,40 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_childObserver.detach();
_childObserver = null;
}
+ if(_observer != null && (_callback == null || !_callback.__hasSentCallback()))
+ {
+ _observer.detach();
+ _observer = null;
+ }
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
- _state |= Done | OK;
- //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization
+ _state |= StateDone | StateOK;
+ // _os.resize(0, false); // Don't clear the buffer now, it's
+ // needed for the collocation optimization
}
_monitor.notifyAll();
- return !alreadySent; // Don't call the sent call is already sent.
+
+ // Don't call the sent call is already sent.
+ return !alreadySent && _callback != null && _callback.__hasSentCallback();
}
}
@Override
- public void
- __invokeSent()
+ public void invokeSent()
{
- __invokeSentInternal();
+ invokeSentInternal();
}
@Override
- public void
- __finished(Ice.Exception exc)
+ public void finished(Ice.Exception exc)
{
synchronized(_monitor)
{
- assert((_state & Done) == 0);
+ assert ((_state & StateDone) == 0);
if(_childObserver != null)
{
_childObserver.failed(exc.ice_name());
@@ -178,8 +231,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
+ // NOTE: at this point, synchronization isn't needed, no other threads
+ // should be calling on the callback.
//
try
{
@@ -188,41 +241,37 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
return; // Can't be retried immediately.
}
- __invoke(false); // Retry the invocation
+ invoke(false); // Retry the invocation
}
catch(Ice.Exception ex)
{
- __invokeException(ex);
+ invokeException(ex);
}
}
@Override
- public void
- __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
+ public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
- threadPool.dispatch(
- new DispatchWorkItem(connection)
+ threadPool.dispatch(new DispatchWorkItem(connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- OutgoingAsync.this.__finished(ex);
- }
- });
+ OutgoingAsync.this.finished(ex);
+ }
+ });
}
- public final void
- __finished(BasicStream is)
+ public final void finished(BasicStream is)
{
- assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
+ assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
try
{
synchronized(_monitor)
{
- assert(_exception == null && (_state & Done) == 0);
+ assert (_exception == null && (_state & StateDone) == 0);
if(_childObserver != null)
{
_childObserver.reply(is.size() - Protocol.headerSize - 4);
@@ -237,7 +286,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_timeoutRequestHandler = null;
}
- if(_is == null) // _is can already be initialized if the invocation is retried
+ // _is can already be initialized if the invocation is retried
+ if(_is == null)
{
_is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
}
@@ -290,29 +340,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
Ice.RequestFailedException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
+ case ReplyStatus.replyObjectNotExist:
+ {
+ ex = new Ice.ObjectNotExistException();
+ break;
+ }
- case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
+ case ReplyStatus.replyFacetNotExist:
+ {
+ ex = new Ice.FacetNotExistException();
+ break;
+ }
- case ReplyStatus.replyOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
+ case ReplyStatus.replyOperationNotExist:
+ {
+ ex = new Ice.OperationNotExistException();
+ break;
+ }
- default:
- {
- assert(false);
- break;
- }
+ default:
+ {
+ assert (false);
+ break;
+ }
}
ex.id = id;
@@ -330,29 +380,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
Ice.UnknownException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
+ case ReplyStatus.replyUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
+ }
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
+ case ReplyStatus.replyUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
- case ReplyStatus.replyUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
+ case ReplyStatus.replyUnknownUserException:
+ {
+ ex = new Ice.UnknownUserException();
+ break;
+ }
- default:
- {
- assert(false);
- break;
- }
+ default:
+ {
+ assert (false);
+ break;
+ }
}
ex.unknown = unknown;
@@ -365,34 +415,48 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
- _state |= Done;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _state |= StateDone;
+ // Clear buffer now, instead of waiting for AsyncResult
+ // deallocation
+ // _os.resize(0, false);
if(replyStatus == ReplyStatus.replyOK)
{
- _state |= OK;
+ _state |= StateOK;
}
_monitor.notifyAll();
}
}
catch(Ice.LocalException ex)
{
- __finished(ex);
+ finished(ex);
return;
}
- assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- __invokeCompleted();
+ assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
+ invokeCompleted();
}
- public final boolean
- __invoke(boolean synchronous)
+ public final boolean invoke(boolean synchronous)
{
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
+ {
+ _state |= StateDone | StateOK;
+ _handler.finishBatchRequest(_os);
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return true;
+ }
+
while(true)
{
- _handler = _proxy.__getRequestHandler();
try
{
_sent = false;
+ _handler = _proxy.__getRequestHandler();
int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
@@ -401,29 +465,36 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_sentSynchronously = true;
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSent(); // Call from the user thread.
+ invokeSent(); // Call from the user thread.
}
}
else
{
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSentAsync(); // Call from a client thread pool thread.
+ // Call from a client thread pool thread.
+ invokeSentAsync();
}
}
}
- if(_proxy.ice_isTwoway() || (status & AsyncStatus.Sent) == 0)
+ if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0)
{
synchronized(_monitor)
{
- if((_state & Done) == 0)
+ if((_state & StateDone) == 0)
{
int invocationTimeout = _handler.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
- _future = _instance.timer().schedule(this, invocationTimeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ _future = _instance.timer().schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ timeout();
+ }
+ }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
_timeoutRequestHandler = _handler;
}
}
@@ -431,22 +502,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
break;
}
- catch(Ice.OperationInterruptedException ex)
- {
- //
- // Clear the request handler, and cancel the outgoing request.
- //
- _proxy.__setRequestHandler(_handler, null);
- _handler.asyncRequestCanceled(this, ex);
- break;
- }
catch(RetryException ex)
{
- _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
+ // Clear request handler and retry.
+ _proxy.__setRequestHandler(_handler, null);
}
catch(Ice.Exception ex)
{
- if(!handleException(ex)) // This will throw if the invocation can't be retried.
+ // This will throw if the invocation can't be retried.
+ if(!handleException(ex))
{
break; // Can't be retried immediately.
}
@@ -455,27 +519,23 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
return _sentSynchronously;
}
- public BasicStream
- __startWriteParams(Ice.FormatType format)
+ public BasicStream startWriteParams(Ice.FormatType format)
{
_os.startWriteEncaps(_encoding, format);
return _os;
}
- public void
- __endWriteParams()
+ public void endWriteParams()
{
_os.endWriteEncaps();
}
- public void
- __writeEmptyParams()
+ public void writeEmptyParams()
{
_os.writeEmptyEncaps(_encoding);
}
- public void
- __writeParamEncaps(byte[] encaps)
+ public void writeParamEncaps(byte[] encaps)
{
if(encaps == null || encaps.length == 0)
{
@@ -486,22 +546,50 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_os.writeEncaps(encaps);
}
}
-
- BasicStream
- __getIs()
+
+ public void cacheMessageBuffers()
{
- return _is;
- }
+ if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0)
+ {
+ synchronized(_monitor)
+ {
+ if((_state & StateCachedBuffers) > 0) {
+ return;
+ }
+ _state |= StateCachedBuffers;
+ }
+ if(_is != null)
+ {
+ _is.reset();
+ }
+ _os.reset();
+
+ _proxy.cacheMessageBuffers(_is, _os);
+ }
+ }
+
@Override
- public void
- run()
+ public void invokeExceptionAsync(final Ice.Exception ex)
{
- __runTimerTask();
- }
+ if((_state & StateDone) == 0 && _handler != null)
+ {
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
+ {
+ _handler.abortBatchRequest();
+ }
+ }
- private boolean
- handleException(Ice.Exception exc)
+ super.invokeExceptionAsync(ex);
+ }
+
+ private boolean handleException(Ice.Exception exc)
{
try
{
@@ -514,7 +602,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
if(interval.value > 0)
{
_instance.retryQueue().add(this, interval.value);
- return false; // Don't retry immediately, the retry queue will take care of the retry.
+ return false; // Don't retry immediately, the retry queue will
+ // take care of the retry.
}
else
{
@@ -538,6 +627,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
private int _cnt;
private Ice.OperationMode _mode;
private boolean _sent;
+ //
+ // If true this AMI request is being used for a generated synchronous invocation.
+ //
+ private boolean _synchronous;
+
private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
}