diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 378 |
1 files changed, 236 insertions, 142 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index dbb6ac37ab9..0618ffb93e9 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,23 +9,33 @@ package IceInternal; -public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable +public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback { public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, cb); - _proxy = (Ice.ObjectPrxHelperBase)prx; + super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb); + _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - - public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, - boolean explicitCtx) + + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, + IceInternal.BasicStream os) + { + super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb, + is, os); + _proxy = (Ice.ObjectPrxHelperBase) prx; + _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + } + + public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, + boolean explicitCtx, boolean synchronous) { _handler = null; _cnt = 0; _sent = false; _mode = mode; _sentSynchronously = false; + _synchronous = synchronous; Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); @@ -36,15 +46,47 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _observer = ObserverHelper.get(_proxy, operation, ctx); - // - // Can't call async via a batch proxy. - // - if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) + switch(_proxy.__reference().getMode()) { - throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); - } + case Reference.ModeTwoway: + case Reference.ModeOneway: + case Reference.ModeDatagram: + { + _os.writeBlob(IceInternal.Protocol.requestHdr); + break; + } - _os.writeBlob(IceInternal.Protocol.requestHdr); + case Reference.ModeBatchOneway: + case Reference.ModeBatchDatagram: + { + while(true) + { + try + { + _handler = _proxy.__getRequestHandler(); + _handler.prepareBatchRequest(_os); + break; + } + catch(RetryException ex) + { + // Clear request handler and retry. + _proxy.__setRequestHandler(_handler, null); + } + catch(Ice.LocalException ex) + { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + // Clear request handler + _proxy.__setRequestHandler(_handler, null); + _handler = null; + throw ex; + } + } + break; + } + } Reference ref = _proxy.__reference(); @@ -66,7 +108,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _os.writeString(operation); - _os.writeByte((byte)mode.value()); + _os.writeByte((byte) mode.value()); if(ctx != null) { @@ -94,39 +136,44 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } - @Override public Ice.ObjectPrx - getProxy() + @Override + public Ice.ObjectPrx getProxy() { return _proxy; } @Override - public int - __send(Ice.ConnectionI connection, boolean compress, boolean response) - throws RetryException + public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; return connection.sendAsyncRequest(this, compress, response); } @Override - public int - __invokeCollocated(CollocatedRequestHandler handler) + public int invokeCollocated(CollocatedRequestHandler handler) { - return handler.invokeAsyncRequest(this); + // The BasicStream cannot be cached if background io is enabled, + // the proxy is not a twoway or there is an invocation timeout set. + if(_proxy.__reference().getInstance().queueRequests() || !_proxy.ice_isTwoway() || + _proxy.__reference().getInvocationTimeout() > 0) + { + // Disable caching by marking the streams as cached! + _state |= StateCachedBuffers; + } + handler.invokeAsyncRequest(this, _synchronous); + return AsyncStatus.Queued; } @Override - public boolean - __sent() + public boolean sent() { synchronized(_monitor) { - boolean alreadySent = (_state & Sent) != 0; - _state |= Sent; + boolean alreadySent = (_state & StateSent) != 0; + _state |= StateSent; _sent = true; - assert((_state & Done) == 0); + assert ((_state & StateDone) == 0); if(!_proxy.ice_isTwoway()) { @@ -135,34 +182,40 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _childObserver.detach(); _childObserver = null; } + if(_observer != null && (_callback == null || !_callback.__hasSentCallback())) + { + _observer.detach(); + _observer = null; + } if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - _state |= Done | OK; - //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization + _state |= StateDone | StateOK; + // _os.resize(0, false); // Don't clear the buffer now, it's + // needed for the collocation optimization } _monitor.notifyAll(); - return !alreadySent; // Don't call the sent call is already sent. + + // Don't call the sent call is already sent. + return !alreadySent && _callback != null && _callback.__hasSentCallback(); } } @Override - public void - __invokeSent() + public void invokeSent() { - __invokeSentInternal(); + invokeSentInternal(); } @Override - public void - __finished(Ice.Exception exc) + public void finished(Ice.Exception exc) { synchronized(_monitor) { - assert((_state & Done) == 0); + assert ((_state & StateDone) == 0); if(_childObserver != null) { _childObserver.failed(exc.ice_name()); @@ -178,8 +231,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. + // NOTE: at this point, synchronization isn't needed, no other threads + // should be calling on the callback. // try { @@ -188,41 +241,37 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa return; // Can't be retried immediately. } - __invoke(false); // Retry the invocation + invoke(false); // Retry the invocation } catch(Ice.Exception ex) { - __invokeException(ex); + invokeException(ex); } } @Override - public void - __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) + public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { - threadPool.dispatch( - new DispatchWorkItem(connection) + threadPool.dispatch(new DispatchWorkItem(connection) + { + @Override + public void run() { - @Override - public void - run() - { - OutgoingAsync.this.__finished(ex); - } - }); + OutgoingAsync.this.finished(ex); + } + }); } - public final void - __finished(BasicStream is) + public final void finished(BasicStream is) { - assert(_proxy.ice_isTwoway()); // Can only be called for twoways. + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; try { synchronized(_monitor) { - assert(_exception == null && (_state & Done) == 0); + assert (_exception == null && (_state & StateDone) == 0); if(_childObserver != null) { _childObserver.reply(is.size() - Protocol.headerSize - 4); @@ -237,7 +286,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _timeoutRequestHandler = null; } - if(_is == null) // _is can already be initialized if the invocation is retried + // _is can already be initialized if the invocation is retried + if(_is == null) { _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); } @@ -290,29 +340,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa Ice.RequestFailedException ex = null; switch(replyStatus) { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } + case ReplyStatus.replyObjectNotExist: + { + ex = new Ice.ObjectNotExistException(); + break; + } - case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } + case ReplyStatus.replyFacetNotExist: + { + ex = new Ice.FacetNotExistException(); + break; + } - case ReplyStatus.replyOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } + case ReplyStatus.replyOperationNotExist: + { + ex = new Ice.OperationNotExistException(); + break; + } - default: - { - assert(false); - break; - } + default: + { + assert (false); + break; + } } ex.id = id; @@ -330,29 +380,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa Ice.UnknownException ex = null; switch(replyStatus) { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } + case ReplyStatus.replyUnknownException: + { + ex = new Ice.UnknownException(); + break; + } - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } + case ReplyStatus.replyUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } - case ReplyStatus.replyUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } + case ReplyStatus.replyUnknownUserException: + { + ex = new Ice.UnknownUserException(); + break; + } - default: - { - assert(false); - break; - } + default: + { + assert (false); + break; + } } ex.unknown = unknown; @@ -365,34 +415,48 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } - _state |= Done; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation + _state |= StateDone; + // Clear buffer now, instead of waiting for AsyncResult + // deallocation + // _os.resize(0, false); if(replyStatus == ReplyStatus.replyOK) { - _state |= OK; + _state |= StateOK; } _monitor.notifyAll(); } } catch(Ice.LocalException ex) { - __finished(ex); + finished(ex); return; } - assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - __invokeCompleted(); + assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); + invokeCompleted(); } - public final boolean - __invoke(boolean synchronous) + public final boolean invoke(boolean synchronous) { + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) + { + _state |= StateDone | StateOK; + _handler.finishBatchRequest(_os); + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return true; + } + while(true) { - _handler = _proxy.__getRequestHandler(); try { _sent = false; + _handler = _proxy.__getRequestHandler(); int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { @@ -401,29 +465,36 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _sentSynchronously = true; if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSent(); // Call from the user thread. + invokeSent(); // Call from the user thread. } } else { if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSentAsync(); // Call from a client thread pool thread. + // Call from a client thread pool thread. + invokeSentAsync(); } } } - if(_proxy.ice_isTwoway() || (status & AsyncStatus.Sent) == 0) + if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0) { synchronized(_monitor) { - if((_state & Done) == 0) + if((_state & StateDone) == 0) { int invocationTimeout = _handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { - _future = _instance.timer().schedule(this, invocationTimeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + _future = _instance.timer().schedule(new Runnable() + { + @Override + public void run() + { + timeout(); + } + }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); _timeoutRequestHandler = _handler; } } @@ -431,22 +502,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } break; } - catch(Ice.OperationInterruptedException ex) - { - // - // Clear the request handler, and cancel the outgoing request. - // - _proxy.__setRequestHandler(_handler, null); - _handler.asyncRequestCanceled(this, ex); - break; - } catch(RetryException ex) { - _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. + // Clear request handler and retry. + _proxy.__setRequestHandler(_handler, null); } catch(Ice.Exception ex) { - if(!handleException(ex)) // This will throw if the invocation can't be retried. + // This will throw if the invocation can't be retried. + if(!handleException(ex)) { break; // Can't be retried immediately. } @@ -455,27 +519,23 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa return _sentSynchronously; } - public BasicStream - __startWriteParams(Ice.FormatType format) + public BasicStream startWriteParams(Ice.FormatType format) { _os.startWriteEncaps(_encoding, format); return _os; } - public void - __endWriteParams() + public void endWriteParams() { _os.endWriteEncaps(); } - public void - __writeEmptyParams() + public void writeEmptyParams() { _os.writeEmptyEncaps(_encoding); } - public void - __writeParamEncaps(byte[] encaps) + public void writeParamEncaps(byte[] encaps) { if(encaps == null || encaps.length == 0) { @@ -486,22 +546,50 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _os.writeEncaps(encaps); } } - - BasicStream - __getIs() + + public void cacheMessageBuffers() { - return _is; - } + if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) + { + synchronized(_monitor) + { + if((_state & StateCachedBuffers) > 0) { + return; + } + _state |= StateCachedBuffers; + } + if(_is != null) + { + _is.reset(); + } + _os.reset(); + + _proxy.cacheMessageBuffers(_is, _os); + } + } + @Override - public void - run() + public void invokeExceptionAsync(final Ice.Exception ex) { - __runTimerTask(); - } + if((_state & StateDone) == 0 && _handler != null) + { + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) + { + _handler.abortBatchRequest(); + } + } - private boolean - handleException(Ice.Exception exc) + super.invokeExceptionAsync(ex); + } + + private boolean handleException(Ice.Exception exc) { try { @@ -514,7 +602,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa if(interval.value > 0) { _instance.retryQueue().add(this, interval.value); - return false; // Don't retry immediately, the retry queue will take care of the retry. + return false; // Don't retry immediately, the retry queue will + // take care of the retry. } else { @@ -538,6 +627,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa private int _cnt; private Ice.OperationMode _mode; private boolean _sent; + // + // If true this AMI request is being used for a generated synchronous invocation. + // + private boolean _synchronous; + private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); } |