diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/Outgoing.java | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 279 |
1 files changed, 165 insertions, 114 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index c00a99a2b2b..ac06172ce72 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -15,44 +15,53 @@ import Ice.Instrumentation.InvocationObserver; public final class Outgoing implements OutgoingMessageCallback { public - Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context, - InvocationObserver observer) - throws LocalExceptionWrapper + Outgoing(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context, + boolean explicitCtx) { + Reference ref = proxy.__reference(); _state = StateUnsent; - _exceptionWrapper = false; - _exceptionWrapperRetry = false; _sent = false; - _handler = handler; - _observer = observer; - _encoding = Protocol.getCompatibleEncoding(handler.getReference().getEncoding()); - _os = new BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding); + _proxy = proxy; + _mode = mode; + _handler = null; + _observer = IceInternal.ObserverHelper.get(proxy, op, context); + _encoding = Protocol.getCompatibleEncoding(ref.getEncoding()); + _os = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding); - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_handler.getReference().getProtocol())); + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol())); - writeHeader(operation, mode, context); + writeHeader(op, mode, context, explicitCtx); } // // These functions allow this object to be reused, rather than reallocated. // public void - reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context, - InvocationObserver observer) - throws LocalExceptionWrapper + reset(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context, + boolean explicitCtx) { + Reference ref = proxy.__reference(); _state = StateUnsent; _exception = null; - _exceptionWrapper = false; - _exceptionWrapperRetry = false; _sent = false; - _handler = handler; - _observer = observer; - _encoding = Protocol.getCompatibleEncoding(handler.getReference().getEncoding()); + _proxy = proxy; + _mode = mode; + _handler = null; + _observer = IceInternal.ObserverHelper.get(proxy, op, context); + _encoding = Protocol.getCompatibleEncoding(ref.getEncoding()); - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_handler.getReference().getProtocol())); + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol())); - writeHeader(operation, mode, context); + writeHeader(op, mode, context, explicitCtx); + } + + public void + detach() + { + if(_observer != null) + { + _observer.detach(); + } } public void @@ -68,17 +77,27 @@ public final class Outgoing implements OutgoingMessageCallback // Returns true if ok, false if user exception. public boolean invoke() - throws LocalExceptionWrapper { assert(_state == StateUnsent); - switch(_handler.getReference().getMode()) + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - case Reference.ModeOneway: - case Reference.ModeDatagram: - case Reference.ModeTwoway: + _state = StateInProgress; + _handler.finishBatchRequest(_os); + return true; + } + + int cnt = 0; + while(true) + { + try { _state = StateInProgress; + _exception = null; + _sent = false; + + _handler = _proxy.__getRequestHandler(false); if(_handler.sendRequest(this)) // Request sent and no response expected, we're done. { @@ -92,7 +111,7 @@ public final class Outgoing implements OutgoingMessageCallback // If the handler says it's not finished, we wait until we're done. // - int invocationTimeout = _handler.getReference().getInvocationTimeout(); + int invocationTimeout = _proxy.__reference().getInvocationTimeout(); if(invocationTimeout > 0) { long now = Time.currentMonotonicTimeMillis(); @@ -131,71 +150,76 @@ public final class Outgoing implements OutgoingMessageCallback if(timedOut) { _handler.requestTimedOut(this); - assert(_exception != null); + + // + // Wait for the exception to propagate. It's possible the request handler ignores + // the timeout if there was a failure shortly before requestTimedOut got called. + // In this case, the exception should be set on the Outgoing. + // + synchronized(this) + { + while(_exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + } } if(_exception != null) { - _exception.fillInStackTrace(); - if(_exceptionWrapper) + throw (Ice.Exception)_exception.fillInStackTrace(); + } + else + { + assert(_state != StateInProgress); + return _state == StateOK; + } + } + catch(RetryException ex) + { + _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. + } + catch(Ice.Exception ex) + { + try + { + Ice.IntHolder interval = new Ice.IntHolder(); + cnt = _proxy.__handleException(ex, _handler, _mode, _sent, interval, cnt); + if(_observer != null) { - throw new LocalExceptionWrapper(_exception, _exceptionWrapperRetry); + _observer.retried(); // Invocation is being retried. } - - // - // A CloseConnectionException indicates graceful - // server shutdown, and is therefore always repeatable - // without violating "at-most-once". That's because by - // sending a close connection message, the server - // guarantees that all outstanding requests can safely - // be repeated. - // - // An ObjectNotExistException can always be retried as - // well without violating "at-most-once" (see the - // implementation of the checkRetryAfterException - // method of the ProxyFactory class for the reasons - // why it can be useful). - // - if(!_sent || - _exception instanceof Ice.CloseConnectionException || - _exception instanceof Ice.ObjectNotExistException) + if(interval.value > 0) { - throw _exception; + try + { + Thread.sleep(interval.value); + } + catch(InterruptedException exi) + { + } } - - // - // Throw the exception wrapped in a LocalExceptionWrapper, - // to indicate that the request cannot be resent without - // potentially violating the "at-most-once" principle. - // - throw new LocalExceptionWrapper(_exception, false); } - - assert(_state != StateInProgress); - return _state == StateOK; - } - - case Reference.ModeBatchOneway: - case Reference.ModeBatchDatagram: - { - // - // For batch oneways and datagrams, the same rules as for - // regular oneways and datagrams (see comment above) - // apply. - // - _state = StateInProgress; - _handler.finishBatchRequest(_os); - return true; + catch(Ice.Exception exc) + { + if(_observer != null) + { + _observer.failed(exc.ice_name()); + } + throw exc; + } } } - - assert(false); - return false; } public void abort(Ice.LocalException ex) - throws LocalExceptionWrapper { assert(_state == StateUnsent); @@ -204,7 +228,7 @@ public final class Outgoing implements OutgoingMessageCallback // must notify the connection about that we give up ownership // of the batch stream. // - int mode = _handler.getReference().getMode(); + int mode = _proxy.__reference().getMode(); if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { _handler.abortBatchRequest(); @@ -215,15 +239,21 @@ public final class Outgoing implements OutgoingMessageCallback public boolean send(Ice.ConnectionI connection, boolean compress, boolean response) - throws LocalExceptionWrapper + throws RetryException { return connection.sendRequest(this, compress, response); } + public void + invokeCollocated(CollocatedRequestHandler handler) + { + handler.invokeRequest(this); + } + synchronized public void sent() { - if(_handler.getReference().getMode() != Reference.ModeTwoway) + if(_proxy.__reference().getMode() != Reference.ModeTwoway) { if(_remoteObserver != null) { @@ -239,7 +269,7 @@ public final class Outgoing implements OutgoingMessageCallback public synchronized void finished(BasicStream is) { - assert(_handler.getReference().getMode() == Reference.ModeTwoway); // Only for twoways. + assert(_proxy.__reference().getMode() == Reference.ModeTwoway); // Only for twoways. assert(_state <= StateInProgress); @@ -252,7 +282,7 @@ public final class Outgoing implements OutgoingMessageCallback if(_is == null) { - _is = new IceInternal.BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding); + _is = new IceInternal.BasicStream(_proxy.__reference().getInstance(), Protocol.currentProtocolEncoding); } _is.swap(is); byte replyStatus = _is.readByte(); @@ -385,7 +415,7 @@ public final class Outgoing implements OutgoingMessageCallback } public synchronized void - finished(Ice.LocalException ex, boolean sent) + finished(Ice.Exception ex, boolean sent) { assert(_state <= StateInProgress); if(_remoteObserver != null) @@ -399,24 +429,6 @@ public final class Outgoing implements OutgoingMessageCallback _sent = sent; notify(); } - - public synchronized void - finished(LocalExceptionWrapper ex) - { - if(_remoteObserver != null) - { - _remoteObserver.failed(ex.get().ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; - } - - _state = StateFailed; - _exceptionWrapper = true; - _exceptionWrapperRetry = ex.retry(); - _exception = ex.get(); - _sent = false; - notify(); - } public BasicStream os() @@ -516,11 +528,28 @@ public final class Outgoing implements OutgoingMessageCallback } } + public void + attachCollocatedObserver(int requestId) + { + if(_observer != null) + { + _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + private void - writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context) - throws LocalExceptionWrapper + writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx) { - switch(_handler.getReference().getMode()) + if(explicitCtx && context == null) + { + context = _emptyContext; + } + + switch(_proxy.__reference().getMode()) { case Reference.ModeTwoway: case Reference.ModeOneway: @@ -533,19 +562,40 @@ public final class Outgoing implements OutgoingMessageCallback case Reference.ModeBatchOneway: case Reference.ModeBatchDatagram: { - _handler.prepareBatchRequest(_os); + while(true) + { + try + { + _handler = _proxy.__getRequestHandler(true); + _handler.prepareBatchRequest(_os); + break; + } + catch(RetryException ex) + { + _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. + } + catch(Ice.LocalException ex) + { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + _proxy.__setRequestHandler(_handler, null); // Clear request handler + throw ex; + } + } break; } } try { - _handler.getReference().getIdentity().__write(_os); + _proxy.__reference().getIdentity().__write(_os); // // For compatibility with the old FacetPath. // - String facet = _handler.getReference().getFacet(); + String facet = _proxy.__reference().getFacet(); if(facet == null || facet.length() == 0) { _os.writeStringSeq(null); @@ -572,8 +622,8 @@ public final class Outgoing implements OutgoingMessageCallback // // Implicit context // - Ice.ImplicitContextI implicitContext = _handler.getReference().getInstance().getImplicitContext(); - java.util.Map<String, String> prxContext = _handler.getReference().getContext(); + Ice.ImplicitContextI implicitContext = _proxy.__reference().getInstance().getImplicitContext(); + java.util.Map<String, String> prxContext = _proxy.__reference().getContext(); if(implicitContext == null) { @@ -591,15 +641,14 @@ public final class Outgoing implements OutgoingMessageCallback } } + private Ice.ObjectPrxHelperBase _proxy; + private Ice.OperationMode _mode; private RequestHandler _handler; private Ice.EncodingVersion _encoding; private BasicStream _is; private BasicStream _os; private boolean _sent; - - private Ice.LocalException _exception; - private boolean _exceptionWrapper; - private boolean _exceptionWrapperRetry; + private Ice.Exception _exception; private static final int StateUnsent = 0; private static final int StateInProgress = 1; @@ -612,5 +661,7 @@ public final class Outgoing implements OutgoingMessageCallback private InvocationObserver _observer; private RemoteObserver _remoteObserver; - public Outgoing next; // For use by Ice._ObjectDelM + public Outgoing next; // For use by Ice.ObjectPrxHelperBase + + private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); } |