summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/BatchOutgoing.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/BatchOutgoing.java
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/BatchOutgoing.java')
-rw-r--r--java/src/IceInternal/BatchOutgoing.java163
1 files changed, 112 insertions, 51 deletions
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
index 65795d25c92..b60e29268aa 100644
--- a/java/src/IceInternal/BatchOutgoing.java
+++ b/java/src/IceInternal/BatchOutgoing.java
@@ -15,102 +15,145 @@ import Ice.Instrumentation.InvocationObserver;
public final class BatchOutgoing implements OutgoingMessageCallback
{
public
- BatchOutgoing(Ice.ConnectionI connection, Instance instance, InvocationObserver observer)
+ BatchOutgoing(Ice.ConnectionI connection, Instance instance, String op)
{
_connection = connection;
_sent = false;
_os = new BasicStream(instance, Protocol.currentProtocolEncoding);
- _observer = observer;
+ _observer = IceInternal.ObserverHelper.get(instance, op);
}
public
- BatchOutgoing(RequestHandler handler, InvocationObserver observer)
+ BatchOutgoing(Ice.ObjectPrxHelperBase proxy, String op)
{
- _handler = handler;
+ _proxy = proxy;
_sent = false;
- _os = new BasicStream(handler.getReference().getInstance(), Protocol.currentProtocolEncoding);
- _observer = observer;
- Protocol.checkSupportedProtocol(_handler.getReference().getProtocol());
+ _os = new BasicStream(proxy.__reference().getInstance(), Protocol.currentProtocolEncoding);
+ _observer = IceInternal.ObserverHelper.get(proxy, op);
+ Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol());
}
public void
invoke()
{
- assert(_handler != null || _connection != null);
+ assert(_proxy != null || _connection != null);
- int timeout;
if(_connection != null)
{
if(_connection.flushBatchRequests(this))
{
return;
}
- timeout = -1;
- }
- else
- {
- try
+
+ synchronized(this)
{
- if(_handler.sendRequest(this))
+ while(_exception == null && !_sent)
{
- return;
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ if(_exception != null)
+ {
+ throw _exception;
}
}
- catch(IceInternal.LocalExceptionWrapper ex)
- {
- throw ex.get();
- }
- timeout = _handler.getReference().getInvocationTimeout();
+ return;
}
- boolean timedOut = false;
- synchronized(this)
+ RequestHandler handler = null;
+ try
{
- if(timeout > 0)
+ handler = _proxy.__getRequestHandler(false);
+ if(handler.sendRequest(this))
{
- long now = Time.currentMonotonicTimeMillis();
- long deadline = now + timeout;
- while(_exception == null && !_sent && !timedOut)
+ return;
+ }
+
+ boolean timedOut = false;
+ synchronized(this)
+ {
+ int timeout = _proxy.__reference().getInvocationTimeout();
+ if(timeout > 0)
{
- try
+ long now = Time.currentMonotonicTimeMillis();
+ long deadline = now + timeout;
+ while(_exception == null && !_sent && !timedOut)
{
- wait(deadline - now);
- if(_exception == null && !_sent)
+ try
+ {
+ wait(deadline - now);
+ if(_exception == null && !_sent)
+ {
+ now = Time.currentMonotonicTimeMillis();
+ timedOut = now >= deadline;
+ }
+ }
+ catch(InterruptedException ex)
{
- now = Time.currentMonotonicTimeMillis();
- timedOut = now >= deadline;
}
}
- catch(InterruptedException ex)
+ }
+ else
+ {
+ while(_exception == null && !_sent)
{
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
}
}
- else
+
+ if(timedOut)
{
- while(_exception == null && !_sent)
+ handler.requestTimedOut(this);
+
+ synchronized(this)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
+ while(_exception == null)
{
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
}
}
+
+ if(_exception != null)
+ {
+ throw (Ice.Exception)_exception.fillInStackTrace();
+ }
}
-
- if(timedOut)
+ catch(RetryException ex)
{
- _handler.requestTimedOut(this);
- assert(_exception != null);
+ //
+ // Clear request handler but don't retry or throw. Retrying
+ // isn't useful, there were no batch requests associated with
+ // the proxy's request handler.
+ //
+ _proxy.__setRequestHandler(handler, null);
}
-
- if(_exception != null)
+ catch(Ice.Exception ex)
{
- _exception.fillInStackTrace();
- throw _exception;
+ _proxy.__setRequestHandler(handler, null); // Clear request handler
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ throw ex; // Throw to notify the user that batch requests were potentially lost.
}
}
@@ -120,6 +163,12 @@ public final class BatchOutgoing implements OutgoingMessageCallback
return connection.flushBatchRequests(this);
}
+ public void
+ invokeCollocated(CollocatedRequestHandler handler)
+ {
+ handler.invokeBatchRequests(this);
+ }
+
synchronized public void
sent()
{
@@ -133,7 +182,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback
}
public synchronized void
- finished(Ice.LocalException ex, boolean sent)
+ finished(Ice.Exception ex, boolean sent)
{
if(_remoteObserver != null)
{
@@ -164,11 +213,23 @@ public final class BatchOutgoing implements OutgoingMessageCallback
}
}
- private RequestHandler _handler;
+ public void attachCollocatedObserver(int requestId)
+ {
+ if(_observer != null)
+ {
+ _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4);
+ if(_remoteObserver != null)
+ {
+ _remoteObserver.attach();
+ }
+ }
+ }
+
+ private Ice.ObjectPrxHelperBase _proxy;
private Ice.ConnectionI _connection;
private BasicStream _os;
private boolean _sent;
- private Ice.LocalException _exception;
+ private Ice.Exception _exception;
private InvocationObserver _observer;
private Observer _remoteObserver;