summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingConnectionFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java338
1 files changed, 229 insertions, 109 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 51881390f20..563368f9f22 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory
//
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
java.util.Iterator<ConnectorInfo> q = connectors.iterator();
+ ConnectorInfo ci = null;
while(q.hasNext())
{
- ConnectorInfo ci = q.next();
+ ci = q.next();
try
{
connection = createConnection(ci.connector.connect(), ci);
@@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory
{
compress.value = ci.endpoint.compress();
}
-
+ connection.activate();
break;
}
catch(Ice.CommunicatorDestroyedException ex)
@@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, null, connection);
+ if(connection != null)
+ {
+ finishGetConnection(connectors, ci, connection, null);
+ }
+ else
+ {
+ finishGetConnection(connectors, exception, null);
+ }
if(connection == null)
{
@@ -489,6 +497,11 @@ public final class OutgoingConnectionFactory
while(p.hasNext())
{
ConnectorInfo ci = p.next();
+ if(_pending.containsKey(ci))
+ {
+ continue;
+ }
+
java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
if(connectionList == null)
{
@@ -619,64 +632,23 @@ public final class OutgoingConnectionFactory
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice.ConnectionI connection = findConnection(connectors, compress);
if(connection != null)
{
- if(cb != null)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- cbs.remove(cb);
- }
- }
- }
return connection;
}
-
- //
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- boolean found = false;
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- found = true;
- if(cb != null)
- {
- cbs.add(cb); // Add the callback to each pending connector.
- }
- }
- }
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -699,25 +671,14 @@ public final class OutgoingConnectionFactory
return null;
}
}
- }
-
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- ConnectorInfo obj = p.next();
- if(!_pending.containsKey(obj))
+ else
{
- _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -786,46 +747,180 @@ public final class OutgoingConnectionFactory
}
private void
- finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection)
+ finishGetConnection(java.util.List<ConnectorInfo> connectors,
+ ConnectorInfo ci,
+ Ice.ConnectionI connection,
+ ConnectCallback cb)
{
- java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ connectionCallbacks.add(cb);
+ }
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
synchronized(this)
{
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- java.util.Set<ConnectCallback> cbs = _pending.remove(p.next());
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
if(cbs != null)
{
- callbacks.addAll(cbs);
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.hasConnector(ci))
+ {
+ connectionCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
}
}
+
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.removeFromPending();
+ callbacks.remove(cc);
+ }
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.removeFromPending();
+ }
notifyAll();
+ }
+
+ boolean compress;
+ DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint.compress();
+ }
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection != null && !_destroyed)
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.setConnection(connection, compress);
+ }
+ }
+
+ private void
+ finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)
+ {
+ java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ failedCallbacks.add(cb);
+ }
+
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ synchronized(this)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ while(p.hasNext())
{
- connection.activate();
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
+ if(cbs != null)
+ {
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.removeConnectors(connectors))
+ {
+ failedCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
+ }
+ }
+
+ for(ConnectCallback cc : callbacks)
+ {
+ assert(!failedCallbacks.contains(cc));
+ cc.removeFromPending();
}
+ notifyAll();
}
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : failedCallbacks)
+ {
+ cc.setException(ex);
+ }
+ }
+
+ private boolean
+ addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
//
- // Notify any waiting callbacks.
+ // Add the callback to each connector pending list.
//
- java.util.Iterator<ConnectCallback> p = callbacks.iterator();
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ boolean found = false;
+ while(p.hasNext())
+ {
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ found = true;
+ if(cb != null)
+ {
+ cbs.add(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(found)
+ {
+ return true;
+ }
+
+ //
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
+ //
+ p = connectors.iterator();
+ while(p.hasNext())
+ {
+ ConnectorInfo obj = p.next();
+ if(!_pending.containsKey(obj))
+ {
+ _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ }
+ }
+
+ return false;
+ }
+
+ private void
+ removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- p.next().getConnection();
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ cbs.remove(cb);
+ }
}
}
@@ -954,27 +1049,15 @@ public final class OutgoingConnectionFactory
_selType = selType;
_endpointsIter = _endpoints.iterator();
}
-
+
//
// Methods from ConnectionI.StartCallback
//
public void
connectionStartCompleted(Ice.ConnectionI connection)
{
- boolean compress;
- DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress = _current.endpoint.compress();
- }
-
- _factory.finishGetConnection(_connectors, this, connection);
- _callback.setConnection(connection, compress);
- _factory.decPendingConnectCount(); // Must be called last.
+ connection.activate();
+ _factory.finishGetConnection(_connectors, _current, connection, this);
}
public void
@@ -985,9 +1068,7 @@ public final class OutgoingConnectionFactory
_factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext());
if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue.
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
else if(_iter.hasNext()) // Try the next connector.
{
@@ -995,9 +1076,7 @@ public final class OutgoingConnectionFactory
}
else
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
}
@@ -1062,6 +1141,47 @@ public final class OutgoingConnectionFactory
}
}
+ public void
+ setConnection(Ice.ConnectionI connection, boolean compress)
+ {
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public void
+ setException(Ice.LocalException ex)
+ {
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public boolean
+ hasConnector(ConnectorInfo ci)
+ {
+ return _connectors.contains(ci);
+ }
+
+ public boolean
+ removeConnectors(java.util.List<ConnectorInfo> connectors)
+ {
+ _connectors.removeAll(connectors);
+ _iter = _connectors.iterator();
+ return _connectors.isEmpty();
+ }
+
+ public void
+ removeFromPending()
+ {
+ _factory.removeFromPending(this, _connectors);
+ }
+
void
getConnectors()
{