summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ConnectRequestHandler.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/ConnectRequestHandler.java
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java204
1 files changed, 91 insertions, 113 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 0cfe4e61aac..8b2f346a997 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -59,7 +59,7 @@ public class ConnectRequestHandler
public void
prepareBatchRequest(BasicStream os)
- throws LocalExceptionWrapper
+ throws RetryException
{
synchronized(this)
{
@@ -74,11 +74,18 @@ public class ConnectRequestHandler
}
}
- if(!initialized())
+ try
{
- _batchRequestInProgress = true;
- _batchStream.swap(os);
- return;
+ if(!initialized())
+ {
+ _batchRequestInProgress = true;
+ _batchStream.swap(os);
+ return;
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ throw new RetryException(ex);
}
}
@@ -90,7 +97,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- if(!initialized())
+ if(!initialized()) // This can't throw until _batchRequestInProgress = false
{
assert(_batchRequestInProgress);
_batchRequestInProgress = false;
@@ -117,7 +124,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- if(!initialized())
+ if(!initialized()) // This can't throw until _batchRequestInProgress = false
{
assert(_batchRequestInProgress);
_batchRequestInProgress = false;
@@ -136,14 +143,21 @@ public class ConnectRequestHandler
public boolean
sendRequest(OutgoingMessageCallback out)
- throws LocalExceptionWrapper
+ throws RetryException
{
synchronized(this)
{
- if(!initialized())
+ try
+ {
+ if(!initialized())
+ {
+ _requests.add(new Request(out));
+ return false; // Not sent
+ }
+ }
+ catch(Ice.LocalException ex)
{
- _requests.add(new Request(out));
- return false; // Not sent
+ throw new RetryException(ex);
}
}
return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
@@ -151,14 +165,21 @@ public class ConnectRequestHandler
public int
sendAsyncRequest(OutgoingAsyncMessageCallback out)
- throws LocalExceptionWrapper
+ throws RetryException
{
synchronized(this)
{
- if(!initialized())
+ try
{
- _requests.add(new Request(out));
- return AsyncStatus.Queued;
+ if(!initialized())
+ {
+ _requests.add(new Request(out));
+ return AsyncStatus.Queued;
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ throw new RetryException(ex);
}
}
return out.__send(_connection, _compress, _response);
@@ -169,6 +190,11 @@ public class ConnectRequestHandler
{
synchronized(this)
{
+ if(_exception != null)
+ {
+ return; // The request has been notified of a failure already.
+ }
+
if(!initialized())
{
java.util.Iterator<Request> it = _requests.iterator();
@@ -191,8 +217,14 @@ public class ConnectRequestHandler
public void
asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
{
+ boolean timedOut = false;
synchronized(this)
{
+ if(_exception != null)
+ {
+ return; // The request has been notified of a failure already.
+ }
+
if(!initialized())
{
java.util.Iterator<Request> it = _requests.iterator();
@@ -201,45 +233,19 @@ public class ConnectRequestHandler
Request request = it.next();
if(request.outAsync == outAsync)
{
- outAsync.__finished(new Ice.InvocationTimeoutException(), false);
it.remove();
- return;
+ timedOut = true;
+ break;
}
}
- assert(false); // The request has to be queued if it timed out and we're not initialized yet.
- }
- }
- _connection.asyncRequestTimedOut(outAsync);
- }
-
- public Outgoing
- getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context,
- InvocationObserver observer)
- throws LocalExceptionWrapper
- {
- synchronized(this)
- {
- if(!initialized())
- {
- return new IceInternal.Outgoing(this, operation, mode, context, observer);
}
}
-
- return _connection.getOutgoing(this, operation, mode, context, observer);
- }
-
- public void
- reclaimOutgoing(Outgoing out)
- {
- synchronized(this)
+ if(timedOut)
{
- if(_connection == null)
- {
- return;
- }
+ outAsync.__finished(new Ice.InvocationTimeoutException(), false);
+ return;
}
-
- _connection.reclaimOutgoing(out);
+ _connection.asyncRequestTimedOut(outAsync);
}
public Reference
@@ -319,7 +325,6 @@ public class ConnectRequestHandler
_exception = ex;
_proxy = null; // Break cyclic reference count.
- _delegate = null; // Break cyclic reference count.
//
// If some requests were queued, we notify them of the failure. This is done from a thread
@@ -328,14 +333,14 @@ public class ConnectRequestHandler
//
if(!_requests.isEmpty())
{
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
- {
- public void
- run()
- {
- flushRequestsWithException(ex);
- };
- });
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
+ {
+ public void
+ run()
+ {
+ flushRequestsWithException();
+ };
+ });
}
notifyAll();
@@ -355,12 +360,11 @@ public class ConnectRequestHandler
}
public
- ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy, Ice._ObjectDelM delegate)
+ ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy)
{
_reference = ref;
_response = _reference.getMode() == Reference.ModeTwoway;
_proxy = (Ice.ObjectPrxHelperBase)proxy;
- _delegate = delegate;
_batchAutoFlush = ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault(
"Ice.BatchAutoFlush", 1) > 0 ? true : false;
_initialized = false;
@@ -470,18 +474,25 @@ public class ConnectRequestHandler
p.remove();
}
}
- catch(final LocalExceptionWrapper ex)
+ catch(final RetryException ex)
{
+ //
+ // If the connection dies shortly after connection
+ // establishment, we don't systematically retry on
+ // RetryException. We handle the exception like it
+ // was an exception that occured while sending the
+ // request.
+ //
synchronized(this)
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
{
public void
run()
{
- flushRequestsWithException(ex);
+ flushRequestsWithException();
};
});
}
@@ -492,12 +503,12 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex;
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
{
public void
run()
{
- flushRequestsWithException(ex);
+ flushRequestsWithException();
};
});
}
@@ -505,20 +516,20 @@ public class ConnectRequestHandler
if(!sentCallbacks.isEmpty())
{
- final Instance instance = _reference.getInstance();
- instance.clientThreadPool().execute(new DispatchWorkItem(instance)
- {
- public void
- run()
- {
- for(OutgoingAsyncMessageCallback callback : sentCallbacks)
- {
- callback.__invokeSent();
- }
- };
- });
+ _reference.getInstance().clientThreadPool().execute(
+ new DispatchWorkItem()
+ {
+ public void
+ run()
+ {
+ for(OutgoingAsyncMessageCallback callback : sentCallbacks)
+ {
+ callback.__invokeSent();
+ }
+ };
+ });
}
-
+
//
// We've finished sending the queued requests and the request handler now send
// the requests over the connection directly. It's time to substitute the
@@ -530,7 +541,7 @@ public class ConnectRequestHandler
//
if(_updateRequestHandler && _exception == null)
{
- _proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress));
+ _proxy.__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress));
}
synchronized(this)
@@ -542,54 +553,22 @@ public class ConnectRequestHandler
_flushing = false;
}
_proxy = null; // Break cyclic reference count.
- _delegate = null; // Break cyclic reference count.
notifyAll();
}
}
void
- flushRequestsWithException(Ice.LocalException ex)
- {
- for(Request request : _requests)
- {
- if(request.out != null)
- {
- request.out.finished(ex, false);
- }
- else if(request.outAsync != null)
- {
- request.outAsync.__finished(ex, false);
- }
- }
- _requests.clear();
- }
-
- void
- flushRequestsWithException(LocalExceptionWrapper ex)
+ flushRequestsWithException()
{
for(Request request : _requests)
{
if(request.out != null)
{
- if(request.out instanceof Outgoing)
- {
- ((Outgoing)request.out).finished(ex);
- }
- else
- {
- request.out.finished(ex.get(), false);
- }
+ request.out.finished(_exception, false);
}
else if(request.outAsync != null)
{
- if(request.outAsync instanceof OutgoingAsync)
- {
- ((OutgoingAsync)request.outAsync).__finished(ex);
- }
- else
- {
- request.outAsync.__finished(ex.get(), false);
- }
+ request.outAsync.__finished(_exception, false);
}
}
_requests.clear();
@@ -599,7 +578,6 @@ public class ConnectRequestHandler
private boolean _response;
private Ice.ObjectPrxHelperBase _proxy;
- private Ice._ObjectDelM _delegate;
private final boolean _batchAutoFlush;