diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 230 |
1 files changed, 108 insertions, 122 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 601b522bf10..107ec4986a7 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -55,7 +55,7 @@ public final class OutgoingConnectionFactory // anymore. Only then we can be sure the _connections // contains all connections. // - while(!_destroyed || !_pending.isEmpty() || !_pendingEndpoints.isEmpty()) + while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) { try { @@ -100,6 +100,7 @@ public final class OutgoingConnectionFactory // methods on member objects. // _connections = null; + _connectionsByEndpoint = null; } } @@ -110,20 +111,6 @@ public final class OutgoingConnectionFactory assert(endpts.length > 0); // - // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove - // the threadPerConnection argument. - // - for(int i = 0; i < endpts.length; i++) - { - if(!tpc && endpts[i].requiresThreadPerConnection()) - { - Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString(); - throw ex; - } - } - - // // Apply the overrides. // java.util.List endpoints = applyOverrides(endpts); @@ -271,20 +258,6 @@ public final class OutgoingConnectionFactory assert(endpts.length > 0); // - // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove - // the threadPerConnection argument. - // - for(int i = 0; i < endpts.length; i++) - { - if(!tpc && endpts[i].requiresThreadPerConnection()) - { - Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString(); - throw ex; - } - } - - // // Apply the overrides. // java.util.List endpoints = applyOverrides(endpts); @@ -292,16 +265,24 @@ public final class OutgoingConnectionFactory // // Try to find a connection to one of the given endpoints. // - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); - if(connection != null) + try + { + Ice.BooleanHolder compress = new Ice.BooleanHolder(); + Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); + if(connection != null) + { + callback.setConnection(connection, compress.value); + return; + } + } + catch(Ice.LocalException ex) { - callback.setConnection(connection, compress.value); + callback.setException(ex); return; } ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); - cb.getConnection(); + cb.getConnectors(); } public synchronized void @@ -456,6 +437,9 @@ public final class OutgoingConnectionFactory { IceUtil.Assert.FinalizerAssert(_destroyed); IceUtil.Assert.FinalizerAssert(_connections == null); + IceUtil.Assert.FinalizerAssert(_connectionsByEndpoint == null); + IceUtil.Assert.FinalizerAssert(_pendingConnectCount == 0); + IceUtil.Assert.FinalizerAssert(_pending.isEmpty()); super.finalize(); } @@ -486,6 +470,11 @@ public final class OutgoingConnectionFactory synchronized private Ice.ConnectionI findConnection(java.util.List endpoints, boolean tpc, Ice.BooleanHolder compress) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); assert(!endpoints.isEmpty()); @@ -573,25 +562,29 @@ public final class OutgoingConnectionFactory } synchronized private void - addPendingEndpoints(java.util.List endpoints) + incPendingConnectCount() { + // + // Keep track of the number of pending connects. The outgoing connection factory + // waitUntilFinished() method waits for all the pending connects to terminate before + // to return. This ensures that the communicator client thread pool isn't destroyed + // too soon and will still be available to execute the ice_exception() callbacks for + // the asynchronous requests waiting on a connection to be established. + // + if(_destroyed) { throw new Ice.CommunicatorDestroyedException(); } - _pendingEndpoints.addAll(endpoints); + ++_pendingConnectCount; } synchronized private void - removePendingEndpoints(java.util.List endpoints) + decPendingConnectCount() { - java.util.Iterator p = endpoints.iterator(); - while(p.hasNext()) - { - _pendingEndpoints.remove(p.next()); - } - - if(_destroyed) + --_pendingConnectCount; + assert(_pendingConnectCount >= 0); + if(_destroyed && _pendingConnectCount == 0) { notifyAll(); } @@ -963,8 +956,7 @@ public final class OutgoingConnectionFactory public boolean threadPerConnection; } - private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors, - ThreadPoolWorkItem + private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors { ConnectCallback(OutgoingConnectionFactory f, java.util.List endpoints, boolean more, CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection) @@ -984,8 +976,6 @@ public final class OutgoingConnectionFactory public synchronized void connectionStartCompleted(Ice.ConnectionI connection) { - assert(_exception == null && connection == _connection); - boolean compress; DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideCompress) @@ -996,19 +986,34 @@ public final class OutgoingConnectionFactory { compress = _current.endpoint.compress(); } - + _factory.finishGetConnection(_connectors, this, connection); - _factory.removePendingEndpoints(_endpoints); _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. } public synchronized void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) { - assert(_exception == null && connection == _connection); + assert(_current != null); - _exception = ex; - handleException(); + _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); + if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. + { + _factory.finishGetConnection(_connectors, this, null); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + else if(_iter.hasNext()) // Try the next connector. + { + nextConnector(); + } + else + { + _factory.finishGetConnection(_connectors, this, null); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } } // @@ -1033,8 +1038,7 @@ public final class OutgoingConnectionFactory if(_endpointsIter.hasNext()) { - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); + nextEndpoint(); } else { @@ -1055,8 +1059,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _hasMore || _endpointsIter.hasNext()); if(_endpointsIter.hasNext()) { - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); + nextEndpoint(); } else if(!_connectors.isEmpty()) { @@ -1069,46 +1072,56 @@ public final class OutgoingConnectionFactory } else { - _exception = ex; - _factory._instance.clientThreadPool().execute(this); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. } } - // - // Methods from ThreadPoolWorkItem - // - public void - execute(ThreadPool threadPool) + void + getConnectors() { - threadPool.promoteFollower(); - assert(_exception != null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); + try + { + // + // Notify the factory that there's an async connect pending. This is necessary + // to prevent the outgoing connection factory to be destroyed before all the + // pending asynchronous connects are finished. + // + _factory.incPendingConnectCount(); + } + catch(Ice.LocalException ex) + { + _callback.setException(ex); + return; + } + + nextEndpoint(); } void - getConnection() + nextEndpoint() { - // - // First, get the connectors for all the endpoints. - // - if(_endpointsIter.hasNext()) + try { - try - { - _factory.addPendingEndpoints(_endpoints); - _currentEndpoint = (EndpointI)_endpointsIter.next(); - _currentEndpoint.connectors_async(this); - } - catch(Ice.LocalException ex) - { - _callback.setException(ex); - } - return; + assert(_endpointsIter.hasNext()); + _currentEndpoint = (EndpointI)_endpointsIter.next(); + _currentEndpoint.connectors_async(this); + } + catch(Ice.LocalException ex) + { + exception(ex); } + } + void + getConnection() + { try { + // + // If all the connectors have been created, we ask the factory to get a + // connection. + // Ice.BooleanHolder compress = new Ice.BooleanHolder(); Ice.ConnectionI connection = _factory.getConnection(_connectors, this, compress); if(connection == null) @@ -1122,54 +1135,30 @@ public final class OutgoingConnectionFactory return; } - _factory.removePendingEndpoints(_endpoints); _callback.setConnection(connection, compress.value); + _factory.decPendingConnectCount(); // Must be called last. } catch(Ice.LocalException ex) { - _exception = ex; - _factory._instance.clientThreadPool().execute(this); + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. } } void nextConnector() { - _current = (ConnectorInfo)_iter.next(); + Ice.ConnectionI connection = null; try { - _exception = null; - _connection = _factory.createConnection(_current.connector.connect(0), _current); - _connection.start(this); + assert(_iter.hasNext()); + _current = (ConnectorInfo)_iter.next(); + connection = _factory.createConnection(_current.connector.connect(0), _current); + connection.start(this); } catch(Ice.LocalException ex) { - _exception = ex; - handleException(); - } - } - - private void - handleException() - { - assert(_current != null && _exception != null); - - _factory.handleException(_exception, _current, _connection, _hasMore || _iter.hasNext()); - if(_exception instanceof Ice.CommunicatorDestroyedException) // No need to continue. - { - _factory.finishGetConnection(_connectors, this, null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); - } - else if(_iter.hasNext()) // Try the next connector. - { - nextConnector(); - } - else - { - _factory.finishGetConnection(_connectors, this, null); - _factory.removePendingEndpoints(_endpoints); - _callback.setException(_exception); + connectionStartFailed(connection, ex); } } @@ -1184,16 +1173,13 @@ public final class OutgoingConnectionFactory private java.util.List _connectors = new java.util.ArrayList(); private java.util.Iterator _iter; private ConnectorInfo _current; - private Ice.LocalException _exception; - private Ice.ConnectionI _connection; } private final Instance _instance; private boolean _destroyed; private java.util.HashMap _connections = new java.util.HashMap(); - private java.util.HashMap _pending = new java.util.HashMap(); - private java.util.HashMap _connectionsByEndpoint = new java.util.HashMap(); - private java.util.LinkedList _pendingEndpoints = new java.util.LinkedList(); + private java.util.HashMap _pending = new java.util.HashMap(); + private int _pendingConnectCount = 0; } |