summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ConnectRequestHandler.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2007-12-12 18:54:19 +0100
committerBenoit Foucher <benoit@zeroc.com>2007-12-12 18:54:19 +0100
commit3dff2b82d498d2e29dc4c42c4053557e16a373d4 (patch)
tree4242da8678ce8f36e34b9d821212cf78519af415 /java/src/IceInternal/ConnectRequestHandler.java
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.bz2
ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.xz
ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.zip
Fixed bug 2592
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java299
1 files changed, 181 insertions, 118 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 792674f62ac..8bba5a6fc5a 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -35,6 +35,29 @@ public class ConnectRequestHandler
BasicStream os = null;
}
+ public RequestHandler
+ connect()
+ {
+ _reference.getConnection(this);
+
+ synchronized(this)
+ {
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ else if(_connection != null)
+ {
+ return new ConnectionRequestHandler(_reference, _connection, _compress);
+ }
+ else
+ {
+ _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set.
+ return this;
+ }
+ }
+ }
+
public void
prepareBatchRequest(BasicStream os)
{
@@ -114,33 +137,29 @@ public class ConnectRequestHandler
sendRequest(Outgoing out)
throws LocalExceptionWrapper
{
- return (!getConnection(true).sendRequest(out, _compress, _response) || _response) ? _connection : null;
+ if(!getConnection(true).sendRequest(out, _compress, _response) || _response)
+ {
+ return _connection; // The request has been sent or we're expecting a response.
+ }
+ else
+ {
+ return null; // The request hasn't been sent yet.
+ }
}
public void
sendAsyncRequest(OutgoingAsync out)
+ throws LocalExceptionWrapper
{
- try
- {
- synchronized(this)
+ synchronized(this)
+ {
+ if(!initialized())
{
- if(!initialized())
- {
- _requests.add(new Request(out));
- return;
- }
+ _requests.add(new Request(out));
+ return;
}
-
- _connection.sendAsyncRequest(out, _compress, _response);
- }
- catch(LocalExceptionWrapper ex)
- {
- out.__finished(ex);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
}
+ _connection.sendAsyncRequest(out, _compress, _response);
}
public boolean
@@ -152,23 +171,15 @@ public class ConnectRequestHandler
public void
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
- try
- {
- synchronized(this)
+ synchronized(this)
+ {
+ if(!initialized())
{
- if(!initialized())
- {
- _requests.add(new Request(out));
- return;
- }
+ _requests.add(new Request(out));
+ return;
}
-
- _connection.flushAsyncBatchRequests(out);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
}
+ _connection.flushAsyncBatchRequests(out);
}
public Outgoing
@@ -256,43 +267,46 @@ public class ConnectRequestHandler
// add this proxy to the router info object.
//
RouterInfo ri = _reference.getRouterInfo();
- if(ri != null)
+ if(ri != null && !ri.addProxy(_proxy, this))
{
- if(!ri.addProxy(_proxy, this))
- {
- return; // The request handler will be initialized once addProxy returns.
- }
+ return; // The request handler will be initialized once addProxy returns.
}
+ //
+ // We can now send the queued requests.
+ //
flushRequests();
}
- public void
- setException(Ice.LocalException ex)
+ public synchronized void
+ setException(final Ice.LocalException ex)
{
- synchronized(this)
- {
- assert(!_initialized && _exception == null);
- _exception = ex;
- _proxy = null; // Break cyclic reference count.
- _delegate = null; // Break cyclic reference count.
- notifyAll();
- }
+ assert(!_initialized && _exception == null);
+ assert(_updateRequestHandler || _requests.isEmpty());
- java.util.Iterator p = _requests.iterator();
- while(p.hasNext())
+ _exception = ex;
+ _proxy = null; // Break cyclic reference count.
+ _delegate = null; // Break cyclic reference count.
+
+ //
+ // If some requests were queued, we notify them of the failure. This is done from a thread
+ // from the client thread pool since this will result in ice_exception callbacks to be
+ // called.
+ //
+ if(!_requests.isEmpty())
{
- Request request = (Request)p.next();
- if(request.out != null)
- {
- request.out.__finished(ex);
- }
- else if(request.batchOut != null)
- {
- request.batchOut.__finished(ex);
- }
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
}
- _requests.clear();
+
+ notifyAll();
}
//
@@ -301,6 +315,10 @@ public class ConnectRequestHandler
public void
addedProxy()
{
+ //
+ // The proxy was added to the router info, we're now ready to send the
+ // queued requests.
+ //
flushRequests();
}
@@ -319,25 +337,6 @@ public class ConnectRequestHandler
_updateRequestHandler = false;
}
- public RequestHandler
- connect()
- {
- _reference.getConnection(this);
-
- synchronized(this)
- {
- if(_connection != null)
- {
- return new ConnectionRequestHandler(_reference, _connection, _compress);
- }
- else
- {
- _updateRequestHandler = true;
- return this;
- }
- }
- }
-
private boolean
initialized()
{
@@ -348,7 +347,7 @@ public class ConnectRequestHandler
}
else
{
- while(_flushing)
+ while(_flushing && _exception == null)
{
try
{
@@ -395,66 +394,92 @@ public class ConnectRequestHandler
//
_flushing = true;
}
-
- java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true
- while(p.hasNext())
+
+ try
{
- Request request = (Request)p.next();
- if(request.out != null)
+ java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true
+ while(p.hasNext())
{
- try
+ Request request = (Request)p.next();
+ if(request.out != null)
{
_connection.sendAsyncRequest(request.out, _compress, _response);
}
- catch(LocalExceptionWrapper ex)
+ else if(request.batchOut != null)
{
- request.out.__finished(ex);
+ _connection.flushAsyncBatchRequests(request.batchOut);
}
- catch(Ice.LocalException ex)
+ else
{
- request.out.__finished(ex);
+ BasicStream os = new BasicStream(request.os.instance());
+ _connection.prepareBatchRequest(os);
+ try
+ {
+ request.os.pos(0);
+ os.writeBlob(request.os.readBlob(request.os.size()));
+ _connection.finishBatchRequest(os, _compress);
+ }
+ catch(Ice.LocalException ex)
+ {
+ _connection.abortBatchRequest();
+ throw ex;
+ }
}
+ p.remove();
}
- else if(request.batchOut != null)
+ }
+ catch(final LocalExceptionWrapper ex)
+ {
+ synchronized(this)
{
- try
- {
- _connection.flushAsyncBatchRequests(request.batchOut);
- }
- catch(Ice.LocalException ex)
- {
- request.batchOut.__finished(ex);
- }
+ assert(_exception != null && !_requests.isEmpty());
+ _exception = ex.get();
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
+ return;
}
- else
+ }
+ catch(final Ice.LocalException ex)
+ {
+ synchronized(this)
{
- //
- // TODO: Add sendBatchRequest() method to ConnectionI?
- //
- try
- {
- BasicStream os = new BasicStream(request.os.instance());
- _connection.prepareBatchRequest(os);
- request.os.pos(0);
- os.writeBlob(request.os.readBlob(request.os.size()));
- _connection.finishBatchRequest(os, _compress);
- }
- catch(Ice.LocalException ex)
- {
- _connection.abortBatchRequest();
- _exception = ex;
- }
+ assert(_exception != null && !_requests.isEmpty());
+ _exception = ex;
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
+ return;
}
}
- _requests.clear();
synchronized(this)
{
+ assert(!_initialized);
_initialized = true;
_flushing = false;
notifyAll();
}
+ //
+ // We've finished sending the queued requests and the request handler now send
+ // the requests over the connection directly. It's time to substitute the
+ // request handler of the proxy with the more efficient connection request
+ // handler which does not have any synchronization. This also breaks the cyclic
+ // reference count with the proxy.
+ //
if(_updateRequestHandler && _exception == null)
{
_proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress));
@@ -463,6 +488,44 @@ public class ConnectRequestHandler
_delegate = null; // Break cyclic reference count.
}
+ void
+ flushRequestsWithException(Ice.LocalException ex)
+ {
+ java.util.Iterator p = _requests.iterator();
+ while(p.hasNext())
+ {
+ Request request = (Request)p.next();
+ if(request.out != null)
+ {
+ request.out.__finished(ex);
+ }
+ else if(request.batchOut != null)
+ {
+ request.batchOut.__finished(ex);
+ }
+ }
+ _requests.clear();
+ }
+
+ void
+ flushRequestsWithException(LocalExceptionWrapper ex)
+ {
+ java.util.Iterator p = _requests.iterator();
+ while(p.hasNext())
+ {
+ Request request = (Request)p.next();
+ if(request.out != null)
+ {
+ request.out.__finished(ex);
+ }
+ else if(request.batchOut != null)
+ {
+ request.batchOut.__finished(ex.get());
+ }
+ }
+ _requests.clear();
+ }
+
private final Reference _reference;
private final boolean _batchAutoFlush;
private Ice.ObjectPrxHelperBase _proxy;
@@ -474,7 +537,7 @@ public class ConnectRequestHandler
private boolean _response;
private Ice.LocalException _exception = null;
- private java.util.ArrayList _requests = new java.util.ArrayList();
+ private java.util.List _requests = new java.util.LinkedList();
private boolean _batchRequestInProgress;
private int _batchRequestsSize;
private BasicStream _batchStream;