summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/Outgoing.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/Outgoing.java
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/Outgoing.java')
-rw-r--r--java/src/IceInternal/Outgoing.java279
1 files changed, 165 insertions, 114 deletions
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index c00a99a2b2b..ac06172ce72 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -15,44 +15,53 @@ import Ice.Instrumentation.InvocationObserver;
public final class Outgoing implements OutgoingMessageCallback
{
public
- Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context,
- InvocationObserver observer)
- throws LocalExceptionWrapper
+ Outgoing(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context,
+ boolean explicitCtx)
{
+ Reference ref = proxy.__reference();
_state = StateUnsent;
- _exceptionWrapper = false;
- _exceptionWrapperRetry = false;
_sent = false;
- _handler = handler;
- _observer = observer;
- _encoding = Protocol.getCompatibleEncoding(handler.getReference().getEncoding());
- _os = new BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding);
+ _proxy = proxy;
+ _mode = mode;
+ _handler = null;
+ _observer = IceInternal.ObserverHelper.get(proxy, op, context);
+ _encoding = Protocol.getCompatibleEncoding(ref.getEncoding());
+ _os = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding);
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_handler.getReference().getProtocol()));
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol()));
- writeHeader(operation, mode, context);
+ writeHeader(op, mode, context, explicitCtx);
}
//
// These functions allow this object to be reused, rather than reallocated.
//
public void
- reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context,
- InvocationObserver observer)
- throws LocalExceptionWrapper
+ reset(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context,
+ boolean explicitCtx)
{
+ Reference ref = proxy.__reference();
_state = StateUnsent;
_exception = null;
- _exceptionWrapper = false;
- _exceptionWrapperRetry = false;
_sent = false;
- _handler = handler;
- _observer = observer;
- _encoding = Protocol.getCompatibleEncoding(handler.getReference().getEncoding());
+ _proxy = proxy;
+ _mode = mode;
+ _handler = null;
+ _observer = IceInternal.ObserverHelper.get(proxy, op, context);
+ _encoding = Protocol.getCompatibleEncoding(ref.getEncoding());
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_handler.getReference().getProtocol()));
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol()));
- writeHeader(operation, mode, context);
+ writeHeader(op, mode, context, explicitCtx);
+ }
+
+ public void
+ detach()
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ }
}
public void
@@ -68,17 +77,27 @@ public final class Outgoing implements OutgoingMessageCallback
// Returns true if ok, false if user exception.
public boolean
invoke()
- throws LocalExceptionWrapper
{
assert(_state == StateUnsent);
- switch(_handler.getReference().getMode())
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- case Reference.ModeOneway:
- case Reference.ModeDatagram:
- case Reference.ModeTwoway:
+ _state = StateInProgress;
+ _handler.finishBatchRequest(_os);
+ return true;
+ }
+
+ int cnt = 0;
+ while(true)
+ {
+ try
{
_state = StateInProgress;
+ _exception = null;
+ _sent = false;
+
+ _handler = _proxy.__getRequestHandler(false);
if(_handler.sendRequest(this)) // Request sent and no response expected, we're done.
{
@@ -92,7 +111,7 @@ public final class Outgoing implements OutgoingMessageCallback
// If the handler says it's not finished, we wait until we're done.
//
- int invocationTimeout = _handler.getReference().getInvocationTimeout();
+ int invocationTimeout = _proxy.__reference().getInvocationTimeout();
if(invocationTimeout > 0)
{
long now = Time.currentMonotonicTimeMillis();
@@ -131,71 +150,76 @@ public final class Outgoing implements OutgoingMessageCallback
if(timedOut)
{
_handler.requestTimedOut(this);
- assert(_exception != null);
+
+ //
+ // Wait for the exception to propagate. It's possible the request handler ignores
+ // the timeout if there was a failure shortly before requestTimedOut got called.
+ // In this case, the exception should be set on the Outgoing.
+ //
+ synchronized(this)
+ {
+ while(_exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ }
}
if(_exception != null)
{
- _exception.fillInStackTrace();
- if(_exceptionWrapper)
+ throw (Ice.Exception)_exception.fillInStackTrace();
+ }
+ else
+ {
+ assert(_state != StateInProgress);
+ return _state == StateOK;
+ }
+ }
+ catch(RetryException ex)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
+ }
+ catch(Ice.Exception ex)
+ {
+ try
+ {
+ Ice.IntHolder interval = new Ice.IntHolder();
+ cnt = _proxy.__handleException(ex, _handler, _mode, _sent, interval, cnt);
+ if(_observer != null)
{
- throw new LocalExceptionWrapper(_exception, _exceptionWrapperRetry);
+ _observer.retried(); // Invocation is being retried.
}
-
- //
- // A CloseConnectionException indicates graceful
- // server shutdown, and is therefore always repeatable
- // without violating "at-most-once". That's because by
- // sending a close connection message, the server
- // guarantees that all outstanding requests can safely
- // be repeated.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once" (see the
- // implementation of the checkRetryAfterException
- // method of the ProxyFactory class for the reasons
- // why it can be useful).
- //
- if(!_sent ||
- _exception instanceof Ice.CloseConnectionException ||
- _exception instanceof Ice.ObjectNotExistException)
+ if(interval.value > 0)
{
- throw _exception;
+ try
+ {
+ Thread.sleep(interval.value);
+ }
+ catch(InterruptedException exi)
+ {
+ }
}
-
- //
- // Throw the exception wrapped in a LocalExceptionWrapper,
- // to indicate that the request cannot be resent without
- // potentially violating the "at-most-once" principle.
- //
- throw new LocalExceptionWrapper(_exception, false);
}
-
- assert(_state != StateInProgress);
- return _state == StateOK;
- }
-
- case Reference.ModeBatchOneway:
- case Reference.ModeBatchDatagram:
- {
- //
- // For batch oneways and datagrams, the same rules as for
- // regular oneways and datagrams (see comment above)
- // apply.
- //
- _state = StateInProgress;
- _handler.finishBatchRequest(_os);
- return true;
+ catch(Ice.Exception exc)
+ {
+ if(_observer != null)
+ {
+ _observer.failed(exc.ice_name());
+ }
+ throw exc;
+ }
}
}
-
- assert(false);
- return false;
}
public void
abort(Ice.LocalException ex)
- throws LocalExceptionWrapper
{
assert(_state == StateUnsent);
@@ -204,7 +228,7 @@ public final class Outgoing implements OutgoingMessageCallback
// must notify the connection about that we give up ownership
// of the batch stream.
//
- int mode = _handler.getReference().getMode();
+ int mode = _proxy.__reference().getMode();
if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
_handler.abortBatchRequest();
@@ -215,15 +239,21 @@ public final class Outgoing implements OutgoingMessageCallback
public boolean
send(Ice.ConnectionI connection, boolean compress, boolean response)
- throws LocalExceptionWrapper
+ throws RetryException
{
return connection.sendRequest(this, compress, response);
}
+ public void
+ invokeCollocated(CollocatedRequestHandler handler)
+ {
+ handler.invokeRequest(this);
+ }
+
synchronized public void
sent()
{
- if(_handler.getReference().getMode() != Reference.ModeTwoway)
+ if(_proxy.__reference().getMode() != Reference.ModeTwoway)
{
if(_remoteObserver != null)
{
@@ -239,7 +269,7 @@ public final class Outgoing implements OutgoingMessageCallback
public synchronized void
finished(BasicStream is)
{
- assert(_handler.getReference().getMode() == Reference.ModeTwoway); // Only for twoways.
+ assert(_proxy.__reference().getMode() == Reference.ModeTwoway); // Only for twoways.
assert(_state <= StateInProgress);
@@ -252,7 +282,7 @@ public final class Outgoing implements OutgoingMessageCallback
if(_is == null)
{
- _is = new IceInternal.BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding);
+ _is = new IceInternal.BasicStream(_proxy.__reference().getInstance(), Protocol.currentProtocolEncoding);
}
_is.swap(is);
byte replyStatus = _is.readByte();
@@ -385,7 +415,7 @@ public final class Outgoing implements OutgoingMessageCallback
}
public synchronized void
- finished(Ice.LocalException ex, boolean sent)
+ finished(Ice.Exception ex, boolean sent)
{
assert(_state <= StateInProgress);
if(_remoteObserver != null)
@@ -399,24 +429,6 @@ public final class Outgoing implements OutgoingMessageCallback
_sent = sent;
notify();
}
-
- public synchronized void
- finished(LocalExceptionWrapper ex)
- {
- if(_remoteObserver != null)
- {
- _remoteObserver.failed(ex.get().ice_name());
- _remoteObserver.detach();
- _remoteObserver = null;
- }
-
- _state = StateFailed;
- _exceptionWrapper = true;
- _exceptionWrapperRetry = ex.retry();
- _exception = ex.get();
- _sent = false;
- notify();
- }
public BasicStream
os()
@@ -516,11 +528,28 @@ public final class Outgoing implements OutgoingMessageCallback
}
}
+ public void
+ attachCollocatedObserver(int requestId)
+ {
+ if(_observer != null)
+ {
+ _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4);
+ if(_remoteObserver != null)
+ {
+ _remoteObserver.attach();
+ }
+ }
+ }
+
private void
- writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context)
- throws LocalExceptionWrapper
+ writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx)
{
- switch(_handler.getReference().getMode())
+ if(explicitCtx && context == null)
+ {
+ context = _emptyContext;
+ }
+
+ switch(_proxy.__reference().getMode())
{
case Reference.ModeTwoway:
case Reference.ModeOneway:
@@ -533,19 +562,40 @@ public final class Outgoing implements OutgoingMessageCallback
case Reference.ModeBatchOneway:
case Reference.ModeBatchDatagram:
{
- _handler.prepareBatchRequest(_os);
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy.__getRequestHandler(true);
+ _handler.prepareBatchRequest(_os);
+ break;
+ }
+ catch(RetryException ex)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler
+ throw ex;
+ }
+ }
break;
}
}
try
{
- _handler.getReference().getIdentity().__write(_os);
+ _proxy.__reference().getIdentity().__write(_os);
//
// For compatibility with the old FacetPath.
//
- String facet = _handler.getReference().getFacet();
+ String facet = _proxy.__reference().getFacet();
if(facet == null || facet.length() == 0)
{
_os.writeStringSeq(null);
@@ -572,8 +622,8 @@ public final class Outgoing implements OutgoingMessageCallback
//
// Implicit context
//
- Ice.ImplicitContextI implicitContext = _handler.getReference().getInstance().getImplicitContext();
- java.util.Map<String, String> prxContext = _handler.getReference().getContext();
+ Ice.ImplicitContextI implicitContext = _proxy.__reference().getInstance().getImplicitContext();
+ java.util.Map<String, String> prxContext = _proxy.__reference().getContext();
if(implicitContext == null)
{
@@ -591,15 +641,14 @@ public final class Outgoing implements OutgoingMessageCallback
}
}
+ private Ice.ObjectPrxHelperBase _proxy;
+ private Ice.OperationMode _mode;
private RequestHandler _handler;
private Ice.EncodingVersion _encoding;
private BasicStream _is;
private BasicStream _os;
private boolean _sent;
-
- private Ice.LocalException _exception;
- private boolean _exceptionWrapper;
- private boolean _exceptionWrapperRetry;
+ private Ice.Exception _exception;
private static final int StateUnsent = 0;
private static final int StateInProgress = 1;
@@ -612,5 +661,7 @@ public final class Outgoing implements OutgoingMessageCallback
private InvocationObserver _observer;
private RemoteObserver _remoteObserver;
- public Outgoing next; // For use by Ice._ObjectDelM
+ public Outgoing next; // For use by Ice.ObjectPrxHelperBase
+
+ private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
}