diff options
Diffstat (limited to 'cs/src/Ice/ConnectionFactory.cs')
-rw-r--r-- | cs/src/Ice/ConnectionFactory.cs | 325 |
1 files changed, 218 insertions, 107 deletions
diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs index b671ede01d9..564d718f971 100644 --- a/cs/src/Ice/ConnectionFactory.cs +++ b/cs/src/Ice/ConnectionFactory.cs @@ -183,9 +183,10 @@ namespace IceInternal // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); + ConnectorInfo ci = null; for(int i = 0; i < connectors.Count; ++i) { - ConnectorInfo ci = connectors[i]; + ci = connectors[i]; try { connection = createConnection(ci.connector.connect(), ci); @@ -199,7 +200,7 @@ namespace IceInternal { compress = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -221,7 +222,14 @@ namespace IceInternal // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -454,6 +462,11 @@ namespace IceInternal DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); foreach(ConnectorInfo ci in connectors) { + if(_pending.ContainsKey(ci)) + { + continue; + } + LinkedList connectionList = null; if(!_connections.TryGetValue(ci, out connectionList)) { @@ -588,62 +601,23 @@ namespace IceInternal // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, out compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - foreach(ConnectorInfo ci in connectors) - { - Set cbs = null; - if(_pending.TryGetValue(ci, out cbs)) - { - cbs.Remove(cb); - } - } - } return connection; } - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - bool found = false; - foreach(ConnectorInfo ci in connectors) - { - Set cbs = null; - if(_pending.TryGetValue(ci, out cbs)) - { - found = true; - if(cb != null) - { - cbs.Add(cb); // Add the callback to each pending connector. - } - } - } - - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -660,23 +634,14 @@ namespace IceInternal return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - foreach(ConnectorInfo ci in connectors) - { - if(!_pending.ContainsKey(ci)) + else { - _pending.Add(ci, new Set()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -724,6 +689,7 @@ namespace IceInternal _connections.Add(ci, connectionList); } connectionList.Add(connection); + connectionList = null; if(!_connectionsByEndpoint.TryGetValue(ci.endpoint, out connectionList)) { connectionList = new LinkedList(); @@ -747,49 +713,120 @@ namespace IceInternal } } - private void finishGetConnection(List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + private void finishGetConnection(List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - Set callbacks = new Set(); + Set connectionCallbacks = new Set(); + if(cb != null) + { + connectionCallbacks.Add(cb); + } + Set callbacks = new Set(); lock(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - - foreach(ConnectorInfo ci in connectors) + foreach(ConnectorInfo c in connectors) { Set s = null; - if(_pending.TryGetValue(ci, out s)) + if(_pending.TryGetValue(c, out s)) { - foreach(ConnectCallback c in s) + foreach(ConnectCallback cc in s) { - callbacks.Add(c); + if(cc.hasConnector(ci)) + { + connectionCallbacks.Add(cc); + } + else + { + callbacks.Add(cc); + } } - _pending.Remove(ci); + _pending.Remove(c); } } + + foreach(ConnectCallback cc in connectionCallbacks) + { + cc.removeFromPending(); + callbacks.Remove(cc); + } + foreach(ConnectCallback cc in callbacks) + { + cc.removeFromPending(); + } Monitor.PulseAll(this); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + bool compress; + DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } + + foreach(ConnectCallback cc in callbacks) + { + cc.getConnection(); + } + foreach(ConnectCallback cc in connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + Set failedCallbacks = new Set(); + if(cb != null) + { + failedCallbacks.Add(cb); + } + + Set callbacks = new Set(); + lock(this) + { + foreach(ConnectorInfo c in connectors) { - connection.activate(); + Set s = null; + if(_pending.TryGetValue(c, out s)) + { + foreach(ConnectCallback cc in s) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.Add(cc); + } + else + { + callbacks.Add(cc); + } + } + _pending.Remove(c); + } } + + foreach(ConnectCallback cc in callbacks) + { + Debug.Assert(!failedCallbacks.Contains(cc)); + cc.removeFromPending(); + } + Monitor.PulseAll(this); } - // - // Notify any waiting callbacks. - // foreach(ConnectCallback cc in callbacks) { cc.getConnection(); } + foreach(ConnectCallback cc in failedCallbacks) + { + cc.setException(ex); + } } private void handleException(Ice.LocalException ex, ConnectorInfo ci, Ice.ConnectionI connection, @@ -854,6 +891,59 @@ namespace IceInternal } } + private bool + addToPending(ConnectCallback cb, List<ConnectorInfo> connectors) + { + // + // Add the callback to each connector pending list. + // + bool found = false; + foreach(ConnectorInfo ci in connectors) + { + Set cbs = null; + if(_pending.TryGetValue(ci, out cbs)) + { + found = true; + if(cb != null) + { + cbs.Add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + foreach(ConnectorInfo ci in connectors) + { + if(!_pending.ContainsKey(ci)) + { + _pending.Add(ci, new Set()); + } + } + return false; + } + + private void + removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors) + { + foreach(ConnectorInfo ci in connectors) + { + Set cbs = null; + if(_pending.TryGetValue(ci, out cbs)) + { + cbs.Remove(cb); + } + } + } + internal void handleException(Ice.LocalException ex, bool hasMore) { TraceLevels traceLevels = instance_.traceLevels(); @@ -922,20 +1012,8 @@ namespace IceInternal // public void connectionStartCompleted(Ice.ConnectionI connection) { - bool compress; - DefaultsAndOverrides defaultsAndOverrides = _factory.instance_.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) @@ -943,9 +1021,7 @@ namespace IceInternal _factory.handleException(ex, _current, connection, _hasMore || _iter < _connectors.Count); if(ex is Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter < _connectors.Count) // Try the next connector. { @@ -953,9 +1029,7 @@ namespace IceInternal } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1027,6 +1101,43 @@ namespace IceInternal } } + public void setConnection(Ice.ConnectionI connection, bool compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public bool hasConnector(ConnectorInfo ci) + { + return _connectors.Contains(ci); + } + public bool removeConnectors(List<ConnectorInfo> connectors) + { + foreach(ConnectorInfo ci in connectors) + { + while(_connectors.Remove(ci)); // Remove all of them. + } + return _connectors.Count == 0; + } + + public void removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + public void getConnectors() { try |