summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingConnectionFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java230
1 files changed, 108 insertions, 122 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 601b522bf10..107ec4986a7 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -55,7 +55,7 @@ public final class OutgoingConnectionFactory
// anymore. Only then we can be sure the _connections
// contains all connections.
//
- while(!_destroyed || !_pending.isEmpty() || !_pendingEndpoints.isEmpty())
+ while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
{
try
{
@@ -100,6 +100,7 @@ public final class OutgoingConnectionFactory
// methods on member objects.
//
_connections = null;
+ _connectionsByEndpoint = null;
}
}
@@ -110,20 +111,6 @@ public final class OutgoingConnectionFactory
assert(endpts.length > 0);
//
- // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove
- // the threadPerConnection argument.
- //
- for(int i = 0; i < endpts.length; i++)
- {
- if(!tpc && endpts[i].requiresThreadPerConnection())
- {
- Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString();
- throw ex;
- }
- }
-
- //
// Apply the overrides.
//
java.util.List endpoints = applyOverrides(endpts);
@@ -271,20 +258,6 @@ public final class OutgoingConnectionFactory
assert(endpts.length > 0);
//
- // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove
- // the threadPerConnection argument.
- //
- for(int i = 0; i < endpts.length; i++)
- {
- if(!tpc && endpts[i].requiresThreadPerConnection())
- {
- Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString();
- throw ex;
- }
- }
-
- //
// Apply the overrides.
//
java.util.List endpoints = applyOverrides(endpts);
@@ -292,16 +265,24 @@ public final class OutgoingConnectionFactory
//
// Try to find a connection to one of the given endpoints.
//
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
- if(connection != null)
+ try
+ {
+ Ice.BooleanHolder compress = new Ice.BooleanHolder();
+ Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
+ if(connection != null)
+ {
+ callback.setConnection(connection, compress.value);
+ return;
+ }
+ }
+ catch(Ice.LocalException ex)
{
- callback.setConnection(connection, compress.value);
+ callback.setException(ex);
return;
}
ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
- cb.getConnection();
+ cb.getConnectors();
}
public synchronized void
@@ -456,6 +437,9 @@ public final class OutgoingConnectionFactory
{
IceUtil.Assert.FinalizerAssert(_destroyed);
IceUtil.Assert.FinalizerAssert(_connections == null);
+ IceUtil.Assert.FinalizerAssert(_connectionsByEndpoint == null);
+ IceUtil.Assert.FinalizerAssert(_pendingConnectCount == 0);
+ IceUtil.Assert.FinalizerAssert(_pending.isEmpty());
super.finalize();
}
@@ -486,6 +470,11 @@ public final class OutgoingConnectionFactory
synchronized private Ice.ConnectionI
findConnection(java.util.List endpoints, boolean tpc, Ice.BooleanHolder compress)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
assert(!endpoints.isEmpty());
@@ -573,25 +562,29 @@ public final class OutgoingConnectionFactory
}
synchronized private void
- addPendingEndpoints(java.util.List endpoints)
+ incPendingConnectCount()
{
+ //
+ // Keep track of the number of pending connects. The outgoing connection factory
+ // waitUntilFinished() method waits for all the pending connects to terminate before
+ // to return. This ensures that the communicator client thread pool isn't destroyed
+ // too soon and will still be available to execute the ice_exception() callbacks for
+ // the asynchronous requests waiting on a connection to be established.
+ //
+
if(_destroyed)
{
throw new Ice.CommunicatorDestroyedException();
}
- _pendingEndpoints.addAll(endpoints);
+ ++_pendingConnectCount;
}
synchronized private void
- removePendingEndpoints(java.util.List endpoints)
+ decPendingConnectCount()
{
- java.util.Iterator p = endpoints.iterator();
- while(p.hasNext())
- {
- _pendingEndpoints.remove(p.next());
- }
-
- if(_destroyed)
+ --_pendingConnectCount;
+ assert(_pendingConnectCount >= 0);
+ if(_destroyed && _pendingConnectCount == 0)
{
notifyAll();
}
@@ -963,8 +956,7 @@ public final class OutgoingConnectionFactory
public boolean threadPerConnection;
}
- private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors,
- ThreadPoolWorkItem
+ private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors
{
ConnectCallback(OutgoingConnectionFactory f, java.util.List endpoints, boolean more,
CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection)
@@ -984,8 +976,6 @@ public final class OutgoingConnectionFactory
public synchronized void
connectionStartCompleted(Ice.ConnectionI connection)
{
- assert(_exception == null && connection == _connection);
-
boolean compress;
DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideCompress)
@@ -996,19 +986,34 @@ public final class OutgoingConnectionFactory
{
compress = _current.endpoint.compress();
}
-
+
_factory.finishGetConnection(_connectors, this, connection);
- _factory.removePendingEndpoints(_endpoints);
_callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
}
public synchronized void
connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
{
- assert(_exception == null && connection == _connection);
+ assert(_current != null);
- _exception = ex;
- handleException();
+ _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext());
+ if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue.
+ {
+ _factory.finishGetConnection(_connectors, this, null);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+ else if(_iter.hasNext()) // Try the next connector.
+ {
+ nextConnector();
+ }
+ else
+ {
+ _factory.finishGetConnection(_connectors, this, null);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
}
//
@@ -1033,8 +1038,7 @@ public final class OutgoingConnectionFactory
if(_endpointsIter.hasNext())
{
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
+ nextEndpoint();
}
else
{
@@ -1055,8 +1059,7 @@ public final class OutgoingConnectionFactory
_factory.handleException(ex, _hasMore || _endpointsIter.hasNext());
if(_endpointsIter.hasNext())
{
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
+ nextEndpoint();
}
else if(!_connectors.isEmpty())
{
@@ -1069,46 +1072,56 @@ public final class OutgoingConnectionFactory
}
else
{
- _exception = ex;
- _factory._instance.clientThreadPool().execute(this);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
}
}
- //
- // Methods from ThreadPoolWorkItem
- //
- public void
- execute(ThreadPool threadPool)
+ void
+ getConnectors()
{
- threadPool.promoteFollower();
- assert(_exception != null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
+ try
+ {
+ //
+ // Notify the factory that there's an async connect pending. This is necessary
+ // to prevent the outgoing connection factory to be destroyed before all the
+ // pending asynchronous connects are finished.
+ //
+ _factory.incPendingConnectCount();
+ }
+ catch(Ice.LocalException ex)
+ {
+ _callback.setException(ex);
+ return;
+ }
+
+ nextEndpoint();
}
void
- getConnection()
+ nextEndpoint()
{
- //
- // First, get the connectors for all the endpoints.
- //
- if(_endpointsIter.hasNext())
+ try
{
- try
- {
- _factory.addPendingEndpoints(_endpoints);
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
- }
- catch(Ice.LocalException ex)
- {
- _callback.setException(ex);
- }
- return;
+ assert(_endpointsIter.hasNext());
+ _currentEndpoint = (EndpointI)_endpointsIter.next();
+ _currentEndpoint.connectors_async(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
}
+ }
+ void
+ getConnection()
+ {
try
{
+ //
+ // If all the connectors have been created, we ask the factory to get a
+ // connection.
+ //
Ice.BooleanHolder compress = new Ice.BooleanHolder();
Ice.ConnectionI connection = _factory.getConnection(_connectors, this, compress);
if(connection == null)
@@ -1122,54 +1135,30 @@ public final class OutgoingConnectionFactory
return;
}
- _factory.removePendingEndpoints(_endpoints);
_callback.setConnection(connection, compress.value);
+ _factory.decPendingConnectCount(); // Must be called last.
}
catch(Ice.LocalException ex)
{
- _exception = ex;
- _factory._instance.clientThreadPool().execute(this);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
}
}
void
nextConnector()
{
- _current = (ConnectorInfo)_iter.next();
+ Ice.ConnectionI connection = null;
try
{
- _exception = null;
- _connection = _factory.createConnection(_current.connector.connect(0), _current);
- _connection.start(this);
+ assert(_iter.hasNext());
+ _current = (ConnectorInfo)_iter.next();
+ connection = _factory.createConnection(_current.connector.connect(0), _current);
+ connection.start(this);
}
catch(Ice.LocalException ex)
{
- _exception = ex;
- handleException();
- }
- }
-
- private void
- handleException()
- {
- assert(_current != null && _exception != null);
-
- _factory.handleException(_exception, _current, _connection, _hasMore || _iter.hasNext());
- if(_exception instanceof Ice.CommunicatorDestroyedException) // No need to continue.
- {
- _factory.finishGetConnection(_connectors, this, null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
- }
- else if(_iter.hasNext()) // Try the next connector.
- {
- nextConnector();
- }
- else
- {
- _factory.finishGetConnection(_connectors, this, null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
+ connectionStartFailed(connection, ex);
}
}
@@ -1184,16 +1173,13 @@ public final class OutgoingConnectionFactory
private java.util.List _connectors = new java.util.ArrayList();
private java.util.Iterator _iter;
private ConnectorInfo _current;
- private Ice.LocalException _exception;
- private Ice.ConnectionI _connection;
}
private final Instance _instance;
private boolean _destroyed;
private java.util.HashMap _connections = new java.util.HashMap();
- private java.util.HashMap _pending = new java.util.HashMap();
-
private java.util.HashMap _connectionsByEndpoint = new java.util.HashMap();
- private java.util.LinkedList _pendingEndpoints = new java.util.LinkedList();
+ private java.util.HashMap _pending = new java.util.HashMap();
+ private int _pendingConnectCount = 0;
}