diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-12-12 18:54:19 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-12-12 18:54:19 +0100 |
commit | 3dff2b82d498d2e29dc4c42c4053557e16a373d4 (patch) | |
tree | 4242da8678ce8f36e34b9d821212cf78519af415 /java/src/IceInternal/ConnectRequestHandler.java | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.bz2 ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.xz ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.zip |
Fixed bug 2592
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 299 |
1 files changed, 181 insertions, 118 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 792674f62ac..8bba5a6fc5a 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -35,6 +35,29 @@ public class ConnectRequestHandler BasicStream os = null; } + public RequestHandler + connect() + { + _reference.getConnection(this); + + synchronized(this) + { + if(_exception != null) + { + throw _exception; + } + else if(_connection != null) + { + return new ConnectionRequestHandler(_reference, _connection, _compress); + } + else + { + _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set. + return this; + } + } + } + public void prepareBatchRequest(BasicStream os) { @@ -114,33 +137,29 @@ public class ConnectRequestHandler sendRequest(Outgoing out) throws LocalExceptionWrapper { - return (!getConnection(true).sendRequest(out, _compress, _response) || _response) ? _connection : null; + if(!getConnection(true).sendRequest(out, _compress, _response) || _response) + { + return _connection; // The request has been sent or we're expecting a response. + } + else + { + return null; // The request hasn't been sent yet. + } } public void sendAsyncRequest(OutgoingAsync out) + throws LocalExceptionWrapper { - try - { - synchronized(this) + synchronized(this) + { + if(!initialized()) { - if(!initialized()) - { - _requests.add(new Request(out)); - return; - } + _requests.add(new Request(out)); + return; } - - _connection.sendAsyncRequest(out, _compress, _response); - } - catch(LocalExceptionWrapper ex) - { - out.__finished(ex); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); } + _connection.sendAsyncRequest(out, _compress, _response); } public boolean @@ -152,23 +171,15 @@ public class ConnectRequestHandler public void flushAsyncBatchRequests(BatchOutgoingAsync out) { - try - { - synchronized(this) + synchronized(this) + { + if(!initialized()) { - if(!initialized()) - { - _requests.add(new Request(out)); - return; - } + _requests.add(new Request(out)); + return; } - - _connection.flushAsyncBatchRequests(out); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); } + _connection.flushAsyncBatchRequests(out); } public Outgoing @@ -256,43 +267,46 @@ public class ConnectRequestHandler // add this proxy to the router info object. // RouterInfo ri = _reference.getRouterInfo(); - if(ri != null) + if(ri != null && !ri.addProxy(_proxy, this)) { - if(!ri.addProxy(_proxy, this)) - { - return; // The request handler will be initialized once addProxy returns. - } + return; // The request handler will be initialized once addProxy returns. } + // + // We can now send the queued requests. + // flushRequests(); } - public void - setException(Ice.LocalException ex) + public synchronized void + setException(final Ice.LocalException ex) { - synchronized(this) - { - assert(!_initialized && _exception == null); - _exception = ex; - _proxy = null; // Break cyclic reference count. - _delegate = null; // Break cyclic reference count. - notifyAll(); - } + assert(!_initialized && _exception == null); + assert(_updateRequestHandler || _requests.isEmpty()); - java.util.Iterator p = _requests.iterator(); - while(p.hasNext()) + _exception = ex; + _proxy = null; // Break cyclic reference count. + _delegate = null; // Break cyclic reference count. + + // + // If some requests were queued, we notify them of the failure. This is done from a thread + // from the client thread pool since this will result in ice_exception callbacks to be + // called. + // + if(!_requests.isEmpty()) { - Request request = (Request)p.next(); - if(request.out != null) - { - request.out.__finished(ex); - } - else if(request.batchOut != null) - { - request.batchOut.__finished(ex); - } + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); } - _requests.clear(); + + notifyAll(); } // @@ -301,6 +315,10 @@ public class ConnectRequestHandler public void addedProxy() { + // + // The proxy was added to the router info, we're now ready to send the + // queued requests. + // flushRequests(); } @@ -319,25 +337,6 @@ public class ConnectRequestHandler _updateRequestHandler = false; } - public RequestHandler - connect() - { - _reference.getConnection(this); - - synchronized(this) - { - if(_connection != null) - { - return new ConnectionRequestHandler(_reference, _connection, _compress); - } - else - { - _updateRequestHandler = true; - return this; - } - } - } - private boolean initialized() { @@ -348,7 +347,7 @@ public class ConnectRequestHandler } else { - while(_flushing) + while(_flushing && _exception == null) { try { @@ -395,66 +394,92 @@ public class ConnectRequestHandler // _flushing = true; } - - java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true - while(p.hasNext()) + + try { - Request request = (Request)p.next(); - if(request.out != null) + java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true + while(p.hasNext()) { - try + Request request = (Request)p.next(); + if(request.out != null) { _connection.sendAsyncRequest(request.out, _compress, _response); } - catch(LocalExceptionWrapper ex) + else if(request.batchOut != null) { - request.out.__finished(ex); + _connection.flushAsyncBatchRequests(request.batchOut); } - catch(Ice.LocalException ex) + else { - request.out.__finished(ex); + BasicStream os = new BasicStream(request.os.instance()); + _connection.prepareBatchRequest(os); + try + { + request.os.pos(0); + os.writeBlob(request.os.readBlob(request.os.size())); + _connection.finishBatchRequest(os, _compress); + } + catch(Ice.LocalException ex) + { + _connection.abortBatchRequest(); + throw ex; + } } + p.remove(); } - else if(request.batchOut != null) + } + catch(final LocalExceptionWrapper ex) + { + synchronized(this) { - try - { - _connection.flushAsyncBatchRequests(request.batchOut); - } - catch(Ice.LocalException ex) - { - request.batchOut.__finished(ex); - } + assert(_exception != null && !_requests.isEmpty()); + _exception = ex.get(); + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); + return; } - else + } + catch(final Ice.LocalException ex) + { + synchronized(this) { - // - // TODO: Add sendBatchRequest() method to ConnectionI? - // - try - { - BasicStream os = new BasicStream(request.os.instance()); - _connection.prepareBatchRequest(os); - request.os.pos(0); - os.writeBlob(request.os.readBlob(request.os.size())); - _connection.finishBatchRequest(os, _compress); - } - catch(Ice.LocalException ex) - { - _connection.abortBatchRequest(); - _exception = ex; - } + assert(_exception != null && !_requests.isEmpty()); + _exception = ex; + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); + return; } } - _requests.clear(); synchronized(this) { + assert(!_initialized); _initialized = true; _flushing = false; notifyAll(); } + // + // We've finished sending the queued requests and the request handler now send + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request + // handler which does not have any synchronization. This also breaks the cyclic + // reference count with the proxy. + // if(_updateRequestHandler && _exception == null) { _proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); @@ -463,6 +488,44 @@ public class ConnectRequestHandler _delegate = null; // Break cyclic reference count. } + void + flushRequestsWithException(Ice.LocalException ex) + { + java.util.Iterator p = _requests.iterator(); + while(p.hasNext()) + { + Request request = (Request)p.next(); + if(request.out != null) + { + request.out.__finished(ex); + } + else if(request.batchOut != null) + { + request.batchOut.__finished(ex); + } + } + _requests.clear(); + } + + void + flushRequestsWithException(LocalExceptionWrapper ex) + { + java.util.Iterator p = _requests.iterator(); + while(p.hasNext()) + { + Request request = (Request)p.next(); + if(request.out != null) + { + request.out.__finished(ex); + } + else if(request.batchOut != null) + { + request.batchOut.__finished(ex.get()); + } + } + _requests.clear(); + } + private final Reference _reference; private final boolean _batchAutoFlush; private Ice.ObjectPrxHelperBase _proxy; @@ -474,7 +537,7 @@ public class ConnectRequestHandler private boolean _response; private Ice.LocalException _exception = null; - private java.util.ArrayList _requests = new java.util.ArrayList(); + private java.util.List _requests = new java.util.LinkedList(); private boolean _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; |