diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/BatchOutgoing.java | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/BatchOutgoing.java')
-rw-r--r-- | java/src/IceInternal/BatchOutgoing.java | 163 |
1 files changed, 112 insertions, 51 deletions
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index 65795d25c92..b60e29268aa 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -15,102 +15,145 @@ import Ice.Instrumentation.InvocationObserver; public final class BatchOutgoing implements OutgoingMessageCallback { public - BatchOutgoing(Ice.ConnectionI connection, Instance instance, InvocationObserver observer) + BatchOutgoing(Ice.ConnectionI connection, Instance instance, String op) { _connection = connection; _sent = false; _os = new BasicStream(instance, Protocol.currentProtocolEncoding); - _observer = observer; + _observer = IceInternal.ObserverHelper.get(instance, op); } public - BatchOutgoing(RequestHandler handler, InvocationObserver observer) + BatchOutgoing(Ice.ObjectPrxHelperBase proxy, String op) { - _handler = handler; + _proxy = proxy; _sent = false; - _os = new BasicStream(handler.getReference().getInstance(), Protocol.currentProtocolEncoding); - _observer = observer; - Protocol.checkSupportedProtocol(_handler.getReference().getProtocol()); + _os = new BasicStream(proxy.__reference().getInstance(), Protocol.currentProtocolEncoding); + _observer = IceInternal.ObserverHelper.get(proxy, op); + Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol()); } public void invoke() { - assert(_handler != null || _connection != null); + assert(_proxy != null || _connection != null); - int timeout; if(_connection != null) { if(_connection.flushBatchRequests(this)) { return; } - timeout = -1; - } - else - { - try + + synchronized(this) { - if(_handler.sendRequest(this)) + while(_exception == null && !_sent) { - return; + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + if(_exception != null) + { + throw _exception; } } - catch(IceInternal.LocalExceptionWrapper ex) - { - throw ex.get(); - } - timeout = _handler.getReference().getInvocationTimeout(); + return; } - boolean timedOut = false; - synchronized(this) + RequestHandler handler = null; + try { - if(timeout > 0) + handler = _proxy.__getRequestHandler(false); + if(handler.sendRequest(this)) { - long now = Time.currentMonotonicTimeMillis(); - long deadline = now + timeout; - while(_exception == null && !_sent && !timedOut) + return; + } + + boolean timedOut = false; + synchronized(this) + { + int timeout = _proxy.__reference().getInvocationTimeout(); + if(timeout > 0) { - try + long now = Time.currentMonotonicTimeMillis(); + long deadline = now + timeout; + while(_exception == null && !_sent && !timedOut) { - wait(deadline - now); - if(_exception == null && !_sent) + try + { + wait(deadline - now); + if(_exception == null && !_sent) + { + now = Time.currentMonotonicTimeMillis(); + timedOut = now >= deadline; + } + } + catch(InterruptedException ex) { - now = Time.currentMonotonicTimeMillis(); - timedOut = now >= deadline; } } - catch(InterruptedException ex) + } + else + { + while(_exception == null && !_sent) { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } } } - else + + if(timedOut) { - while(_exception == null && !_sent) + handler.requestTimedOut(this); + + synchronized(this) { - try - { - wait(); - } - catch(InterruptedException ex) + while(_exception == null) { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } } } + + if(_exception != null) + { + throw (Ice.Exception)_exception.fillInStackTrace(); + } } - - if(timedOut) + catch(RetryException ex) { - _handler.requestTimedOut(this); - assert(_exception != null); + // + // Clear request handler but don't retry or throw. Retrying + // isn't useful, there were no batch requests associated with + // the proxy's request handler. + // + _proxy.__setRequestHandler(handler, null); } - - if(_exception != null) + catch(Ice.Exception ex) { - _exception.fillInStackTrace(); - throw _exception; + _proxy.__setRequestHandler(handler, null); // Clear request handler + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + throw ex; // Throw to notify the user that batch requests were potentially lost. } } @@ -120,6 +163,12 @@ public final class BatchOutgoing implements OutgoingMessageCallback return connection.flushBatchRequests(this); } + public void + invokeCollocated(CollocatedRequestHandler handler) + { + handler.invokeBatchRequests(this); + } + synchronized public void sent() { @@ -133,7 +182,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback } public synchronized void - finished(Ice.LocalException ex, boolean sent) + finished(Ice.Exception ex, boolean sent) { if(_remoteObserver != null) { @@ -164,11 +213,23 @@ public final class BatchOutgoing implements OutgoingMessageCallback } } - private RequestHandler _handler; + public void attachCollocatedObserver(int requestId) + { + if(_observer != null) + { + _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + + private Ice.ObjectPrxHelperBase _proxy; private Ice.ConnectionI _connection; private BasicStream _os; private boolean _sent; - private Ice.LocalException _exception; + private Ice.Exception _exception; private InvocationObserver _observer; private Observer _remoteObserver; |