diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 623 |
1 files changed, 197 insertions, 426 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index ef4d3d7c959..e42c817b42c 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,41 +9,47 @@ package IceInternal; -public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback +public class OutgoingAsync extends ProxyOutgoingAsyncBase { + public static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation) + { + ProxyOutgoingAsyncBase.checkImpl(r, prx, operation); + try + { + return (OutgoingAsync)r; + } + catch(ClassCastException ex) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + } + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb); - _proxy = (Ice.ObjectPrxHelperBase) prx; + super((Ice.ObjectPrxHelperBase)prx, operation, cb); _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + _is = null; } - public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, - IceInternal.BasicStream os) + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, BasicStream is, BasicStream os) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb, - is, os); - _proxy = (Ice.ObjectPrxHelperBase) prx; + super((Ice.ObjectPrxHelperBase)prx, operation, cb, os); _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + _is = is; } public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, boolean explicitCtx, boolean synchronous) { - _handler = null; - _cnt = 0; - _sent = false; + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); + _mode = mode; - _sentSynchronously = false; _synchronous = synchronous; - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); - if(explicitCtx && ctx == null) { ctx = _emptyContext; } - _observer = ObserverHelper.get(_proxy, operation, ctx); switch(_proxy.__reference().getMode()) @@ -137,12 +143,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes } @Override - public Ice.ObjectPrx getProxy() - { - return _proxy; - } - - @Override public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; @@ -158,117 +158,52 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes // Disable caching by marking the streams as cached! _state |= StateCachedBuffers; } - handler.invokeAsyncRequest(this, _synchronous); - return AsyncStatus.Queued; + return handler.invokeAsyncRequest(this, _synchronous); } @Override - public boolean sent() + public void abort(Ice.Exception ex) { - synchronized(_monitor) + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - boolean alreadySent = (_state & StateSent) != 0; - _state |= StateSent; - _sent = true; - - assert ((_state & StateDone) == 0); - - if(!_proxy.ice_isTwoway()) + if(_handler != null) { - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - if(_observer != null && (_callback == null || !_callback.__hasSentCallback())) - { - _observer.detach(); - _observer = null; - } - if(_timeoutRequestHandler != null) - { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; - } - _state |= StateDone | StateOK; - // _os.resize(0, false); // Don't clear the buffer now, it's - // needed for the collocation optimization - - // For oneway requests after the data has been sent the buffers - // can be reused unless this is a collocated invocation. For - // collocated invocations the buffer won't be reused as the - // because it has already been marked as cached in - // invokeCollocated. - cacheMessageBuffers(); + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _handler.abortBatchRequest(); } - _monitor.notifyAll(); - - // Don't call the sent call is already sent. - return !alreadySent && _callback != null && _callback.__hasSentCallback(); } - } - @Override - public void invokeSent() - { - invokeSentInternal(); + super.abort(ex); } - @Override - public void finished(Ice.Exception exc) + public void invoke() { - synchronized(_monitor) + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - assert ((_state & StateDone) == 0); - if(_childObserver != null) - { - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - if(_timeoutRequestHandler != null) + if(_handler != null) { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; + _sentSynchronously = true; + _handler.finishBatchRequest(_os); + finished(true); } + return; // Don't call sent/completed callback for batch AMI requests } // - // NOTE: at this point, synchronization isn't needed, no other threads - // should be calling on the callback. + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. // - try - { - handleException(exc); - } - catch(Ice.Exception ex) - { - invokeException(ex); - } - } - - @Override - void processRetry() - { - invoke(false); - } - - @Override - public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) - { - threadPool.dispatch(new DispatchWorkItem(connection) - { - @Override - public void run() - { - OutgoingAsync.this.finished(ex); - } - }); + invokeImpl(true); // userThread = true } - public final boolean finished(BasicStream is) + public final boolean completed(BasicStream is) { // // NOTE: this method is called from ConnectionI.parseMessage @@ -276,291 +211,153 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes // any user callbacks. // - assert (_proxy.ice_isTwoway()); // Can only be called for twoways. - + assert(_proxy.ice_isTwoway()); // Can only be called for twoways. + + if(_childObserver != null) + { + _childObserver.reply(is.size() - Protocol.headerSize - 4); + _childObserver.detach(); + _childObserver = null; + } + byte replyStatus; try { - synchronized(_monitor) + // _is can already be initialized if the invocation is retried + if(_is == null) + { + _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); + } + _is.swap(is); + replyStatus = _is.readByte(); + + switch(replyStatus) + { + case ReplyStatus.replyOK: + { + break; + } + + case ReplyStatus.replyUserException: { - assert (_exception == null && (_state & StateDone) == 0); - if(_childObserver != null) + if(_observer != null) { - _childObserver.reply(is.size() - Protocol.headerSize - 4); - _childObserver.detach(); - _childObserver = null; + _observer.userException(); } - - if(_timeoutRequestHandler != null) + break; + } + + case ReplyStatus.replyObjectNotExist: + case ReplyStatus.replyFacetNotExist: + case ReplyStatus.replyOperationNotExist: + { + Ice.Identity id = new Ice.Identity(); + id.__read(_is); + + // + // For compatibility with the old FacetPath. + // + String[] facetPath = _is.readStringSeq(); + String facet; + if(facetPath.length > 0) { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; + if(facetPath.length > 1) + { + throw new Ice.MarshalException(); + } + facet = facetPath[0]; } - - // _is can already be initialized if the invocation is retried - if(_is == null) + else { - _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); + facet = ""; } - _is.swap(is); - replyStatus = _is.readByte(); - + + String operation = _is.readString(); + + Ice.RequestFailedException ex = null; switch(replyStatus) { - case ReplyStatus.replyOK: - { - break; - } - - case ReplyStatus.replyUserException: - { - if(_observer != null) - { - _observer.userException(); - } - break; - } - - case ReplyStatus.replyObjectNotExist: - case ReplyStatus.replyFacetNotExist: - case ReplyStatus.replyOperationNotExist: - { - Ice.Identity id = new Ice.Identity(); - id.__read(_is); - - // - // For compatibility with the old FacetPath. - // - String[] facetPath = _is.readStringSeq(); - String facet; - if(facetPath.length > 0) - { - if(facetPath.length > 1) - { - throw new Ice.MarshalException(); - } - facet = facetPath[0]; - } - else - { - facet = ""; - } - - String operation = _is.readString(); - - Ice.RequestFailedException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } - - case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - - case ReplyStatus.replyOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } - - default: - { - assert (false); - break; - } - } - - ex.id = id; - ex.facet = facet; - ex.operation = operation; - throw ex; - } - - case ReplyStatus.replyUnknownException: - case ReplyStatus.replyUnknownLocalException: - case ReplyStatus.replyUnknownUserException: - { - String unknown = _is.readString(); - - Ice.UnknownException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } - - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } - - case ReplyStatus.replyUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } - - default: - { - assert (false); - break; - } - } - - ex.unknown = unknown; - throw ex; - } + case ReplyStatus.replyObjectNotExist: + { + ex = new Ice.ObjectNotExistException(); + break; + } - default: - { - throw new Ice.UnknownReplyStatusException(); - } + case ReplyStatus.replyFacetNotExist: + { + ex = new Ice.FacetNotExistException(); + break; } - if(replyStatus == ReplyStatus.replyOK) + case ReplyStatus.replyOperationNotExist: { - _state |= StateOK; + ex = new Ice.OperationNotExistException(); + break; } - _state |= StateDone; - _monitor.notifyAll(); - if(_callback == null) + default: { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return false; + assert(false); + break; } - return true; - } - } - catch(Ice.Exception exc) - { - // - // We don't call finished(exc) here because we don't want - // to invoke the completion callback. The completion - // callback is invoked by the connection is this method - // returns true. - // - try - { - handleException(exc); - return false; + } + + ex.id = id; + ex.facet = facet; + ex.operation = operation; + throw ex; } - catch(Ice.LocalException ex) + + case ReplyStatus.replyUnknownException: + case ReplyStatus.replyUnknownLocalException: + case ReplyStatus.replyUnknownUserException: { - synchronized(_monitor) - { - _state |= StateDone; - _exception = ex; - _monitor.notifyAll(); + String unknown = _is.readString(); - if(_callback == null) - { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return false; - } - return true; + Ice.UnknownException ex = null; + switch(replyStatus) + { + case ReplyStatus.replyUnknownException: + { + ex = new Ice.UnknownException(); + break; } - } - } - } - public final boolean invoke(boolean userThread) - { - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _state |= StateDone | StateOK; - _handler.finishBatchRequest(_os); - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return true; - } + case ReplyStatus.replyUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } - while(true) - { - try - { - _sent = false; - _handler = _proxy.__getRequestHandler(); - int status = _handler.sendAsyncRequest(this); - if((status & AsyncStatus.Sent) > 0) + case ReplyStatus.replyUnknownUserException: { - if(userThread) - { - _sentSynchronously = true; - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - invokeSent(); // Call from the user thread. - } - } - else - { - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - // Call from a client thread pool thread. - invokeSentAsync(); - } - } + ex = new Ice.UnknownUserException(); + break; } - if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0) + default: { - synchronized(_monitor) - { - if((_state & StateDone) == 0) - { - int invocationTimeout = _handler.getReference().getInvocationTimeout(); - if(invocationTimeout > 0) - { - _future = _instance.timer().schedule(new Runnable() - { - @Override - public void run() - { - timeout(); - } - }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); - _timeoutRequestHandler = _handler; - } - } - } + assert(false); + break; + } } + + ex.unknown = unknown; + throw ex; } - catch(RetryException ex) + + default: { - // Clear request handler and retry. - _proxy.__setRequestHandler(_handler, null); - continue; + throw new Ice.UnknownReplyStatusException(); } - catch(Ice.Exception ex) - { - // This will throw if the invocation can't be retried. - handleException(ex); } - break; + + return finished(replyStatus == ReplyStatus.replyOK); + } + catch(Ice.Exception ex) + { + return completed(ex); } - return _sentSynchronously; } public BasicStream startWriteParams(Ice.FormatType format) @@ -591,12 +388,48 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes } } + public IceInternal.BasicStream startReadParams() + { + _is.startReadEncaps(); + return _is; + } + + public void endReadParams() + { + _is.endReadEncaps(); + } + + public void readEmptyParams() + { + _is.skipEmptyEncaps(null); + } + + public byte[] readParamEncaps() + { + return _is.readEncaps(null); + } + + public final void throwUserException() + throws Ice.UserException + { + try + { + _is.startReadEncaps(); + _is.throwException(null); + } + catch(Ice.UserException ex) + { + _is.endReadEncaps(); + throw ex; + } + } + @Override public void cacheMessageBuffers() { if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) { - synchronized(_monitor) + synchronized(this) { if((_state & StateCachedBuffers) > 0) { @@ -612,76 +445,14 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes _os.reset(); _proxy.cacheMessageBuffers(_is, _os); - } - } - @Override - public void invokeExceptionAsync(final Ice.Exception ex) - { - if((_state & StateDone) == 0 && _handler != null) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _handler.abortBatchRequest(); - } - } - - super.invokeExceptionAsync(ex); - } - - @Override - protected void cancelRequest() - { - if(_handler != null) - { - _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + _is = null; + _os = null; } } - private void handleException(Ice.Exception exc) - { - try - { - Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); - _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt); - if(_observer != null) - { - _observer.retried(); // Invocation is being retried. - } - - // - // Schedule the retry. Note that we always schedule the retry - // on the retry queue even if the invocation can be retried - // immediately. This is required because it might not be safe - // to retry from this thread (this is for instance called by - // finished(BasicStream) which is called with the connection - // locked. - // - _instance.retryQueue().add(this, interval.value); - } - catch(Ice.Exception ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - throw ex; - } - } - - final private Ice.ObjectPrxHelperBase _proxy; final private Ice.EncodingVersion _encoding; - - private RequestHandler _handler; - private int _cnt; - private Ice.OperationMode _mode; - private boolean _sent; + private BasicStream _is; // // If true this AMI request is being used for a generated synchronous invocation. |