diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 338 |
1 files changed, 229 insertions, 109 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 51881390f20..563368f9f22 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); + ConnectorInfo ci = null; while(q.hasNext()) { - ConnectorInfo ci = q.next(); + ci = q.next(); try { connection = createConnection(ci.connector.connect(), ci); @@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory { compress.value = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -489,6 +497,11 @@ public final class OutgoingConnectionFactory while(p.hasNext()) { ConnectorInfo ci = p.next(); + if(_pending.containsKey(ci)) + { + continue; + } + java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); if(connectionList == null) { @@ -619,64 +632,23 @@ public final class OutgoingConnectionFactory // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - cbs.remove(cb); - } - } - } return connection; } - - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - boolean found = false; - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - found = true; - if(cb != null) - { - cbs.add(cb); // Add the callback to each pending connector. - } - } - } - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -699,25 +671,14 @@ public final class OutgoingConnectionFactory return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - ConnectorInfo obj = p.next(); - if(!_pending.containsKey(obj)) + else { - _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -786,46 +747,180 @@ public final class OutgoingConnectionFactory } private void - finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + finishGetConnection(java.util.List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + connectionCallbacks.add(cb); + } + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); synchronized(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - java.util.Set<ConnectCallback> cbs = _pending.remove(p.next()); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); if(cbs != null) { - callbacks.addAll(cbs); + for(ConnectCallback cc : cbs) + { + if(cc.hasConnector(ci)) + { + connectionCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } } } + + for(ConnectCallback cc : connectionCallbacks) + { + cc.removeFromPending(); + callbacks.remove(cc); + } + for(ConnectCallback cc : callbacks) + { + cc.removeFromPending(); + } notifyAll(); + } + + boolean compress; + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void + finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + failedCallbacks.add(cb); + } + + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + synchronized(this) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + while(p.hasNext()) { - connection.activate(); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); + if(cbs != null) + { + for(ConnectCallback cc : cbs) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } + } + } + + for(ConnectCallback cc : callbacks) + { + assert(!failedCallbacks.contains(cc)); + cc.removeFromPending(); } + notifyAll(); } + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : failedCallbacks) + { + cc.setException(ex); + } + } + + private boolean + addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { // - // Notify any waiting callbacks. + // Add the callback to each connector pending list. // - java.util.Iterator<ConnectCallback> p = callbacks.iterator(); + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + boolean found = false; + while(p.hasNext()) + { + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + found = true; + if(cb != null) + { + cbs.add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + p = connectors.iterator(); + while(p.hasNext()) + { + ConnectorInfo obj = p.next(); + if(!_pending.containsKey(obj)) + { + _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + } + } + + return false; + } + + private void + removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - p.next().getConnection(); + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + cbs.remove(cb); + } } } @@ -954,27 +1049,15 @@ public final class OutgoingConnectionFactory _selType = selType; _endpointsIter = _endpoints.iterator(); } - + // // Methods from ConnectionI.StartCallback // public void connectionStartCompleted(Ice.ConnectionI connection) { - boolean compress; - DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void @@ -985,9 +1068,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter.hasNext()) // Try the next connector. { @@ -995,9 +1076,7 @@ public final class OutgoingConnectionFactory } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1062,6 +1141,47 @@ public final class OutgoingConnectionFactory } } + public void + setConnection(Ice.ConnectionI connection, boolean compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void + setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public boolean + hasConnector(ConnectorInfo ci) + { + return _connectors.contains(ci); + } + + public boolean + removeConnectors(java.util.List<ConnectorInfo> connectors) + { + _connectors.removeAll(connectors); + _iter = _connectors.iterator(); + return _connectors.isEmpty(); + } + + public void + removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + void getConnectors() { |