diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-12-12 18:54:19 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-12-12 18:54:19 +0100 |
commit | 3dff2b82d498d2e29dc4c42c4053557e16a373d4 (patch) | |
tree | 4242da8678ce8f36e34b9d821212cf78519af415 /java/src | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.bz2 ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.xz ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.zip |
Fixed bug 2592
Diffstat (limited to 'java/src')
25 files changed, 713 insertions, 785 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java index ca68d1c2382..31495520034 100644 --- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java +++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java @@ -15,21 +15,29 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc public final void __invoke(Ice.ObjectPrx prx) { - Ice._ObjectDel delegate; - IceInternal.RequestHandler handler; + __acquire(prx); try { + // + // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch + // requests were queued with the connection, they would be lost without being noticed. + // + Ice._ObjectDel delegate = null; + int cnt = -1; // Don't retry. Ice.ObjectPrxHelperBase proxy = (Ice.ObjectPrxHelperBase)prx; - __prepare(proxy.__reference().getInstance()); - delegate = proxy.__getDelegate(true); - handler = delegate.__getRequestHandler(); + try + { + delegate = proxy.__getDelegate(true); + delegate.__getRequestHandler().flushAsyncBatchRequests(this); + } + catch(Ice.LocalException ex) + { + cnt = proxy.__handleException(delegate, ex, cnt); + } } catch(Ice.LocalException ex) { - __finished(ex); - return; + __release(ex); } - - handler.flushAsyncBatchRequests(this); } } diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java index 1965ac41b48..b891e74375a 100644 --- a/java/src/Ice/AMI_Object_ice_invoke.java +++ b/java/src/Ice/AMI_Object_ice_invoke.java @@ -17,18 +17,18 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync public final void __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode, byte[] inParams, java.util.Map context) { + __acquire(prx); try { __prepare(prx, operation, mode, context); __os.writeBlob(inParams); __os.endWriteEncaps(); + __send(); } catch(LocalException ex) { - __finished(ex); - return; + __release(ex); } - __send(); } protected final void __response(boolean ok) // ok == true means no user exception. @@ -45,5 +45,6 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync return; } ice_response(ok, outParams); + __release(); } } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index efdb68da8c1..425bdba5d14 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -903,6 +903,7 @@ public final class ConnectionI extends IceInternal.EventHandler if(_batchRequestNum == 0) { + out.sent(false); return true; } @@ -971,6 +972,7 @@ public final class ConnectionI extends IceInternal.EventHandler if(_batchRequestNum == 0) { + outAsync.__sent(this); return; } @@ -2035,10 +2037,6 @@ public final class ConnectionI extends IceInternal.EventHandler return IceInternal.SocketStatus.NeedWrite; } } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - throw ex.get(); - } catch(Ice.TimeoutException ex) { throw new Ice.ConnectTimeoutException(); @@ -2169,21 +2167,10 @@ public final class ConnectionI extends IceInternal.EventHandler } - try + if(!_transceiver.write(message.stream.getBuffer(), timeout)) { - if(!_transceiver.write(message.stream.getBuffer(), timeout)) - { - assert(timeout == 0); - return false; - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - if(!ex.retry()) - { - message.sent(this, timeout == 0); - } - throw ex.get(); + assert(timeout == 0); + return false; } message.sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread. @@ -2276,25 +2263,14 @@ public final class ConnectionI extends IceInternal.EventHandler IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } - try + if(!foreground && _transceiver.write(message.stream.getBuffer(), 0)) { - if(!foreground && _transceiver.write(message.stream.getBuffer(), 0)) + message.sent(this, false); + if(_acmTimeout > 0) { - message.sent(this, false); - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } - return false; + _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - if(!ex.retry()) - { - message.sent(this, false); - } - throw ex.get(); + return false; } _sendStreams.addLast(message); diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index adb890ae473..1b39ac340e6 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -941,16 +941,6 @@ public final class ObjectAdapterI implements ObjectAdapter for(int i = 0; i < endpoints.size(); ++i) { IceInternal.EndpointI endp = (IceInternal.EndpointI)endpoints.get(i); - // - // TODO: Remove when we no longer support SSL for JDK 1.4. - // - if(!_threadPerConnection && endp.requiresThreadPerConnection()) - { - Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endp.toString(); - throw ex; - } - IceInternal.IncomingConnectionFactory factory = new IceInternal.IncomingConnectionFactory(instance, endp, this, _name); _incomingConnectionFactories.add(factory); diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 87c3b86f3d2..4269def1407 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -790,8 +790,8 @@ public class ObjectPrxHelperBase implements ObjectPrx try { __del = __getDelegate(false); - // Wait for the connection to be established. - return __del.__getRequestHandler().getConnection(true); + return __del.__getRequestHandler().getConnection(true); // Wait for the connection to be established. + } catch(LocalException __ex) { @@ -826,24 +826,21 @@ public class ObjectPrxHelperBase implements ObjectPrx public void ice_flushBatchRequests() { - int __cnt = 0; - while(true) + // + // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch + // requests were queued with the connection, they would be lost without being noticed. + // + _ObjectDel __del = null; + int __cnt = -1; // Don't retry. + try { - _ObjectDel __del = null; - try - { - __del = __getDelegate(false); - __del.ice_flushBatchRequests(); - return; - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - __handleExceptionWrapper(__del, __ex); - } - catch(LocalException __ex) - { - __cnt = __handleException(__del, __ex, __cnt); - } + __del = __getDelegate(false); + __del.ice_flushBatchRequests(); + return; + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, __cnt); } } @@ -949,6 +946,11 @@ public class ObjectPrxHelperBase implements ObjectPrx } } + if(cnt == -1) // Don't retry if the retry count is -1. + { + throw ex; + } + IceInternal.ProxyFactory proxyFactory; try { diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java index 8eefc506fca..dcc4feeb334 100644 --- a/java/src/Ice/_ObjectDel.java +++ b/java/src/Ice/_ObjectDel.java @@ -27,8 +27,7 @@ public interface _ObjectDel java.util.Map context) throws IceInternal.LocalExceptionWrapper; - void ice_flushBatchRequests() - throws IceInternal.LocalExceptionWrapper; + void ice_flushBatchRequests(); IceInternal.RequestHandler __getRequestHandler(); void __setRequestHandler(IceInternal.RequestHandler handler); diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 8bc56496b33..a060df4fae3 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -205,22 +205,9 @@ public class _ObjectDelM implements _ObjectDel public void ice_flushBatchRequests() - throws IceInternal.LocalExceptionWrapper { IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(__handler); - try - { - out.invoke(); - } - catch(Ice.LocalException ex) - { - // - // We never retry flusing the batch requests as the connection batched - // requests were discarded and the caller needs to be notified of the - // failure. - // - throw new IceInternal.LocalExceptionWrapper(ex, false); - } + out.invoke(); } public IceInternal.RequestHandler diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index 1e0071f047a..eecd4cb692a 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -34,23 +34,25 @@ public final class BatchOutgoing implements OutgoingMessageCallback if(_handler != null && !_handler.flushBatchRequests(this) || _connection != null && !_connection.flushBatchRequests(this)) - synchronized(this) { - while(_exception == null && !_sent) + synchronized(this) { - try + while(_exception == null && !_sent) { - wait(); + try + { + wait(); + } + catch(java.lang.InterruptedException ex) + { + } } - catch(java.lang.InterruptedException ex) + + if(_exception != null) { + throw _exception; } } - - if(_exception != null) - { - throw _exception; - } } } diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 94e15c2dca3..b96e3e047cf 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -9,98 +9,18 @@ package IceInternal; -public abstract class BatchOutgoingAsync implements OutgoingAsyncMessageCallback +public abstract class BatchOutgoingAsync extends OutgoingAsyncMessageCallback { - public - BatchOutgoingAsync() - { - } - - public abstract void ice_exception(Ice.LocalException ex); - - public final BasicStream - __os() - { - return __os; - } - public final void __sent(final Ice.ConnectionI connection) { - synchronized(_monitor) - { - cleanup(); - } + __release(); } public final void __finished(Ice.LocalException exc) { - try - { - ice_exception(exc); - } - catch(java.lang.Exception ex) - { - warning(ex); - } - finally - { - synchronized(_monitor) - { - cleanup(); - } - } - } - - protected final void - __prepare(Instance instance) - { - synchronized(_monitor) - { - while(__os != null) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - } - } - - assert(__os == null); - __os = new BasicStream(instance); - } - } - - private final void - warning(java.lang.Exception ex) - { - if(__os != null) // Don't print anything if cleanup() was already called. - { - if(__os.instance().initializationData().properties.getPropertyAsIntWithDefault( - "Ice.Warn.AMICallback", 1) > 0) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtil.OutputBase out = new IceUtil.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by AMI callback:\n"); - ex.printStackTrace(pw); - pw.flush(); - __os.instance().initializationData().logger.warning(sw.toString()); - } - } - } - - private final void - cleanup() - { - __os = null; - _monitor.notify(); + __exception(exc); } - protected BasicStream __os; - private final java.lang.Object _monitor = new java.lang.Object(); } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 792674f62ac..8bba5a6fc5a 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -35,6 +35,29 @@ public class ConnectRequestHandler BasicStream os = null; } + public RequestHandler + connect() + { + _reference.getConnection(this); + + synchronized(this) + { + if(_exception != null) + { + throw _exception; + } + else if(_connection != null) + { + return new ConnectionRequestHandler(_reference, _connection, _compress); + } + else + { + _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set. + return this; + } + } + } + public void prepareBatchRequest(BasicStream os) { @@ -114,33 +137,29 @@ public class ConnectRequestHandler sendRequest(Outgoing out) throws LocalExceptionWrapper { - return (!getConnection(true).sendRequest(out, _compress, _response) || _response) ? _connection : null; + if(!getConnection(true).sendRequest(out, _compress, _response) || _response) + { + return _connection; // The request has been sent or we're expecting a response. + } + else + { + return null; // The request hasn't been sent yet. + } } public void sendAsyncRequest(OutgoingAsync out) + throws LocalExceptionWrapper { - try - { - synchronized(this) + synchronized(this) + { + if(!initialized()) { - if(!initialized()) - { - _requests.add(new Request(out)); - return; - } + _requests.add(new Request(out)); + return; } - - _connection.sendAsyncRequest(out, _compress, _response); - } - catch(LocalExceptionWrapper ex) - { - out.__finished(ex); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); } + _connection.sendAsyncRequest(out, _compress, _response); } public boolean @@ -152,23 +171,15 @@ public class ConnectRequestHandler public void flushAsyncBatchRequests(BatchOutgoingAsync out) { - try - { - synchronized(this) + synchronized(this) + { + if(!initialized()) { - if(!initialized()) - { - _requests.add(new Request(out)); - return; - } + _requests.add(new Request(out)); + return; } - - _connection.flushAsyncBatchRequests(out); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); } + _connection.flushAsyncBatchRequests(out); } public Outgoing @@ -256,43 +267,46 @@ public class ConnectRequestHandler // add this proxy to the router info object. // RouterInfo ri = _reference.getRouterInfo(); - if(ri != null) + if(ri != null && !ri.addProxy(_proxy, this)) { - if(!ri.addProxy(_proxy, this)) - { - return; // The request handler will be initialized once addProxy returns. - } + return; // The request handler will be initialized once addProxy returns. } + // + // We can now send the queued requests. + // flushRequests(); } - public void - setException(Ice.LocalException ex) + public synchronized void + setException(final Ice.LocalException ex) { - synchronized(this) - { - assert(!_initialized && _exception == null); - _exception = ex; - _proxy = null; // Break cyclic reference count. - _delegate = null; // Break cyclic reference count. - notifyAll(); - } + assert(!_initialized && _exception == null); + assert(_updateRequestHandler || _requests.isEmpty()); - java.util.Iterator p = _requests.iterator(); - while(p.hasNext()) + _exception = ex; + _proxy = null; // Break cyclic reference count. + _delegate = null; // Break cyclic reference count. + + // + // If some requests were queued, we notify them of the failure. This is done from a thread + // from the client thread pool since this will result in ice_exception callbacks to be + // called. + // + if(!_requests.isEmpty()) { - Request request = (Request)p.next(); - if(request.out != null) - { - request.out.__finished(ex); - } - else if(request.batchOut != null) - { - request.batchOut.__finished(ex); - } + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); } - _requests.clear(); + + notifyAll(); } // @@ -301,6 +315,10 @@ public class ConnectRequestHandler public void addedProxy() { + // + // The proxy was added to the router info, we're now ready to send the + // queued requests. + // flushRequests(); } @@ -319,25 +337,6 @@ public class ConnectRequestHandler _updateRequestHandler = false; } - public RequestHandler - connect() - { - _reference.getConnection(this); - - synchronized(this) - { - if(_connection != null) - { - return new ConnectionRequestHandler(_reference, _connection, _compress); - } - else - { - _updateRequestHandler = true; - return this; - } - } - } - private boolean initialized() { @@ -348,7 +347,7 @@ public class ConnectRequestHandler } else { - while(_flushing) + while(_flushing && _exception == null) { try { @@ -395,66 +394,92 @@ public class ConnectRequestHandler // _flushing = true; } - - java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true - while(p.hasNext()) + + try { - Request request = (Request)p.next(); - if(request.out != null) + java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true + while(p.hasNext()) { - try + Request request = (Request)p.next(); + if(request.out != null) { _connection.sendAsyncRequest(request.out, _compress, _response); } - catch(LocalExceptionWrapper ex) + else if(request.batchOut != null) { - request.out.__finished(ex); + _connection.flushAsyncBatchRequests(request.batchOut); } - catch(Ice.LocalException ex) + else { - request.out.__finished(ex); + BasicStream os = new BasicStream(request.os.instance()); + _connection.prepareBatchRequest(os); + try + { + request.os.pos(0); + os.writeBlob(request.os.readBlob(request.os.size())); + _connection.finishBatchRequest(os, _compress); + } + catch(Ice.LocalException ex) + { + _connection.abortBatchRequest(); + throw ex; + } } + p.remove(); } - else if(request.batchOut != null) + } + catch(final LocalExceptionWrapper ex) + { + synchronized(this) { - try - { - _connection.flushAsyncBatchRequests(request.batchOut); - } - catch(Ice.LocalException ex) - { - request.batchOut.__finished(ex); - } + assert(_exception != null && !_requests.isEmpty()); + _exception = ex.get(); + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); + return; } - else + } + catch(final Ice.LocalException ex) + { + synchronized(this) { - // - // TODO: Add sendBatchRequest() method to ConnectionI? - // - try - { - BasicStream os = new BasicStream(request.os.instance()); - _connection.prepareBatchRequest(os); - request.os.pos(0); - os.writeBlob(request.os.readBlob(request.os.size())); - _connection.finishBatchRequest(os, _compress); - } - catch(Ice.LocalException ex) - { - _connection.abortBatchRequest(); - _exception = ex; - } + assert(_exception != null && !_requests.isEmpty()); + _exception = ex; + _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + flushRequestsWithException(ex); + }; + }); + return; } } - _requests.clear(); synchronized(this) { + assert(!_initialized); _initialized = true; _flushing = false; notifyAll(); } + // + // We've finished sending the queued requests and the request handler now send + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request + // handler which does not have any synchronization. This also breaks the cyclic + // reference count with the proxy. + // if(_updateRequestHandler && _exception == null) { _proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); @@ -463,6 +488,44 @@ public class ConnectRequestHandler _delegate = null; // Break cyclic reference count. } + void + flushRequestsWithException(Ice.LocalException ex) + { + java.util.Iterator p = _requests.iterator(); + while(p.hasNext()) + { + Request request = (Request)p.next(); + if(request.out != null) + { + request.out.__finished(ex); + } + else if(request.batchOut != null) + { + request.batchOut.__finished(ex); + } + } + _requests.clear(); + } + + void + flushRequestsWithException(LocalExceptionWrapper ex) + { + java.util.Iterator p = _requests.iterator(); + while(p.hasNext()) + { + Request request = (Request)p.next(); + if(request.out != null) + { + request.out.__finished(ex); + } + else if(request.batchOut != null) + { + request.batchOut.__finished(ex.get()); + } + } + _requests.clear(); + } + private final Reference _reference; private final boolean _batchAutoFlush; private Ice.ObjectPrxHelperBase _proxy; @@ -474,7 +537,7 @@ public class ConnectRequestHandler private boolean _response; private Ice.LocalException _exception = null; - private java.util.ArrayList _requests = new java.util.ArrayList(); + private java.util.List _requests = new java.util.LinkedList(); private boolean _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 2f1b1c031ed..727ad6244d9 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -33,24 +33,21 @@ public class ConnectionRequestHandler implements RequestHandler sendRequest(Outgoing out) throws LocalExceptionWrapper { - return (!_connection.sendRequest(out, _compress, _response) || _response) ? _connection : null; + if(!_connection.sendRequest(out, _compress, _response) || _response) + { + return _connection; // The request has been sent or we're expecting a response. + } + else + { + return null; // The request hasn't been sent yet. + } } public void sendAsyncRequest(OutgoingAsync out) + throws LocalExceptionWrapper { - try - { - _connection.sendAsyncRequest(out, _compress, _response); - } - catch(LocalExceptionWrapper ex) - { - out.__finished(ex); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); - } + _connection.sendAsyncRequest(out, _compress, _response); } public boolean @@ -62,14 +59,7 @@ public class ConnectionRequestHandler implements RequestHandler public void flushAsyncBatchRequests(BatchOutgoingAsync out) { - try - { - _connection.flushAsyncBatchRequests(out); - } - catch(Ice.LocalException ex) - { - out.__finished(ex); - } + _connection.flushAsyncBatchRequests(out); } public Outgoing diff --git a/java/src/IceInternal/EndpointI.java b/java/src/IceInternal/EndpointI.java index 8086bf138fd..3be504cada4 100644 --- a/java/src/IceInternal/EndpointI.java +++ b/java/src/IceInternal/EndpointI.java @@ -115,13 +115,6 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable public abstract boolean equals(java.lang.Object obj); public abstract int compareTo(java.lang.Object obj); // From java.lang.Comparable. - // - // Returns true if the endpoint's transport requires thread-per-connection. - // - // TODO: Remove this when we no longer support SSL for JDK 1.4. - // - public abstract boolean requiresThreadPerConnection(); - public java.util.List connectors(java.util.List addresses) { diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index 2830f59e4b5..4f578b37adb 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -64,6 +64,7 @@ public final class Outgoing implements OutgoingMessageCallback _state = StateInProgress; Ice.ConnectionI connection = _handler.sendRequest(this); + assert(connection != null); boolean timedOut = false; @@ -186,52 +187,33 @@ public final class Outgoing implements OutgoingMessageCallback case Reference.ModeOneway: case Reference.ModeDatagram: { - try + _state = StateInProgress; + if(_handler.sendRequest(this) != null) { - _state = StateInProgress; - if(_handler.sendRequest(this) != null) + // + // If the handler returns the connection, we must wait for the sent callback. + // + synchronized(this) { - // - // If the handler returns the connection, we must wait for the sent callback. - // - synchronized(this) + while(_state != StateFailed && !_sent) { - while(_state != StateFailed && !_sent) + try { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } + wait(); } - - if(_exception != null) + catch(java.lang.InterruptedException ex) { - assert(!_sent); - throw _exception; } } - } - return true; - } - catch(Ice.LocalException ex) // Java specfic work-around (see ConnectionI.sendRequest()) - { - if(!_sent) // The send might have failed but the request might still be sent... - { - throw ex; - } - else - { - // - // We wrap the exception into a LocalExceptionWrapper to indicate that - // the request cannot be resent without potentially violating the - // "at-most-once" principle. - // - throw new IceInternal.LocalExceptionWrapper(ex, false); + + if(_exception != null) + { + assert(!_sent); + throw _exception; + } } } + return true; } case Reference.ModeBatchOneway: diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 184355e8e06..fcf104ed942 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,35 +9,22 @@ package IceInternal; -public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback +public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback { - public - OutgoingAsync() - { - } - - public abstract void ice_exception(Ice.LocalException ex); - - public final BasicStream - __os() - { - return __os; - } - public final void __sent(final Ice.ConnectionI connection) { - synchronized(_monitor) + synchronized(__monitor) { _sent = true; if(!_proxy.ice_isTwoway()) { - cleanup(); // No response expected, we're done with the OutgoingAsync. + __release(); } else if(_response) { - _monitor.notifyAll(); // If the response was already received notify finished() which is waiting. + __monitor.notifyAll(); // If the response was already received notify finished() which is waiting. } else if(connection.timeout() >= 0) { @@ -63,7 +50,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback byte replyStatus; try { - synchronized(_monitor) + synchronized(__monitor) { assert(__os != null); _response = true; @@ -77,7 +64,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback { try { - _monitor.wait(); + __monitor.wait(); } catch(java.lang.InterruptedException ex) { @@ -216,14 +203,8 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback } catch(java.lang.Exception ex) { - warning(ex); - } - finally - { - synchronized(_monitor) - { - cleanup(); - } + __warning(ex); + __release(); } } @@ -231,8 +212,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback public final void __finished(Ice.LocalException exc) { - boolean retry = false; - synchronized(_monitor) + synchronized(__monitor) { if(__os != null) // Might be called from __prepare or before __prepare { @@ -245,236 +225,213 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback { try { - _monitor.wait(); + __monitor.wait(); } catch(java.lang.InterruptedException ex) { } } - - // - // A CloseConnectionException indicates graceful - // server shutdown, and is therefore always repeatable - // without violating "at-most-once". That's because by - // sending a close connection message, the server - // guarantees that all outstanding requests can safely - // be repeated. Otherwise, we can also retry if the - // operation mode is Nonmutating or Idempotent. - // - // An ObjectNotExistException can always be retried as - // well without violating "at-most-once". - // - if(!_sent || - _mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent || - exc instanceof Ice.CloseConnectionException || exc instanceof Ice.ObjectNotExistException) - { - retry = true; - } - } - } - - if(retry) - { - try - { - _cnt = _proxy.__handleException(_delegate, exc, _cnt); - __send(); - return; - } - catch(Ice.LocalException ex) - { } } + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // try { - ice_exception(exc); - } - catch(java.lang.Exception ex) - { - warning(ex); + handleException(exc); // This will throw if the invocation can't be retried. + __send(); } - finally + catch(Ice.LocalException ex) { - synchronized(_monitor) - { - cleanup(); - } + __exception(ex); } } public final void __finished(LocalExceptionWrapper ex) { + assert(__os != null && !_sent); + // - // NOTE: This is called if sendRequest/sendAsyncRequest fails with - // a LocalExceptionWrapper exception. It's not possible for the - // timer to be set at this point because the request couldn't be - // sent. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. The LocalExceptionWrapper exception is only called + // before the invocation is sent. // - assert(!_sent && _timerTask == null); try { - if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) - { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); - } - else - { - _proxy.__handleExceptionWrapper(_delegate, ex); - } + handleException(ex); // This will throw if the invocation can't be retried. __send(); } catch(Ice.LocalException exc) { - try - { - ice_exception(exc); - } - catch(java.lang.Exception exl) + __exception(exc); + } + } + + protected final void + __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context) + { + assert(__os != null); + + _proxy = (Ice.ObjectPrxHelperBase)prx; + _delegate = null; + _cnt = 0; + _mode = mode; + + // + // Can't call async via a batch proxy. + // + if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) + { + throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); + } + + __os.writeBlob(IceInternal.Protocol.requestHdr); + + Reference ref = _proxy.__reference(); + + ref.getIdentity().__write(__os); + + // + // For compatibility with the old FacetPath. + // + String facet = ref.getFacet(); + if(facet == null || facet.length() == 0) + { + __os.writeStringSeq(null); + } + else + { + String[] facetPath = { facet }; + __os.writeStringSeq(facetPath); + } + + __os.writeString(operation); + + __os.writeByte((byte)mode.value()); + + if(context != null) + { + // + // Explicit context + // + Ice.ContextHelper.write(__os, context); + } + else + { + // + // Implicit context + // + Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); + java.util.Map prxContext = ref.getContext(); + + if(implicitContext == null) { - warning(exl); + Ice.ContextHelper.write(__os, prxContext); } - finally + else { - synchronized(_monitor) - { - cleanup(); - } + implicitContext.write(prxContext, __os); } } + + __os.startWriteEncaps(); } protected final void - __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context) + __send() { - synchronized(_monitor) + while(true) { try { - // - // We must first wait for other requests to finish. - // - while(__os != null) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - } - } - - // - // Can't call async via a batch proxy. - // - _proxy = (Ice.ObjectPrxHelperBase)prx; - if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) - { - throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); - } - - _delegate = null; - _cnt = 0; - _mode = mode; _sent = false; - _response = false; - - Reference ref = _proxy.__reference(); - assert(__is == null); - __is = new BasicStream(ref.getInstance()); - assert(__os == null); - __os = new BasicStream(ref.getInstance()); - - __os.writeBlob(IceInternal.Protocol.requestHdr); - - ref.getIdentity().__write(__os); - - // - // For compatibility with the old FacetPath. - // - String facet = ref.getFacet(); - if(facet == null || facet.length() == 0) - { - __os.writeStringSeq(null); - } - else - { - String[] facetPath = { facet }; - __os.writeStringSeq(facetPath); - } - - __os.writeString(operation); - - __os.writeByte((byte)mode.value()); - - if(context != null) - { - // - // Explicit context - // - Ice.ContextHelper.write(__os, context); - } - else - { - // - // Implicit context - // - Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); - java.util.Map prxContext = ref.getContext(); - - if(implicitContext == null) - { - Ice.ContextHelper.write(__os, prxContext); - } - else - { - implicitContext.write(prxContext, __os); - } - } - - __os.startWriteEncaps(); + _response = false; + _delegate = _proxy.__getDelegate(true); + _delegate.__getRequestHandler().sendAsyncRequest(this); + return; + } + catch(LocalExceptionWrapper ex) + { + handleException(ex); } catch(Ice.LocalException ex) { - cleanup(); - throw ex; + handleException(ex); } } } - protected final void - __send() + protected abstract void __response(boolean ok); + + private void + handleException(LocalExceptionWrapper ex) { - // - // NOTE: no synchronization needed. At this point, no other threads can be calling on this object. - // + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + { + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy.__handleExceptionWrapper(_delegate, ex); + } + } - RequestHandler handler; + private void + handleException(Ice.LocalException exc) + { try { - _delegate = _proxy.__getDelegate(true); - handler = _delegate.__getRequestHandler(); + // + // A CloseConnectionException indicates graceful + // server shutdown, and is therefore always repeatable + // without violating "at-most-once". That's because by + // sending a close connection message, the server + // guarantees that all outstanding requests can safely + // be repeated. + // + // An ObjectNotExistException can always be retried as + // well without violating "at-most-once". + // + if(!_sent || + exc instanceof Ice.CloseConnectionException || + exc instanceof Ice.ObjectNotExistException) + { + throw exc; + } + + // + // Throw the exception wrapped in a LocalExceptionWrapper, to + // indicate that the request cannot be resent without + // potentially violating the "at-most-once" principle. + // + throw new LocalExceptionWrapper(exc, false); + } + catch(LocalExceptionWrapper ex) + { + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + { + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy.__handleExceptionWrapper(_delegate, ex); + } } catch(Ice.LocalException ex) { - __finished(ex); - return; + _cnt = _proxy.__handleException(_delegate, ex, _cnt); } - - _sent = false; - _response = false; - handler.sendAsyncRequest(this); } - protected abstract void __response(boolean ok); - private final void __runTimerTask(Ice.ConnectionI connection) { - synchronized(_monitor) + synchronized(__monitor) { assert(_timerTask != null && _sent); // Can only be set once the request is sent. @@ -483,7 +440,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback connection = null; } _timerTask = null; - _monitor.notifyAll(); + __monitor.notifyAll(); } if(connection != null) @@ -492,49 +449,11 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback } } - private final void - warning(java.lang.Exception ex) - { - if(__os != null) // Don't print anything if cleanup() was already called. - { - Reference ref = _proxy.__reference(); - if(ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault( - "Ice.Warn.AMICallback", 1) > 0) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtil.OutputBase out = new IceUtil.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by AMI callback:\n"); - ex.printStackTrace(pw); - pw.flush(); - ref.getInstance().initializationData().logger.warning(sw.toString()); - } - } - } - - private final void - cleanup() - { - assert(_timerTask == null); - - __is = null; - __os = null; - - _monitor.notify(); - } - - protected BasicStream __is; - protected BasicStream __os; - private boolean _sent; private boolean _response; private Ice.ObjectPrxHelperBase _proxy; private Ice._ObjectDel _delegate; private int _cnt; private Ice.OperationMode _mode; - private TimerTask _timerTask; - - private final java.lang.Object _monitor = new java.lang.Object(); } diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index 2575dd33b38..677075f59e9 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -9,8 +9,140 @@ package IceInternal; -public interface OutgoingAsyncMessageCallback +abstract public class OutgoingAsyncMessageCallback { - void __sent(Ice.ConnectionI connection); - void __finished(Ice.LocalException ex); + public abstract void __sent(Ice.ConnectionI connection); + public abstract void __finished(Ice.LocalException ex); + public abstract void ice_exception(Ice.LocalException ex); + + public final BasicStream + __os() + { + return __os; + } + + public void + __exception(Ice.LocalException exc) + { + try + { + ice_exception(exc); + } + catch(java.lang.Exception ex) + { + __warning(ex); + } + finally + { + __release(); + } + } + + protected synchronized void + finalize() + throws Throwable + { + assert(__os == null); + assert(__is == null); + } + + protected void + __acquire(Ice.ObjectPrx proxy) + { + synchronized(__monitor) + { + // + // We must first wait for other requests to finish. + // + while(__os != null) + { + try + { + __monitor.wait(); + } + catch(InterruptedException ex) + { + } + } + + Reference ref = ((Ice.ObjectPrxHelperBase)proxy).__reference(); + assert(__is == null); + __is = new BasicStream(ref.getInstance()); + assert(__os == null); + __os = new BasicStream(ref.getInstance()); + } + } + + protected void + __release(final Ice.LocalException ex) + { + synchronized(__monitor) + { + assert(__os != null); + + // + // This is called by the invoking thread to release the callback following a direct + // failure to marhsall/send the request. We call the ice_exception() callback with + // the thread pool to avoid potential deadlocks in case the invoking thread locked + // some mutexes/resources (which couldn't be re-acquired by the callback). + // + + try + { + __os.instance().clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(); + __exception(ex); + } + }); + } + catch(Ice.CommunicatorDestroyedException exc) + { + __release(); + throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly. + } + } + } + + protected void + __release() + { + synchronized(__monitor) + { + assert(__is != null); + __is = null; + + assert(__os != null); + __os = null; + + __monitor.notify(); + } + } + + protected void + __warning(java.lang.Exception ex) + { + if(__os != null) // Don't print anything if release() was already called. + { + Instance instance = __os.instance(); + if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + IceUtil.OutputBase out = new IceUtil.OutputBase(pw); + out.setUseTab(false); + out.print("exception raised by AMI callback:\n"); + ex.printStackTrace(pw); + pw.flush(); + instance.initializationData().logger.warning(sw.toString()); + } + } + } + + protected final java.lang.Object __monitor = new java.lang.Object(); + protected BasicStream __is; + protected BasicStream __os; };
\ No newline at end of file diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 601b522bf10..107ec4986a7 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -55,7 +55,7 @@ public final class OutgoingConnectionFactory // anymore. Only then we can be sure the _connections // contains all connections. // - while(!_destroyed || !_pending.isEmpty() || !_pendingEndpoints.isEmpty()) + while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) { try { @@ -100,6 +100,7 @@ public final class OutgoingConnectionFactory // methods on member objects. // _connections = null; + _connectionsByEndpoint = null; } } @@ -110,20 +111,6 @@ public final class OutgoingConnectionFactory assert(endpts.length > 0); // - // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove - // the threadPerConnection argument. - // - for(int i = 0; i < endpts.length; i++) - { - if(!tpc && endpts[i].requiresThreadPerConnection()) - { - Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString(); - throw ex; - } - } - - // // Apply the overrides. // java.util.List endpoints = applyOverrides(endpts); @@ -271,20 +258,6 @@ public final class OutgoingConnectionFactory assert(endpts.length > 0); // - // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove - // the threadPerConnection argument. - // - for(int i = 0; i < endpts.length; i++) - { - if(!tpc && endpts[i].requiresThreadPerConnection()) - { - Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString(); - throw ex; - } - } - - // // Apply the overrides. // java.util.List endpoints = applyOverrides(endpts); @@ -292,16 +265,24 @@ public final class OutgoingConnectionFactory // // Try to find a connection to one of the given endpoints. // - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); - if(connection != null) + try + { + Ice.BooleanHolder compress = new Ice.BooleanHolder(); + Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); + if(connection != null) + { + callback.setConnection(connection, compress.value); + return; + } + } + catch(Ice.LocalException ex) { - callback.setConnection(connection, compress.value); + callback.setException(ex); return; } ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); - cb.getConnection(); + cb.getConnectors(); } public synchronized void @@ -456,6 +437,9 @@ public final class OutgoingConnectionFactory { IceUtil.Assert.FinalizerAssert(_destroyed); IceUtil.Assert.FinalizerAssert(_connections == null); + IceUtil.Assert.FinalizerAssert(_connectionsByEndpoint == null); + IceUtil.Assert.FinalizerAssert(_pendingConnectCount == 0); + IceUtil.Assert.FinalizerAssert(_pending.isEmpty()); super.finalize(); } @@ -486,6 +470,11 @@ public final class OutgoingConnectionFactory synchronized private Ice.ConnectionI findConnection(java.util.List endpoints, boolean tpc, Ice.BooleanHolder compress) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); assert(!endpoints.isEmpty()); @@ -573,25 +562,29 @@ public final class OutgoingConnectionFactory } synchronized private void - addPendingEndpoints(java.util.List endpoints) + incPendingConnectCount() { + // + // Keep track of the number of pending connects. The outgoing connection factory + // waitUntilFinished() method waits for all the pending connects to terminate before + // to return. This ensures that the communicator client thread pool isn't destroyed + // too soon and will still be available to execute the ice_exception() callbacks for + // the asynchronous requests waiting on a connection to be established. + // + if(_destroyed) { throw new Ice.CommunicatorDestroyedException(); } - _pendingEndpoints.addAll(endpoints); + ++_pendingConnectCount; } synchronized private void - removePendingEndpoints(java.util.List endpoints) + decPendingConnectCount() { - java.util.Iterator p = endpoints.iterator(); - while(p.hasNext()) - { - _pendingEndpoints.remove(p.next()); - } - - if(_destroyed) + --_pendingConnectCount; + assert(_pendingConnectCount >= 0); + if(_destroyed && _pendingConnectCount == 0) { notifyAll(); } @@ -963,8 +956,7 @@ public final class OutgoingConnectionFactory public boolean threadPerConnection; } - private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors, - ThreadPoolWorkItem + private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors { ConnectCallback(OutgoingConnectionFactory f, java.util.List endpoints, boolean more, CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection) @@ -984,8 +976,6 @@ public final class OutgoingConnectionFactory public synchronized void connectionStartCompleted(Ice.ConnectionI connection) { - assert(_exception == null && connection == _connection); - boolean compress; DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideCompress) @@ -996,19 +986,34 @@ public final class OutgoingConnectionFactory { compress = _current.endpoint.compress(); } - + _factory.finishGetConnection(_connectors, this, connection); - _factory.removePendingEndpoints(_endpoints); _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. } public synchronized void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) { - assert(_exception == null && connection == _connection); + assert(_current != null); - _exception = ex; - handleException(); + _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); + if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. + { + _factory.finishGetConnection(_connectors, this, null); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + else if(_iter.hasNext()) // Try the next connector. + { + nextConnector(); + } + else + { + _factory.finishGetConnection(_connectors, this, null); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } } // @@ -1033,8 +1038,7 @@ public final class OutgoingConnectionFactory if(_endpointsIter.hasNext()) { - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); + nextEndpoint(); } else { @@ -1055,8 +1059,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _hasMore || _endpointsIter.hasNext()); if(_endpointsIter.hasNext()) { - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); + nextEndpoint(); } else if(!_connectors.isEmpty()) { @@ -1069,46 +1072,56 @@ public final class OutgoingConnectionFactory } else { - _exception = ex; - _factory._instance.clientThreadPool().execute(this); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. } } - // - // Methods from ThreadPoolWorkItem - // - public void - execute(ThreadPool threadPool) + void + getConnectors() { - threadPool.promoteFollower(); - assert(_exception != null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); + try + { + // + // Notify the factory that there's an async connect pending. This is necessary + // to prevent the outgoing connection factory to be destroyed before all the + // pending asynchronous connects are finished. + // + _factory.incPendingConnectCount(); + } + catch(Ice.LocalException ex) + { + _callback.setException(ex); + return; + } + + nextEndpoint(); } void - getConnection() + nextEndpoint() { - // - // First, get the connectors for all the endpoints. - // - if(_endpointsIter.hasNext()) + try { - try - { - _factory.addPendingEndpoints(_endpoints); - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); - } - catch(Ice.LocalException ex) - { - _callback.setException(ex); - } - return; + assert(_endpointsIter.hasNext()); + _currentEndpoint = (EndpointI)_endpointsIter.next(); + _currentEndpoint.connectors_async(this); + } + catch(Ice.LocalException ex) + { + exception(ex); } + } + void + getConnection() + { try { + // + // If all the connectors have been created, we ask the factory to get a + // connection. + // Ice.BooleanHolder compress = new Ice.BooleanHolder(); Ice.ConnectionI connection = _factory.getConnection(_connectors, this, compress); if(connection == null) @@ -1122,54 +1135,30 @@ public final class OutgoingConnectionFactory return; } - _factory.removePendingEndpoints(_endpoints); _callback.setConnection(connection, compress.value); + _factory.decPendingConnectCount(); // Must be called last. } catch(Ice.LocalException ex) { - _exception = ex; - _factory._instance.clientThreadPool().execute(this); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. } } void nextConnector() { - _current = (ConnectorInfo)_iter.next(); + Ice.ConnectionI connection = null; try { - _exception = null; - _connection = _factory.createConnection(_current.connector.connect(0), _current); - _connection.start(this); + assert(_iter.hasNext()); + _current = (ConnectorInfo)_iter.next(); + connection = _factory.createConnection(_current.connector.connect(0), _current); + connection.start(this); } catch(Ice.LocalException ex) { - _exception = ex; - handleException(); - } - } - - private void - handleException() - { - assert(_current != null && _exception != null); - - _factory.handleException(_exception, _current, _connection, _hasMore || _iter.hasNext()); - if(_exception instanceof Ice.CommunicatorDestroyedException) // No need to continue. - { - _factory.finishGetConnection(_connectors, this, null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); - } - else if(_iter.hasNext()) // Try the next connector. - { - nextConnector(); - } - else - { - _factory.finishGetConnection(_connectors, this, null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); + connectionStartFailed(connection, ex); } } @@ -1184,16 +1173,13 @@ public final class OutgoingConnectionFactory private java.util.List _connectors = new java.util.ArrayList(); private java.util.Iterator _iter; private ConnectorInfo _current; - private Ice.LocalException _exception; - private Ice.ConnectionI _connection; } private final Instance _instance; private boolean _destroyed; private java.util.HashMap _connections = new java.util.HashMap(); - private java.util.HashMap _pending = new java.util.HashMap(); - private java.util.HashMap _connectionsByEndpoint = new java.util.HashMap(); - private java.util.LinkedList _pendingEndpoints = new java.util.LinkedList(); + private java.util.HashMap _pending = new java.util.HashMap(); + private int _pendingConnectCount = 0; } diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index 49df9e6bc8d..e3a9efd7307 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -18,7 +18,8 @@ public interface RequestHandler Ice.ConnectionI sendRequest(Outgoing out) throws LocalExceptionWrapper; - void sendAsyncRequest(OutgoingAsync out); + void sendAsyncRequest(OutgoingAsync out) + throws LocalExceptionWrapper; boolean flushBatchRequests(BatchOutgoing out); void flushAsyncBatchRequests(BatchOutgoingAsync out); diff --git a/java/src/IceInternal/TcpEndpointI.java b/java/src/IceInternal/TcpEndpointI.java index 90b3073b3ce..e4a62706b19 100644 --- a/java/src/IceInternal/TcpEndpointI.java +++ b/java/src/IceInternal/TcpEndpointI.java @@ -482,12 +482,6 @@ final class TcpEndpointI extends EndpointI return _host.compareTo(p._host); } - public boolean - requiresThreadPerConnection() - { - return false; - } - public java.util.List connectors(java.util.List addresses) { diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 533b91adfc7..bfa7c3ced27 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -179,7 +179,6 @@ final class TcpTransceiver implements Transceiver public boolean write(Buffer buf, int timeout) - throws LocalExceptionWrapper { while(writeBuffer(buf.b)) { diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 6306ca10475..2ba09235fec 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -145,7 +145,6 @@ public final class ThreadPool assert(!_destroyed); assert(_handlerMap.isEmpty()); - assert(_changes.isEmpty()); assert(_workItems.isEmpty()); _destroyed = true; setInterrupt(); @@ -197,7 +196,10 @@ public final class ThreadPool public synchronized void execute(ThreadPoolWorkItem workItem) { - assert(!_destroyed); + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } _workItems.add(workItem); setInterrupt(); } @@ -529,9 +531,21 @@ public final class ThreadPool // 3. A work item has been scheduled. // - // Thread pool destroyed? - // - if(_destroyed) + if(!_workItems.isEmpty()) + { + // + // Work items must be executed first even if the thread pool is destroyed. + // + + // + // Remove the interrupt channel from the selected key set. + // + _keys.remove(_fdIntrReadKey); + clearInterrupt(); + assert(!_workItems.isEmpty()); + workItem = (ThreadPoolWorkItem)_workItems.removeFirst(); + } + else if(_destroyed) { if(TRACE_SHUTDOWN) { @@ -539,27 +553,22 @@ public final class ThreadPool } // - // Don't clear the interrupt fd if - // destroyed, so that the other threads - // exit as well. + // Don't clear the interrupt fd if destroyed, so that the other threads exit as well. // return true; } - - // - // Remove the interrupt channel from the - // selected key set. - // - _keys.remove(_fdIntrReadKey); - - clearInterrupt(); - - // - // An event handler must have been registered - // or unregistered. - // - if(!_changes.isEmpty()) + else { + // + // Remove the interrupt channel from the selected key set. + // + _keys.remove(_fdIntrReadKey); + clearInterrupt(); + + // + // An event handler must have been registered or unregistered. + // + assert(!_changes.isEmpty()); FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); if(change.handler != null) // Addition if handler is set. @@ -627,11 +636,6 @@ public final class ThreadPool // outside the thread synchronization. } } - else - { - assert(!_workItems.isEmpty()); - workItem = (ThreadPoolWorkItem)_workItems.removeFirst(); - } } else { diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index 7dce61206b8..fbc61da6ff7 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -40,13 +40,7 @@ public interface Transceiver // block until all the data is written or the specified timeout // expires. // - // NOTE: In Java, write() can raise LocalExceptionWrapper to indicate that - // retrying may not be safe, which is necessary to address an issue - // in the IceSSL implementation for JDK 1.4. We can remove this if - // we ever drop support for JDK 1.4 (also see Ice.ConnectionI). - // - boolean write(Buffer buf, int timeout) - throws LocalExceptionWrapper; + boolean write(Buffer buf, int timeout); // // Read data. diff --git a/java/src/IceInternal/UdpEndpointI.java b/java/src/IceInternal/UdpEndpointI.java index 1d635a7dff2..190fdf061c2 100644 --- a/java/src/IceInternal/UdpEndpointI.java +++ b/java/src/IceInternal/UdpEndpointI.java @@ -703,12 +703,6 @@ final class UdpEndpointI extends EndpointI return _host.compareTo(p._host); } - public boolean - requiresThreadPerConnection() - { - return false; - } - public java.util.List connectors(java.util.List addresses) { diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index e04d4f0b84b..cebd0876374 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -94,7 +94,6 @@ final class UdpTransceiver implements Transceiver public boolean write(Buffer buf, int timeout) - throws LocalExceptionWrapper { assert(buf.b.position() == 0); final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead); diff --git a/java/src/IceSSL/EndpointI.java b/java/src/IceSSL/EndpointI.java index f2a66c95f13..84a94885599 100644 --- a/java/src/IceSSL/EndpointI.java +++ b/java/src/IceSSL/EndpointI.java @@ -483,12 +483,6 @@ final class EndpointI extends IceInternal.EndpointI return _host.compareTo(p._host); } - public boolean - requiresThreadPerConnection() - { - return false; - } - public java.util.List connectors(java.util.List addresses) { diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java index defae854a84..97ce62c558c 100644 --- a/java/src/IceSSL/TransceiverI.java +++ b/java/src/IceSSL/TransceiverI.java @@ -226,7 +226,6 @@ final class TransceiverI implements IceInternal.Transceiver // public synchronized boolean write(IceInternal.Buffer buf, int timeout) - throws IceInternal.LocalExceptionWrapper { // // If the handshake isn't completed yet, we shouldn't be writing. |