summaryrefslogtreecommitdiff
path: root/cs/src/Ice/ConnectionFactory.cs
diff options
context:
space:
mode:
Diffstat (limited to 'cs/src/Ice/ConnectionFactory.cs')
-rw-r--r--cs/src/Ice/ConnectionFactory.cs325
1 files changed, 218 insertions, 107 deletions
diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs
index b671ede01d9..564d718f971 100644
--- a/cs/src/Ice/ConnectionFactory.cs
+++ b/cs/src/Ice/ConnectionFactory.cs
@@ -183,9 +183,10 @@ namespace IceInternal
// Try to establish the connection to the connectors.
//
DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
+ ConnectorInfo ci = null;
for(int i = 0; i < connectors.Count; ++i)
{
- ConnectorInfo ci = connectors[i];
+ ci = connectors[i];
try
{
connection = createConnection(ci.connector.connect(), ci);
@@ -199,7 +200,7 @@ namespace IceInternal
{
compress = ci.endpoint.compress();
}
-
+ connection.activate();
break;
}
catch(Ice.CommunicatorDestroyedException ex)
@@ -221,7 +222,14 @@ namespace IceInternal
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, null, connection);
+ if(connection != null)
+ {
+ finishGetConnection(connectors, ci, connection, null);
+ }
+ else
+ {
+ finishGetConnection(connectors, exception, null);
+ }
if(connection == null)
{
@@ -454,6 +462,11 @@ namespace IceInternal
DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
foreach(ConnectorInfo ci in connectors)
{
+ if(_pending.ContainsKey(ci))
+ {
+ continue;
+ }
+
LinkedList connectionList = null;
if(!_connections.TryGetValue(ci, out connectionList))
{
@@ -588,62 +601,23 @@ namespace IceInternal
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice.ConnectionI connection = findConnection(connectors, out compress);
if(connection != null)
{
- if(cb != null)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- foreach(ConnectorInfo ci in connectors)
- {
- Set cbs = null;
- if(_pending.TryGetValue(ci, out cbs))
- {
- cbs.Remove(cb);
- }
- }
- }
return connection;
}
- //
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
- //
- bool found = false;
- foreach(ConnectorInfo ci in connectors)
- {
- Set cbs = null;
- if(_pending.TryGetValue(ci, out cbs))
- {
- found = true;
- if(cb != null)
- {
- cbs.Add(cb); // Add the callback to each pending connector.
- }
- }
- }
-
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -660,23 +634,14 @@ namespace IceInternal
return null;
}
}
- }
-
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- foreach(ConnectorInfo ci in connectors)
- {
- if(!_pending.ContainsKey(ci))
+ else
{
- _pending.Add(ci, new Set());
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -724,6 +689,7 @@ namespace IceInternal
_connections.Add(ci, connectionList);
}
connectionList.Add(connection);
+ connectionList = null;
if(!_connectionsByEndpoint.TryGetValue(ci.endpoint, out connectionList))
{
connectionList = new LinkedList();
@@ -747,49 +713,120 @@ namespace IceInternal
}
}
- private void finishGetConnection(List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection)
+ private void finishGetConnection(List<ConnectorInfo> connectors,
+ ConnectorInfo ci,
+ Ice.ConnectionI connection,
+ ConnectCallback cb)
{
- Set callbacks = new Set();
+ Set connectionCallbacks = new Set();
+ if(cb != null)
+ {
+ connectionCallbacks.Add(cb);
+ }
+ Set callbacks = new Set();
lock(this)
{
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
- foreach(ConnectorInfo ci in connectors)
+ foreach(ConnectorInfo c in connectors)
{
Set s = null;
- if(_pending.TryGetValue(ci, out s))
+ if(_pending.TryGetValue(c, out s))
{
- foreach(ConnectCallback c in s)
+ foreach(ConnectCallback cc in s)
{
- callbacks.Add(c);
+ if(cc.hasConnector(ci))
+ {
+ connectionCallbacks.Add(cc);
+ }
+ else
+ {
+ callbacks.Add(cc);
+ }
}
- _pending.Remove(ci);
+ _pending.Remove(c);
}
}
+
+ foreach(ConnectCallback cc in connectionCallbacks)
+ {
+ cc.removeFromPending();
+ callbacks.Remove(cc);
+ }
+ foreach(ConnectCallback cc in callbacks)
+ {
+ cc.removeFromPending();
+ }
Monitor.PulseAll(this);
+ }
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection != null && !_destroyed)
+ bool compress;
+ DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint.compress();
+ }
+
+ foreach(ConnectCallback cc in callbacks)
+ {
+ cc.getConnection();
+ }
+ foreach(ConnectCallback cc in connectionCallbacks)
+ {
+ cc.setConnection(connection, compress);
+ }
+ }
+
+ private void finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)
+ {
+ Set failedCallbacks = new Set();
+ if(cb != null)
+ {
+ failedCallbacks.Add(cb);
+ }
+
+ Set callbacks = new Set();
+ lock(this)
+ {
+ foreach(ConnectorInfo c in connectors)
{
- connection.activate();
+ Set s = null;
+ if(_pending.TryGetValue(c, out s))
+ {
+ foreach(ConnectCallback cc in s)
+ {
+ if(cc.removeConnectors(connectors))
+ {
+ failedCallbacks.Add(cc);
+ }
+ else
+ {
+ callbacks.Add(cc);
+ }
+ }
+ _pending.Remove(c);
+ }
}
+
+ foreach(ConnectCallback cc in callbacks)
+ {
+ Debug.Assert(!failedCallbacks.Contains(cc));
+ cc.removeFromPending();
+ }
+ Monitor.PulseAll(this);
}
- //
- // Notify any waiting callbacks.
- //
foreach(ConnectCallback cc in callbacks)
{
cc.getConnection();
}
+ foreach(ConnectCallback cc in failedCallbacks)
+ {
+ cc.setException(ex);
+ }
}
private void handleException(Ice.LocalException ex, ConnectorInfo ci, Ice.ConnectionI connection,
@@ -854,6 +891,59 @@ namespace IceInternal
}
}
+ private bool
+ addToPending(ConnectCallback cb, List<ConnectorInfo> connectors)
+ {
+ //
+ // Add the callback to each connector pending list.
+ //
+ bool found = false;
+ foreach(ConnectorInfo ci in connectors)
+ {
+ Set cbs = null;
+ if(_pending.TryGetValue(ci, out cbs))
+ {
+ found = true;
+ if(cb != null)
+ {
+ cbs.Add(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(found)
+ {
+ return true;
+ }
+
+ //
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
+ //
+ foreach(ConnectorInfo ci in connectors)
+ {
+ if(!_pending.ContainsKey(ci))
+ {
+ _pending.Add(ci, new Set());
+ }
+ }
+ return false;
+ }
+
+ private void
+ removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors)
+ {
+ foreach(ConnectorInfo ci in connectors)
+ {
+ Set cbs = null;
+ if(_pending.TryGetValue(ci, out cbs))
+ {
+ cbs.Remove(cb);
+ }
+ }
+ }
+
internal void handleException(Ice.LocalException ex, bool hasMore)
{
TraceLevels traceLevels = instance_.traceLevels();
@@ -922,20 +1012,8 @@ namespace IceInternal
//
public void connectionStartCompleted(Ice.ConnectionI connection)
{
- bool compress;
- DefaultsAndOverrides defaultsAndOverrides = _factory.instance_.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress = _current.endpoint.compress();
- }
-
- _factory.finishGetConnection(_connectors, this, connection);
- _callback.setConnection(connection, compress);
- _factory.decPendingConnectCount(); // Must be called last.
+ connection.activate();
+ _factory.finishGetConnection(_connectors, _current, connection, this);
}
public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
@@ -943,9 +1021,7 @@ namespace IceInternal
_factory.handleException(ex, _current, connection, _hasMore || _iter < _connectors.Count);
if(ex is Ice.CommunicatorDestroyedException) // No need to continue.
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
else if(_iter < _connectors.Count) // Try the next connector.
{
@@ -953,9 +1029,7 @@ namespace IceInternal
}
else
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
}
@@ -1027,6 +1101,43 @@ namespace IceInternal
}
}
+ public void setConnection(Ice.ConnectionI connection, bool compress)
+ {
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public void setException(Ice.LocalException ex)
+ {
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public bool hasConnector(ConnectorInfo ci)
+ {
+ return _connectors.Contains(ci);
+ }
+ public bool removeConnectors(List<ConnectorInfo> connectors)
+ {
+ foreach(ConnectorInfo ci in connectors)
+ {
+ while(_connectors.Remove(ci)); // Remove all of them.
+ }
+ return _connectors.Count == 0;
+ }
+
+ public void removeFromPending()
+ {
+ _factory.removeFromPending(this, _connectors);
+ }
+
public void getConnectors()
{
try