diff options
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 335 |
1 files changed, 170 insertions, 165 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index 962585b1da3..ea36c10e100 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -43,13 +43,6 @@ public final class Outgoing public void destroy() { - if(_state == StateUnsent && - (_reference.mode == Reference.ModeBatchOneway || - _reference.mode == Reference.ModeBatchDatagram)) - { - _connection.abortBatchRequest(); - } - _os.destroy(); _is.destroy(); } @@ -65,26 +58,48 @@ public final class Outgoing { case Reference.ModeTwoway: { - Ice.LocalException exception = null; + // + // We let all exceptions raised by sending directly + // propagate to the caller, because they can be + // retried without violating "at-most-once". In case + // of such exceptions, the connection object does not + // call back on this object, so we don't need to lock + // the mutex, keep track of state, or save exceptions. + // + _connection.sendRequest(_os, this); + + // + // Wait until the request has completed, or until the + // request times out. + // + + boolean timedOut = false; synchronized(this) { - _connection.sendRequest(this, false); - _state = StateInProgress; + // + // It's possible that the request has already + // completed, due to a regular response, or because of + // an exception. So we only change the state to "in + // progress" if it is still "unsent". + // + if(_state == StateUnsent) + { + _state = StateInProgress; + } int timeout = _connection.timeout(); - while(_state == StateInProgress) + while(_state == StateInProgress && !timedOut) { try { if(timeout >= 0) { wait(timeout); - + if(_state == StateInProgress) { - exception = new Ice.TimeoutException(); - break; + timedOut = true; } } else @@ -97,14 +112,14 @@ public final class Outgoing } } } - - if(exception != null) - { + + if(timedOut) + { // // Must be called outside the synchronization of // this object // - _connection.exception(exception); + _connection.exception(new Ice.TimeoutException()); // // We must wait until the exception set above has @@ -156,7 +171,7 @@ public final class Outgoing { return false; } - + assert(_state == StateOK); break; } @@ -164,30 +179,27 @@ public final class Outgoing case Reference.ModeOneway: case Reference.ModeDatagram: { - try - { - _connection.sendRequest(this, true); - } - catch(Ice.DatagramLimitException ex) - { - throw new NonRepeatable(ex); - } - _state = StateInProgress; + // + // For oneway and datagram requests, the connection + // object never calls back on this object. Therefore + // we don't need to lock the mutex, keep track of + // state, or save exceptions. We simply let all + // exceptions from sending propagate to the caller, + // because such exceptions can be retried without + // violating "at-most-once". + // + _connection.sendRequest(_os, null); break; } case Reference.ModeBatchOneway: case Reference.ModeBatchDatagram: { - // - // The state must be set to StateInProgress before calling - // finishBatchRequest, because otherwise if - // finishBatchRequest raises an exception, the destructor - // of this class will call abortBatchRequest, and calling - // both finishBatchRequest and abortBatchRequest is - // illegal. - // - _state = StateInProgress; + // + // For batch oneways and datagrams, the same rules as for + // regular oneways and datagrams (see comment above) + // apply. + // _connection.finishBatchRequest(_os); break; } @@ -199,130 +211,126 @@ public final class Outgoing public synchronized void finished(BasicStream is) { - // - // The state might be StateLocalException if there was a - // timeout in invoke(). - // - if(_state == StateInProgress) - { - _is.swap(is); - byte status = _is.readByte(); - - switch((int)status) - { - case DispatchStatus._DispatchOK: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateOK; - break; - } - - case DispatchStatus._DispatchUserException: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateUserException; - break; - } - - case DispatchStatus._DispatchObjectNotExist: - case DispatchStatus._DispatchFacetNotExist: - case DispatchStatus._DispatchOperationNotExist: + assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways. + + assert(_state <= StateInProgress); + + byte status = _is.readByte(); + + switch((int)status) + { + case DispatchStatus._DispatchOK: + { + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateOK; + break; + } + + case DispatchStatus._DispatchUserException: + { + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateUserException; + break; + } + + case DispatchStatus._DispatchObjectNotExist: + case DispatchStatus._DispatchFacetNotExist: + case DispatchStatus._DispatchOperationNotExist: + { + _state = StateLocalException; + + Ice.RequestFailedException ex = null; + switch((int)status) { - _state = StateLocalException; - - Ice.RequestFailedException ex = null; - switch((int)status) + case DispatchStatus._DispatchObjectNotExist: { - case DispatchStatus._DispatchObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } - - case DispatchStatus._DispatchFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - - case DispatchStatus._DispatchOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } - - default: - { - assert(false); - break; - } + ex = new Ice.ObjectNotExistException(); + break; } - - ex.id = new Ice.Identity(); - ex.id.__read(_is); - ex.facet = _is.readStringSeq(); - ex.operation = _is.readString(); - _exception = ex; - break; - } - - case DispatchStatus._DispatchUnknownException: - case DispatchStatus._DispatchUnknownLocalException: - case DispatchStatus._DispatchUnknownUserException: - { - _state = StateLocalException; - - Ice.UnknownException ex = null; - switch((int)status) + + case DispatchStatus._DispatchFacetNotExist: { - case DispatchStatus._DispatchUnknownException: - { - ex = new Ice.UnknownException(); - break; - } - - case DispatchStatus._DispatchUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } - - case DispatchStatus._DispatchUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } - - default: - { - assert(false); - break; - } + ex = new Ice.FacetNotExistException(); + break; } - - ex.unknown = _is.readString(); - _exception = ex; - break; - } + + case DispatchStatus._DispatchOperationNotExist: + { + ex = new Ice.OperationNotExistException(); + break; + } + + default: + { + assert(false); + break; + } + } - default: - { - _state = StateLocalException; - _exception = new Ice.UnknownReplyStatusException(); - break; - } - } - } + ex.id = new Ice.Identity(); + ex.id.__read(_is); + ex.facet = _is.readStringSeq(); + ex.operation = _is.readString(); + _exception = ex; + break; + } + + case DispatchStatus._DispatchUnknownException: + case DispatchStatus._DispatchUnknownLocalException: + case DispatchStatus._DispatchUnknownUserException: + { + _state = StateLocalException; + + Ice.UnknownException ex = null; + switch((int)status) + { + case DispatchStatus._DispatchUnknownException: + { + ex = new Ice.UnknownException(); + break; + } + + case DispatchStatus._DispatchUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } + + case DispatchStatus._DispatchUnknownUserException: + { + ex = new Ice.UnknownUserException(); + break; + } + + default: + { + assert(false); + break; + } + } + + ex.unknown = _is.readString(); + _exception = ex; + break; + } + + default: + { + _state = StateLocalException; + _exception = new Ice.UnknownReplyStatusException(); + break; + } + } notify(); } @@ -330,16 +338,13 @@ public final class Outgoing public synchronized void finished(Ice.LocalException ex) { - // - // The state might be StateLocalException if there was a - // timeout in invoke(). - // - if(_state == StateInProgress) - { - _state = StateLocalException; - _exception = ex; - notify(); - } + assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways. + + assert(_state <= StateInProgress); + + _state = StateLocalException; + _exception = ex; + notify(); } public BasicStream |