summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/Outgoing.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
committerBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
commit47f800495093fd7679a315e2d730fea22f6135b7 (patch)
treea7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/Outgoing.java
parentFixed SystemException to no longer derive from LocalException (diff)
downloadice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2
ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz
ice-47f800495093fd7679a315e2d730fea22f6135b7.zip
- Added support for non-blocking AMI/batch requests, connection
creation. - Added support for AMI oneway requests. - Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r--java/src/IceInternal/Outgoing.java217
1 files changed, 135 insertions, 82 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index 85cf0c1ce9b..fd22795c6cf 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -9,19 +9,19 @@
package IceInternal;
-public final class Outgoing
+public final class Outgoing implements OutgoingMessageCallback
{
public
- Outgoing(Ice.ConnectionI connection, Reference ref, String operation, Ice.OperationMode mode,
- java.util.Map context, boolean compress)
+ Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map context)
throws LocalExceptionWrapper
{
- _connection = connection;
- _reference = ref;
_state = StateUnsent;
- _is = new BasicStream(ref.getInstance());
- _os = new BasicStream(ref.getInstance());
- _compress = compress;
+ _sent = false;
+ _handler = handler;
+
+ Instance instance = _handler.getReference().getInstance();
+ _is = new BasicStream(instance);
+ _os = new BasicStream(instance);
writeHeader(operation, mode, context);
}
@@ -30,13 +30,13 @@ public final class Outgoing
// These functions allow this object to be reused, rather than reallocated.
//
public void
- reset(Reference ref, String operation, Ice.OperationMode mode, java.util.Map context, boolean compress)
+ reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map context)
throws LocalExceptionWrapper
{
- _reference = ref;
_state = StateUnsent;
_exception = null;
- _compress = compress;
+ _sent = false;
+ _handler = handler;
writeHeader(operation, mode, context);
}
@@ -57,41 +57,39 @@ public final class Outgoing
_os.endWriteEncaps();
- switch(_reference.getMode())
+ switch(_handler.getReference().getMode())
{
case Reference.ModeTwoway:
{
- //
- // We let all exceptions raised by sending directly
- // propagate to the caller, because they can be
- // retried without violating "at-most-once". In case
- // of such exceptions, the connection object does not
- // call back on this object, so we don't need to lock
- // the mutex, keep track of state, or save exceptions.
- //
- _connection.sendRequest(_os, this, _compress);
+ _state = StateInProgress;
- //
- // Wait until the request has completed, or until the
- // request times out.
- //
+ Ice.ConnectionI connection = _handler.sendRequest(this);
boolean timedOut = false;
synchronized(this)
{
+
//
- // It's possible that the request has already
- // completed, due to a regular response, or because of
- // an exception. So we only change the state to "in
- // progress" if it is still "unsent".
+ // If the request is being sent in the background we first wait for the
+ // sent notification.
//
- if(_state == StateUnsent)
+ while(_state != StateFailed && !_sent)
{
- _state = StateInProgress;
+ try
+ {
+ wait();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
}
-
- int timeout = _connection.timeout();
+
+ //
+ // Wait until the request has completed, or until the request
+ // times out.
+ //
+ int timeout = connection.timeout();
while(_state == StateInProgress && !timedOut)
{
try
@@ -99,7 +97,7 @@ public final class Outgoing
if(timeout >= 0)
{
wait(timeout);
-
+
if(_state == StateInProgress)
{
timedOut = true;
@@ -115,15 +113,15 @@ public final class Outgoing
}
}
}
-
+
if(timedOut)
{
//
// Must be called outside the synchronization of
// this object
//
- _connection.exception(new Ice.TimeoutException());
-
+ connection.exception(new Ice.TimeoutException());
+
//
// We must wait until the exception set above has
// propagated to this Outgoing object.
@@ -142,11 +140,11 @@ public final class Outgoing
}
}
}
-
+
if(_exception != null)
{
_exception.fillInStackTrace();
-
+
//
// A CloseConnectionException indicates graceful
// server shutdown, and is therefore always repeatable
@@ -158,12 +156,13 @@ public final class Outgoing
// An ObjectNotExistException can always be retried as
// well without violating "at-most-once".
//
- if(_exception instanceof Ice.CloseConnectionException ||
+ if(!_sent ||
+ _exception instanceof Ice.CloseConnectionException ||
_exception instanceof Ice.ObjectNotExistException)
{
throw _exception;
}
-
+
//
// Throw the exception wrapped in a LocalExceptionWrapper, to
// indicate that the request cannot be resent without
@@ -171,30 +170,68 @@ public final class Outgoing
//
throw new LocalExceptionWrapper(_exception, false);
}
-
+
if(_state == StateUserException)
{
return false;
}
-
- assert(_state == StateOK);
- break;
+ else
+ {
+ assert(_state == StateOK);
+ return true;
+ }
+
}
case Reference.ModeOneway:
case Reference.ModeDatagram:
{
- //
- // For oneway and datagram requests, the connection object
- // never calls back on this object. Therefore we don't
- // need to lock the mutex or save exceptions. We simply
- // let all exceptions from sending propagate to the
- // caller, because such exceptions can be retried without
- // violating "at-most-once".
- //
- _state = StateInProgress;
- _connection.sendRequest(_os, null, _compress);
- break;
+ try
+ {
+ _state = StateInProgress;
+ if(_handler.sendRequest(this) != null)
+ {
+ //
+ // If the handler returns the connection, we must wait for the sent callback.
+ //
+ synchronized(this)
+ {
+ while(_state != StateFailed && !_sent)
+ {
+ try
+ {
+ wait();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+ }
+
+ if(_exception != null)
+ {
+ assert(!_sent);
+ throw _exception;
+ }
+ }
+ }
+ return true;
+ }
+ catch(Ice.LocalException ex) // Java specfic work-around (see ConnectionI.sendRequest())
+ {
+ if(!_sent) // The send might have failed but the request might still be sent...
+ {
+ throw ex;
+ }
+ else
+ {
+ //
+ // We wrap the exception into a LocalExceptionWrapper to indicate that
+ // the request cannot be resent without potentially violating the
+ // "at-most-once" principle.
+ //
+ throw new IceInternal.LocalExceptionWrapper(ex, false);
+ }
+ }
}
case Reference.ModeBatchOneway:
@@ -206,12 +243,13 @@ public final class Outgoing
// apply.
//
_state = StateInProgress;
- _connection.finishBatchRequest(_os, _compress);
- break;
+ _handler.finishBatchRequest(_os);
+ return true;
}
}
-
- return true;
+
+ assert(false);
+ return false;
}
public void
@@ -225,10 +263,10 @@ public final class Outgoing
// must notify the connection about that we give up ownership
// of the batch stream.
//
- int mode = _reference.getMode();
+ int mode = _handler.getReference().getMode();
if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- _connection.abortBatchRequest();
+ _handler.abortBatchRequest();
//
// If we abort a batch requests, we cannot retry, because
@@ -242,10 +280,31 @@ public final class Outgoing
throw ex;
}
+ public void
+ sent(boolean notify)
+ {
+ if(notify)
+ {
+ synchronized(this)
+ {
+ _sent = true;
+ notify();
+ }
+ }
+ else
+ {
+ //
+ // No synchronization is necessary if called from sendRequest() because the connection
+ // send mutex is locked and no other threads can call on Outgoing until it's released.
+ //
+ _sent = true;
+ }
+ }
+
public synchronized void
finished(BasicStream is)
{
- assert(_reference.getMode() == Reference.ModeTwoway); // Can only be called for twoways.
+ assert(_handler.getReference().getMode() == Reference.ModeTwoway); // Only for twoways.
assert(_state <= StateInProgress);
@@ -390,11 +449,8 @@ public final class Outgoing
public synchronized void
finished(Ice.LocalException ex)
{
- assert(_reference.getMode() == Reference.ModeTwoway); // Can only be called for twoways.
-
assert(_state <= StateInProgress);
-
- _state = StateLocalException;
+ _state = StateFailed;
_exception = ex;
notify();
}
@@ -415,7 +471,7 @@ public final class Outgoing
writeHeader(String operation, Ice.OperationMode mode, java.util.Map context)
throws LocalExceptionWrapper
{
- switch(_reference.getMode())
+ switch(_handler.getReference().getMode())
{
case Reference.ModeTwoway:
case Reference.ModeOneway:
@@ -428,19 +484,19 @@ public final class Outgoing
case Reference.ModeBatchOneway:
case Reference.ModeBatchDatagram:
{
- _connection.prepareBatchRequest(_os);
+ _handler.prepareBatchRequest(_os);
break;
}
}
try
{
- _reference.getIdentity().__write(_os);
+ _handler.getReference().getIdentity().__write(_os);
//
// For compatibility with the old FacetPath.
//
- String facet = _reference.getFacet();
+ String facet = _handler.getReference().getFacet();
if(facet == null || facet.length() == 0)
{
_os.writeStringSeq(null);
@@ -467,10 +523,8 @@ public final class Outgoing
//
// Implicit context
//
- Ice.ImplicitContextI implicitContext =
- _reference.getInstance().getImplicitContext();
-
- java.util.Map prxContext = _reference.getContext();
+ Ice.ImplicitContextI implicitContext = _handler.getReference().getInstance().getImplicitContext();
+ java.util.Map prxContext = _handler.getReference().getContext();
if(implicitContext == null)
{
@@ -495,8 +549,11 @@ public final class Outgoing
}
}
- private Ice.ConnectionI _connection;
- private Reference _reference;
+ private RequestHandler _handler;
+ private BasicStream _is;
+ private BasicStream _os;
+ private boolean _sent;
+
private Ice.LocalException _exception;
private static final int StateUnsent = 0;
@@ -504,12 +561,8 @@ public final class Outgoing
private static final int StateOK = 2;
private static final int StateUserException = 3;
private static final int StateLocalException = 4;
+ private static final int StateFailed = 5;
private int _state;
- private BasicStream _is;
- private BasicStream _os;
-
- private boolean _compress;
-
public Outgoing next; // For use by Ice._ObjectDelM
}