diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/Outgoing.java | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 217 |
1 files changed, 135 insertions, 82 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index 85cf0c1ce9b..fd22795c6cf 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -9,19 +9,19 @@ package IceInternal; -public final class Outgoing +public final class Outgoing implements OutgoingMessageCallback { public - Outgoing(Ice.ConnectionI connection, Reference ref, String operation, Ice.OperationMode mode, - java.util.Map context, boolean compress) + Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map context) throws LocalExceptionWrapper { - _connection = connection; - _reference = ref; _state = StateUnsent; - _is = new BasicStream(ref.getInstance()); - _os = new BasicStream(ref.getInstance()); - _compress = compress; + _sent = false; + _handler = handler; + + Instance instance = _handler.getReference().getInstance(); + _is = new BasicStream(instance); + _os = new BasicStream(instance); writeHeader(operation, mode, context); } @@ -30,13 +30,13 @@ public final class Outgoing // These functions allow this object to be reused, rather than reallocated. // public void - reset(Reference ref, String operation, Ice.OperationMode mode, java.util.Map context, boolean compress) + reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map context) throws LocalExceptionWrapper { - _reference = ref; _state = StateUnsent; _exception = null; - _compress = compress; + _sent = false; + _handler = handler; writeHeader(operation, mode, context); } @@ -57,41 +57,39 @@ public final class Outgoing _os.endWriteEncaps(); - switch(_reference.getMode()) + switch(_handler.getReference().getMode()) { case Reference.ModeTwoway: { - // - // We let all exceptions raised by sending directly - // propagate to the caller, because they can be - // retried without violating "at-most-once". In case - // of such exceptions, the connection object does not - // call back on this object, so we don't need to lock - // the mutex, keep track of state, or save exceptions. - // - _connection.sendRequest(_os, this, _compress); + _state = StateInProgress; - // - // Wait until the request has completed, or until the - // request times out. - // + Ice.ConnectionI connection = _handler.sendRequest(this); boolean timedOut = false; synchronized(this) { + // - // It's possible that the request has already - // completed, due to a regular response, or because of - // an exception. So we only change the state to "in - // progress" if it is still "unsent". + // If the request is being sent in the background we first wait for the + // sent notification. // - if(_state == StateUnsent) + while(_state != StateFailed && !_sent) { - _state = StateInProgress; + try + { + wait(); + } + catch(java.lang.InterruptedException ex) + { + } } - - int timeout = _connection.timeout(); + + // + // Wait until the request has completed, or until the request + // times out. + // + int timeout = connection.timeout(); while(_state == StateInProgress && !timedOut) { try @@ -99,7 +97,7 @@ public final class Outgoing if(timeout >= 0) { wait(timeout); - + if(_state == StateInProgress) { timedOut = true; @@ -115,15 +113,15 @@ public final class Outgoing } } } - + if(timedOut) { // // Must be called outside the synchronization of // this object // - _connection.exception(new Ice.TimeoutException()); - + connection.exception(new Ice.TimeoutException()); + // // We must wait until the exception set above has // propagated to this Outgoing object. @@ -142,11 +140,11 @@ public final class Outgoing } } } - + if(_exception != null) { _exception.fillInStackTrace(); - + // // A CloseConnectionException indicates graceful // server shutdown, and is therefore always repeatable @@ -158,12 +156,13 @@ public final class Outgoing // An ObjectNotExistException can always be retried as // well without violating "at-most-once". // - if(_exception instanceof Ice.CloseConnectionException || + if(!_sent || + _exception instanceof Ice.CloseConnectionException || _exception instanceof Ice.ObjectNotExistException) { throw _exception; } - + // // Throw the exception wrapped in a LocalExceptionWrapper, to // indicate that the request cannot be resent without @@ -171,30 +170,68 @@ public final class Outgoing // throw new LocalExceptionWrapper(_exception, false); } - + if(_state == StateUserException) { return false; } - - assert(_state == StateOK); - break; + else + { + assert(_state == StateOK); + return true; + } + } case Reference.ModeOneway: case Reference.ModeDatagram: { - // - // For oneway and datagram requests, the connection object - // never calls back on this object. Therefore we don't - // need to lock the mutex or save exceptions. We simply - // let all exceptions from sending propagate to the - // caller, because such exceptions can be retried without - // violating "at-most-once". - // - _state = StateInProgress; - _connection.sendRequest(_os, null, _compress); - break; + try + { + _state = StateInProgress; + if(_handler.sendRequest(this) != null) + { + // + // If the handler returns the connection, we must wait for the sent callback. + // + synchronized(this) + { + while(_state != StateFailed && !_sent) + { + try + { + wait(); + } + catch(java.lang.InterruptedException ex) + { + } + } + + if(_exception != null) + { + assert(!_sent); + throw _exception; + } + } + } + return true; + } + catch(Ice.LocalException ex) // Java specfic work-around (see ConnectionI.sendRequest()) + { + if(!_sent) // The send might have failed but the request might still be sent... + { + throw ex; + } + else + { + // + // We wrap the exception into a LocalExceptionWrapper to indicate that + // the request cannot be resent without potentially violating the + // "at-most-once" principle. + // + throw new IceInternal.LocalExceptionWrapper(ex, false); + } + } } case Reference.ModeBatchOneway: @@ -206,12 +243,13 @@ public final class Outgoing // apply. // _state = StateInProgress; - _connection.finishBatchRequest(_os, _compress); - break; + _handler.finishBatchRequest(_os); + return true; } } - - return true; + + assert(false); + return false; } public void @@ -225,10 +263,10 @@ public final class Outgoing // must notify the connection about that we give up ownership // of the batch stream. // - int mode = _reference.getMode(); + int mode = _handler.getReference().getMode(); if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - _connection.abortBatchRequest(); + _handler.abortBatchRequest(); // // If we abort a batch requests, we cannot retry, because @@ -242,10 +280,31 @@ public final class Outgoing throw ex; } + public void + sent(boolean notify) + { + if(notify) + { + synchronized(this) + { + _sent = true; + notify(); + } + } + else + { + // + // No synchronization is necessary if called from sendRequest() because the connection + // send mutex is locked and no other threads can call on Outgoing until it's released. + // + _sent = true; + } + } + public synchronized void finished(BasicStream is) { - assert(_reference.getMode() == Reference.ModeTwoway); // Can only be called for twoways. + assert(_handler.getReference().getMode() == Reference.ModeTwoway); // Only for twoways. assert(_state <= StateInProgress); @@ -390,11 +449,8 @@ public final class Outgoing public synchronized void finished(Ice.LocalException ex) { - assert(_reference.getMode() == Reference.ModeTwoway); // Can only be called for twoways. - assert(_state <= StateInProgress); - - _state = StateLocalException; + _state = StateFailed; _exception = ex; notify(); } @@ -415,7 +471,7 @@ public final class Outgoing writeHeader(String operation, Ice.OperationMode mode, java.util.Map context) throws LocalExceptionWrapper { - switch(_reference.getMode()) + switch(_handler.getReference().getMode()) { case Reference.ModeTwoway: case Reference.ModeOneway: @@ -428,19 +484,19 @@ public final class Outgoing case Reference.ModeBatchOneway: case Reference.ModeBatchDatagram: { - _connection.prepareBatchRequest(_os); + _handler.prepareBatchRequest(_os); break; } } try { - _reference.getIdentity().__write(_os); + _handler.getReference().getIdentity().__write(_os); // // For compatibility with the old FacetPath. // - String facet = _reference.getFacet(); + String facet = _handler.getReference().getFacet(); if(facet == null || facet.length() == 0) { _os.writeStringSeq(null); @@ -467,10 +523,8 @@ public final class Outgoing // // Implicit context // - Ice.ImplicitContextI implicitContext = - _reference.getInstance().getImplicitContext(); - - java.util.Map prxContext = _reference.getContext(); + Ice.ImplicitContextI implicitContext = _handler.getReference().getInstance().getImplicitContext(); + java.util.Map prxContext = _handler.getReference().getContext(); if(implicitContext == null) { @@ -495,8 +549,11 @@ public final class Outgoing } } - private Ice.ConnectionI _connection; - private Reference _reference; + private RequestHandler _handler; + private BasicStream _is; + private BasicStream _os; + private boolean _sent; + private Ice.LocalException _exception; private static final int StateUnsent = 0; @@ -504,12 +561,8 @@ public final class Outgoing private static final int StateOK = 2; private static final int StateUserException = 3; private static final int StateLocalException = 4; + private static final int StateFailed = 5; private int _state; - private BasicStream _is; - private BasicStream _os; - - private boolean _compress; - public Outgoing next; // For use by Ice._ObjectDelM } |