diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/ConnectRequestHandler.java | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 204 |
1 files changed, 91 insertions, 113 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 0cfe4e61aac..8b2f346a997 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -59,7 +59,7 @@ public class ConnectRequestHandler public void prepareBatchRequest(BasicStream os) - throws LocalExceptionWrapper + throws RetryException { synchronized(this) { @@ -74,11 +74,18 @@ public class ConnectRequestHandler } } - if(!initialized()) + try { - _batchRequestInProgress = true; - _batchStream.swap(os); - return; + if(!initialized()) + { + _batchRequestInProgress = true; + _batchStream.swap(os); + return; + } + } + catch(Ice.LocalException ex) + { + throw new RetryException(ex); } } @@ -90,7 +97,7 @@ public class ConnectRequestHandler { synchronized(this) { - if(!initialized()) + if(!initialized()) // This can't throw until _batchRequestInProgress = false { assert(_batchRequestInProgress); _batchRequestInProgress = false; @@ -117,7 +124,7 @@ public class ConnectRequestHandler { synchronized(this) { - if(!initialized()) + if(!initialized()) // This can't throw until _batchRequestInProgress = false { assert(_batchRequestInProgress); _batchRequestInProgress = false; @@ -136,14 +143,21 @@ public class ConnectRequestHandler public boolean sendRequest(OutgoingMessageCallback out) - throws LocalExceptionWrapper + throws RetryException { synchronized(this) { - if(!initialized()) + try + { + if(!initialized()) + { + _requests.add(new Request(out)); + return false; // Not sent + } + } + catch(Ice.LocalException ex) { - _requests.add(new Request(out)); - return false; // Not sent + throw new RetryException(ex); } } return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response. @@ -151,14 +165,21 @@ public class ConnectRequestHandler public int sendAsyncRequest(OutgoingAsyncMessageCallback out) - throws LocalExceptionWrapper + throws RetryException { synchronized(this) { - if(!initialized()) + try { - _requests.add(new Request(out)); - return AsyncStatus.Queued; + if(!initialized()) + { + _requests.add(new Request(out)); + return AsyncStatus.Queued; + } + } + catch(Ice.LocalException ex) + { + throw new RetryException(ex); } } return out.__send(_connection, _compress, _response); @@ -169,6 +190,11 @@ public class ConnectRequestHandler { synchronized(this) { + if(_exception != null) + { + return; // The request has been notified of a failure already. + } + if(!initialized()) { java.util.Iterator<Request> it = _requests.iterator(); @@ -191,8 +217,14 @@ public class ConnectRequestHandler public void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) { + boolean timedOut = false; synchronized(this) { + if(_exception != null) + { + return; // The request has been notified of a failure already. + } + if(!initialized()) { java.util.Iterator<Request> it = _requests.iterator(); @@ -201,45 +233,19 @@ public class ConnectRequestHandler Request request = it.next(); if(request.outAsync == outAsync) { - outAsync.__finished(new Ice.InvocationTimeoutException(), false); it.remove(); - return; + timedOut = true; + break; } } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. - } - } - _connection.asyncRequestTimedOut(outAsync); - } - - public Outgoing - getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, - InvocationObserver observer) - throws LocalExceptionWrapper - { - synchronized(this) - { - if(!initialized()) - { - return new IceInternal.Outgoing(this, operation, mode, context, observer); } } - - return _connection.getOutgoing(this, operation, mode, context, observer); - } - - public void - reclaimOutgoing(Outgoing out) - { - synchronized(this) + if(timedOut) { - if(_connection == null) - { - return; - } + outAsync.__finished(new Ice.InvocationTimeoutException(), false); + return; } - - _connection.reclaimOutgoing(out); + _connection.asyncRequestTimedOut(outAsync); } public Reference @@ -319,7 +325,6 @@ public class ConnectRequestHandler _exception = ex; _proxy = null; // Break cyclic reference count. - _delegate = null; // Break cyclic reference count. // // If some requests were queued, we notify them of the failure. This is done from a thread @@ -328,14 +333,14 @@ public class ConnectRequestHandler // if(!_requests.isEmpty()) { - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) - { - public void - run() - { - flushRequestsWithException(ex); - }; - }); + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() + { + public void + run() + { + flushRequestsWithException(); + }; + }); } notifyAll(); @@ -355,12 +360,11 @@ public class ConnectRequestHandler } public - ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy, Ice._ObjectDelM delegate) + ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy) { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; _proxy = (Ice.ObjectPrxHelperBase)proxy; - _delegate = delegate; _batchAutoFlush = ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault( "Ice.BatchAutoFlush", 1) > 0 ? true : false; _initialized = false; @@ -470,18 +474,25 @@ public class ConnectRequestHandler p.remove(); } } - catch(final LocalExceptionWrapper ex) + catch(final RetryException ex) { + // + // If the connection dies shortly after connection + // establishment, we don't systematically retry on + // RetryException. We handle the exception like it + // was an exception that occured while sending the + // request. + // synchronized(this) { assert(_exception == null && !_requests.isEmpty()); _exception = ex.get(); - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() { public void run() { - flushRequestsWithException(ex); + flushRequestsWithException(); }; }); } @@ -492,12 +503,12 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex; - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() { public void run() { - flushRequestsWithException(ex); + flushRequestsWithException(); }; }); } @@ -505,20 +516,20 @@ public class ConnectRequestHandler if(!sentCallbacks.isEmpty()) { - final Instance instance = _reference.getInstance(); - instance.clientThreadPool().execute(new DispatchWorkItem(instance) - { - public void - run() - { - for(OutgoingAsyncMessageCallback callback : sentCallbacks) - { - callback.__invokeSent(); - } - }; - }); + _reference.getInstance().clientThreadPool().execute( + new DispatchWorkItem() + { + public void + run() + { + for(OutgoingAsyncMessageCallback callback : sentCallbacks) + { + callback.__invokeSent(); + } + }; + }); } - + // // We've finished sending the queued requests and the request handler now send // the requests over the connection directly. It's time to substitute the @@ -530,7 +541,7 @@ public class ConnectRequestHandler // if(_updateRequestHandler && _exception == null) { - _proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); + _proxy.__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); } synchronized(this) @@ -542,54 +553,22 @@ public class ConnectRequestHandler _flushing = false; } _proxy = null; // Break cyclic reference count. - _delegate = null; // Break cyclic reference count. notifyAll(); } } void - flushRequestsWithException(Ice.LocalException ex) - { - for(Request request : _requests) - { - if(request.out != null) - { - request.out.finished(ex, false); - } - else if(request.outAsync != null) - { - request.outAsync.__finished(ex, false); - } - } - _requests.clear(); - } - - void - flushRequestsWithException(LocalExceptionWrapper ex) + flushRequestsWithException() { for(Request request : _requests) { if(request.out != null) { - if(request.out instanceof Outgoing) - { - ((Outgoing)request.out).finished(ex); - } - else - { - request.out.finished(ex.get(), false); - } + request.out.finished(_exception, false); } else if(request.outAsync != null) { - if(request.outAsync instanceof OutgoingAsync) - { - ((OutgoingAsync)request.outAsync).__finished(ex); - } - else - { - request.outAsync.__finished(ex.get(), false); - } + request.outAsync.__finished(_exception, false); } } _requests.clear(); @@ -599,7 +578,6 @@ public class ConnectRequestHandler private boolean _response; private Ice.ObjectPrxHelperBase _proxy; - private Ice._ObjectDelM _delegate; private final boolean _batchAutoFlush; |