summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java623
1 files changed, 197 insertions, 426 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index ef4d3d7c959..e42c817b42c 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,41 +9,47 @@
package IceInternal;
-public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
+public class OutgoingAsync extends ProxyOutgoingAsyncBase
{
+ public static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (OutgoingAsync)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
- _proxy = (Ice.ObjectPrxHelperBase) prx;
+ super((Ice.ObjectPrxHelperBase)prx, operation, cb);
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ _is = null;
}
- public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is,
- IceInternal.BasicStream os)
+ public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, BasicStream is, BasicStream os)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb,
- is, os);
- _proxy = (Ice.ObjectPrxHelperBase) prx;
+ super((Ice.ObjectPrxHelperBase)prx, operation, cb, os);
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ _is = is;
}
public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
boolean explicitCtx, boolean synchronous)
{
- _handler = null;
- _cnt = 0;
- _sent = false;
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
+
_mode = mode;
- _sentSynchronously = false;
_synchronous = synchronous;
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
-
if(explicitCtx && ctx == null)
{
ctx = _emptyContext;
}
-
_observer = ObserverHelper.get(_proxy, operation, ctx);
switch(_proxy.__reference().getMode())
@@ -137,12 +143,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
}
@Override
- public Ice.ObjectPrx getProxy()
- {
- return _proxy;
- }
-
- @Override
public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
_cachedConnection = connection;
@@ -158,117 +158,52 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
// Disable caching by marking the streams as cached!
_state |= StateCachedBuffers;
}
- handler.invokeAsyncRequest(this, _synchronous);
- return AsyncStatus.Queued;
+ return handler.invokeAsyncRequest(this, _synchronous);
}
@Override
- public boolean sent()
+ public void abort(Ice.Exception ex)
{
- synchronized(_monitor)
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- boolean alreadySent = (_state & StateSent) != 0;
- _state |= StateSent;
- _sent = true;
-
- assert ((_state & StateDone) == 0);
-
- if(!_proxy.ice_isTwoway())
+ if(_handler != null)
{
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- if(_observer != null && (_callback == null || !_callback.__hasSentCallback()))
- {
- _observer.detach();
- _observer = null;
- }
- if(_timeoutRequestHandler != null)
- {
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
- }
- _state |= StateDone | StateOK;
- // _os.resize(0, false); // Don't clear the buffer now, it's
- // needed for the collocation optimization
-
- // For oneway requests after the data has been sent the buffers
- // can be reused unless this is a collocated invocation. For
- // collocated invocations the buffer won't be reused as the
- // because it has already been marked as cached in
- // invokeCollocated.
- cacheMessageBuffers();
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _handler.abortBatchRequest();
}
- _monitor.notifyAll();
-
- // Don't call the sent call is already sent.
- return !alreadySent && _callback != null && _callback.__hasSentCallback();
}
- }
- @Override
- public void invokeSent()
- {
- invokeSentInternal();
+ super.abort(ex);
}
- @Override
- public void finished(Ice.Exception exc)
+ public void invoke()
{
- synchronized(_monitor)
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- assert ((_state & StateDone) == 0);
- if(_childObserver != null)
- {
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- if(_timeoutRequestHandler != null)
+ if(_handler != null)
{
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
+ _sentSynchronously = true;
+ _handler.finishBatchRequest(_os);
+ finished(true);
}
+ return; // Don't call sent/completed callback for batch AMI requests
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads
- // should be calling on the callback.
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
//
- try
- {
- handleException(exc);
- }
- catch(Ice.Exception ex)
- {
- invokeException(ex);
- }
- }
-
- @Override
- void processRetry()
- {
- invoke(false);
- }
-
- @Override
- public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
- {
- threadPool.dispatch(new DispatchWorkItem(connection)
- {
- @Override
- public void run()
- {
- OutgoingAsync.this.finished(ex);
- }
- });
+ invokeImpl(true); // userThread = true
}
- public final boolean finished(BasicStream is)
+ public final boolean completed(BasicStream is)
{
//
// NOTE: this method is called from ConnectionI.parseMessage
@@ -276,291 +211,153 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
// any user callbacks.
//
- assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
-
+ assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
+
+ if(_childObserver != null)
+ {
+ _childObserver.reply(is.size() - Protocol.headerSize - 4);
+ _childObserver.detach();
+ _childObserver = null;
+ }
+
byte replyStatus;
try
{
- synchronized(_monitor)
+ // _is can already be initialized if the invocation is retried
+ if(_is == null)
+ {
+ _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ }
+ _is.swap(is);
+ replyStatus = _is.readByte();
+
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyOK:
+ {
+ break;
+ }
+
+ case ReplyStatus.replyUserException:
{
- assert (_exception == null && (_state & StateDone) == 0);
- if(_childObserver != null)
+ if(_observer != null)
{
- _childObserver.reply(is.size() - Protocol.headerSize - 4);
- _childObserver.detach();
- _childObserver = null;
+ _observer.userException();
}
-
- if(_timeoutRequestHandler != null)
+ break;
+ }
+
+ case ReplyStatus.replyObjectNotExist:
+ case ReplyStatus.replyFacetNotExist:
+ case ReplyStatus.replyOperationNotExist:
+ {
+ Ice.Identity id = new Ice.Identity();
+ id.__read(_is);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ String[] facetPath = _is.readStringSeq();
+ String facet;
+ if(facetPath.length > 0)
{
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
+ if(facetPath.length > 1)
+ {
+ throw new Ice.MarshalException();
+ }
+ facet = facetPath[0];
}
-
- // _is can already be initialized if the invocation is retried
- if(_is == null)
+ else
{
- _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ facet = "";
}
- _is.swap(is);
- replyStatus = _is.readByte();
-
+
+ String operation = _is.readString();
+
+ Ice.RequestFailedException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyOK:
- {
- break;
- }
-
- case ReplyStatus.replyUserException:
- {
- if(_observer != null)
- {
- _observer.userException();
- }
- break;
- }
-
- case ReplyStatus.replyObjectNotExist:
- case ReplyStatus.replyFacetNotExist:
- case ReplyStatus.replyOperationNotExist:
- {
- Ice.Identity id = new Ice.Identity();
- id.__read(_is);
-
- //
- // For compatibility with the old FacetPath.
- //
- String[] facetPath = _is.readStringSeq();
- String facet;
- if(facetPath.length > 0)
- {
- if(facetPath.length > 1)
- {
- throw new Ice.MarshalException();
- }
- facet = facetPath[0];
- }
- else
- {
- facet = "";
- }
-
- String operation = _is.readString();
-
- Ice.RequestFailedException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
-
- case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
-
- case ReplyStatus.replyOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
-
- default:
- {
- assert (false);
- break;
- }
- }
-
- ex.id = id;
- ex.facet = facet;
- ex.operation = operation;
- throw ex;
- }
-
- case ReplyStatus.replyUnknownException:
- case ReplyStatus.replyUnknownLocalException:
- case ReplyStatus.replyUnknownUserException:
- {
- String unknown = _is.readString();
-
- Ice.UnknownException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
-
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
-
- case ReplyStatus.replyUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
-
- default:
- {
- assert (false);
- break;
- }
- }
-
- ex.unknown = unknown;
- throw ex;
- }
+ case ReplyStatus.replyObjectNotExist:
+ {
+ ex = new Ice.ObjectNotExistException();
+ break;
+ }
- default:
- {
- throw new Ice.UnknownReplyStatusException();
- }
+ case ReplyStatus.replyFacetNotExist:
+ {
+ ex = new Ice.FacetNotExistException();
+ break;
}
- if(replyStatus == ReplyStatus.replyOK)
+ case ReplyStatus.replyOperationNotExist:
{
- _state |= StateOK;
+ ex = new Ice.OperationNotExistException();
+ break;
}
- _state |= StateDone;
- _monitor.notifyAll();
- if(_callback == null)
+ default:
{
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return false;
+ assert(false);
+ break;
}
- return true;
- }
- }
- catch(Ice.Exception exc)
- {
- //
- // We don't call finished(exc) here because we don't want
- // to invoke the completion callback. The completion
- // callback is invoked by the connection is this method
- // returns true.
- //
- try
- {
- handleException(exc);
- return false;
+ }
+
+ ex.id = id;
+ ex.facet = facet;
+ ex.operation = operation;
+ throw ex;
}
- catch(Ice.LocalException ex)
+
+ case ReplyStatus.replyUnknownException:
+ case ReplyStatus.replyUnknownLocalException:
+ case ReplyStatus.replyUnknownUserException:
{
- synchronized(_monitor)
- {
- _state |= StateDone;
- _exception = ex;
- _monitor.notifyAll();
+ String unknown = _is.readString();
- if(_callback == null)
- {
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return false;
- }
- return true;
+ Ice.UnknownException ex = null;
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
}
- }
- }
- }
- public final boolean invoke(boolean userThread)
- {
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _state |= StateDone | StateOK;
- _handler.finishBatchRequest(_os);
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return true;
- }
+ case ReplyStatus.replyUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
- while(true)
- {
- try
- {
- _sent = false;
- _handler = _proxy.__getRequestHandler();
- int status = _handler.sendAsyncRequest(this);
- if((status & AsyncStatus.Sent) > 0)
+ case ReplyStatus.replyUnknownUserException:
{
- if(userThread)
- {
- _sentSynchronously = true;
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- invokeSent(); // Call from the user thread.
- }
- }
- else
- {
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- // Call from a client thread pool thread.
- invokeSentAsync();
- }
- }
+ ex = new Ice.UnknownUserException();
+ break;
}
- if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0)
+ default:
{
- synchronized(_monitor)
- {
- if((_state & StateDone) == 0)
- {
- int invocationTimeout = _handler.getReference().getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _future = _instance.timer().schedule(new Runnable()
- {
- @Override
- public void run()
- {
- timeout();
- }
- }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- _timeoutRequestHandler = _handler;
- }
- }
- }
+ assert(false);
+ break;
+ }
}
+
+ ex.unknown = unknown;
+ throw ex;
}
- catch(RetryException ex)
+
+ default:
{
- // Clear request handler and retry.
- _proxy.__setRequestHandler(_handler, null);
- continue;
+ throw new Ice.UnknownReplyStatusException();
}
- catch(Ice.Exception ex)
- {
- // This will throw if the invocation can't be retried.
- handleException(ex);
}
- break;
+
+ return finished(replyStatus == ReplyStatus.replyOK);
+ }
+ catch(Ice.Exception ex)
+ {
+ return completed(ex);
}
- return _sentSynchronously;
}
public BasicStream startWriteParams(Ice.FormatType format)
@@ -591,12 +388,48 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
}
}
+ public IceInternal.BasicStream startReadParams()
+ {
+ _is.startReadEncaps();
+ return _is;
+ }
+
+ public void endReadParams()
+ {
+ _is.endReadEncaps();
+ }
+
+ public void readEmptyParams()
+ {
+ _is.skipEmptyEncaps(null);
+ }
+
+ public byte[] readParamEncaps()
+ {
+ return _is.readEncaps(null);
+ }
+
+ public final void throwUserException()
+ throws Ice.UserException
+ {
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException(null);
+ }
+ catch(Ice.UserException ex)
+ {
+ _is.endReadEncaps();
+ throw ex;
+ }
+ }
+
@Override
public void cacheMessageBuffers()
{
if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0)
{
- synchronized(_monitor)
+ synchronized(this)
{
if((_state & StateCachedBuffers) > 0)
{
@@ -612,76 +445,14 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
_os.reset();
_proxy.cacheMessageBuffers(_is, _os);
- }
- }
- @Override
- public void invokeExceptionAsync(final Ice.Exception ex)
- {
- if((_state & StateDone) == 0 && _handler != null)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _handler.abortBatchRequest();
- }
- }
-
- super.invokeExceptionAsync(ex);
- }
-
- @Override
- protected void cancelRequest()
- {
- if(_handler != null)
- {
- _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ _is = null;
+ _os = null;
}
}
- private void handleException(Ice.Exception exc)
- {
- try
- {
- Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
- _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
- if(_observer != null)
- {
- _observer.retried(); // Invocation is being retried.
- }
-
- //
- // Schedule the retry. Note that we always schedule the retry
- // on the retry queue even if the invocation can be retried
- // immediately. This is required because it might not be safe
- // to retry from this thread (this is for instance called by
- // finished(BasicStream) which is called with the connection
- // locked.
- //
- _instance.retryQueue().add(this, interval.value);
- }
- catch(Ice.Exception ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- throw ex;
- }
- }
-
- final private Ice.ObjectPrxHelperBase _proxy;
final private Ice.EncodingVersion _encoding;
-
- private RequestHandler _handler;
- private int _cnt;
- private Ice.OperationMode _mode;
- private boolean _sent;
+ private BasicStream _is;
//
// If true this AMI request is being used for a generated synchronous invocation.