diff options
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 191 |
1 files changed, 76 insertions, 115 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index e64c20ae54c..c00a99a2b2b 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -20,6 +20,8 @@ public final class Outgoing implements OutgoingMessageCallback throws LocalExceptionWrapper { _state = StateUnsent; + _exceptionWrapper = false; + _exceptionWrapperRetry = false; _sent = false; _handler = handler; _observer = observer; @@ -41,6 +43,8 @@ public final class Outgoing implements OutgoingMessageCallback { _state = StateUnsent; _exception = null; + _exceptionWrapper = false; + _exceptionWrapperRetry = false; _sent = false; _handler = handler; _observer = observer; @@ -70,77 +74,48 @@ public final class Outgoing implements OutgoingMessageCallback switch(_handler.getReference().getMode()) { + case Reference.ModeOneway: + case Reference.ModeDatagram: case Reference.ModeTwoway: { _state = StateInProgress; - Ice.ConnectionI connection = _handler.sendRequest(this); - assert(connection != null); + if(_handler.sendRequest(this)) // Request sent and no response expected, we're done. + { + return true; + } boolean timedOut = false; - synchronized(this) { - - // - // If the request is being sent in the background we first wait for the - // sent notification. - // - while(_state != StateFailed && !_sent) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - // - // Wait until the request has completed, or until the request - // times out. + // If the handler says it's not finished, we wait until we're done. // - int timeout = connection.timeout(); - while(_state == StateInProgress && !timedOut) + + int invocationTimeout = _handler.getReference().getInvocationTimeout(); + if(invocationTimeout > 0) { - try + long now = Time.currentMonotonicTimeMillis(); + long deadline = now + invocationTimeout; + while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) { - if(timeout >= 0) + try { - wait(timeout); - - if(_state == StateInProgress) - { - timedOut = true; - } + wait(deadline - now); } - else + catch(InterruptedException ex) { - wait(); } - } - catch(InterruptedException ex) - { + if((_state == StateInProgress || !_sent) && _state != StateFailed) + { + now = Time.currentMonotonicTimeMillis(); + timedOut = now >= deadline; + } } } - } - - if(timedOut) - { - // - // Must be called outside the synchronization of - // this object - // - connection.exception(new Ice.TimeoutException()); - - // - // We must wait until the exception set above has - // propagated to this Outgoing object. - // - synchronized(this) + else { - while(_state == StateInProgress) + while((_state == StateInProgress || !_sent) && _state != StateFailed) { try { @@ -153,9 +128,19 @@ public final class Outgoing implements OutgoingMessageCallback } } + if(timedOut) + { + _handler.requestTimedOut(this); + assert(_exception != null); + } + if(_exception != null) { _exception.fillInStackTrace(); + if(_exceptionWrapper) + { + throw new LocalExceptionWrapper(_exception, _exceptionWrapperRetry); + } // // A CloseConnectionException indicates graceful @@ -186,48 +171,8 @@ public final class Outgoing implements OutgoingMessageCallback throw new LocalExceptionWrapper(_exception, false); } - if(_state == StateUserException) - { - return false; - } - else - { - assert(_state == StateOK); - return true; - } - - } - - case Reference.ModeOneway: - case Reference.ModeDatagram: - { - _state = StateInProgress; - if(_handler.sendRequest(this) != null) - { - // - // If the handler returns the connection, we must wait for the sent callback. - // - synchronized(this) - { - while(_state != StateFailed && !_sent) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - - if(_exception != null) - { - assert(!_sent); - throw _exception; - } - } - } - return true; + assert(_state != StateInProgress); + return _state == StateOK; } case Reference.ModeBatchOneway: @@ -268,31 +213,27 @@ public final class Outgoing implements OutgoingMessageCallback throw ex; } - public void - sent(boolean async) + public boolean + send(Ice.ConnectionI connection, boolean compress, boolean response) + throws LocalExceptionWrapper + { + return connection.sendRequest(this, compress, response); + } + + synchronized public void + sent() { - if(async) + if(_handler.getReference().getMode() != Reference.ModeTwoway) { - synchronized(this) + if(_remoteObserver != null) { - _sent = true; - notify(); + _remoteObserver.detach(); + _remoteObserver = null; } + _state = StateOK; } - else - { - // - // No synchronization is necessary if called from sendRequest() because the connection - // send mutex is locked and no other threads can call on Outgoing until it's released. - // - _sent = true; - } - - if(_remoteObserver != null && _handler.getReference().getMode() != Reference.ModeTwoway) - { - _remoteObserver.detach(); - _remoteObserver = null; - } + _sent = true; + notify(); } public synchronized void @@ -459,6 +400,24 @@ public final class Outgoing implements OutgoingMessageCallback notify(); } + public synchronized void + finished(LocalExceptionWrapper ex) + { + if(_remoteObserver != null) + { + _remoteObserver.failed(ex.get().ice_name()); + _remoteObserver.detach(); + _remoteObserver = null; + } + + _state = StateFailed; + _exceptionWrapper = true; + _exceptionWrapperRetry = ex.retry(); + _exception = ex.get(); + _sent = false; + notify(); + } + public BasicStream os() { @@ -639,7 +598,9 @@ public final class Outgoing implements OutgoingMessageCallback private boolean _sent; private Ice.LocalException _exception; - + private boolean _exceptionWrapper; + private boolean _exceptionWrapperRetry; + private static final int StateUnsent = 0; private static final int StateInProgress = 1; private static final int StateOK = 2; |