diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 295 |
1 files changed, 72 insertions, 223 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 63ebc0bd919..4a893444be8 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -81,227 +81,94 @@ public final class OutgoingConnectionFactory } } + // Called from Instance.destroy(). public void waitUntilFinished() + throws InterruptedException { - java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; - - synchronized(this) + try { - // - // First we wait until the factory is destroyed. We also - // wait until there are no pending connections - // anymore. Only then we can be sure the _connections - // contains all connections. - // - while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) + java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; + synchronized(this) { - try + // + // First we wait until the factory is destroyed. We also + // wait until there are no pending connections + // anymore. Only then we can be sure the _connections + // contains all connections. + // + while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) { wait(); } - catch(InterruptedException ex) - { - } - } - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); - } - // - // Now we wait until the destruction of each connection is finished. - // - for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) - { - for(Ice.ConnectionI connection : connectionList) - { - connection.waitUntilFinished(); - } - } - - synchronized(this) - { - // Ensure all the connections are finished and reapable at this point. - java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); - if(cons != null) - { - int size = 0; - for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) - { - size += connectionList.size(); - } - assert(cons.size() == size); - _connections.clear(); - _connectionsByEndpoint.clear(); - } - else - { - assert(_connections.isEmpty()); - assert(_connectionsByEndpoint.isEmpty()); + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); } - _monitor.destroy(); - } - } - - public Ice.ConnectionI - create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, Ice.BooleanHolder compress) - { - assert(endpts.length > 0); - - // - // Apply the overrides. - // - java.util.List<EndpointI> endpoints = applyOverrides(endpts); - - // - // Try to find a connection to one of the given endpoints. - // - Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress); - if(connection != null) - { - return connection; - } - - Ice.LocalException exception = null; - - // - // If we didn't find a connection with the endpoints, we create the connectors - // for the endpoints. - // - java.util.List<ConnectorInfo> connectors = new java.util.ArrayList<ConnectorInfo>(); - java.util.Iterator<EndpointI> p = endpoints.iterator(); - while(p.hasNext()) - { - EndpointI endpoint = p.next(); // - // Create connectors for the endpoint. + // Now we wait until the destruction of each connection is finished. // - try + for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) { - java.util.List<Connector> cons = endpoint.connectors(selType); - assert(cons.size() > 0); - for(Connector c : cons) - { - connectors.add(new ConnectorInfo(c, endpoint)); - } - } - catch(Ice.LocalException ex) - { - exception = ex; - handleException(exception, hasMore || p.hasNext()); - } - } - - if(connectors.isEmpty()) - { - assert(exception != null); - throw exception; - } - - // - // Try to get a connection to one of the connectors. A null result indicates that no - // connection was found and that we should try to establish the connection (and that - // the connectors were added to _pending to prevent other threads from establishing - // the connection). - // - connection = getConnection(connectors, null, compress); - if(connection != null) - { - return connection; - } - - // - // Try to establish the connection to the connectors. - // - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); - java.util.Iterator<ConnectorInfo> q = connectors.iterator(); - ConnectorInfo ci = null; - while(q.hasNext()) - { - ci = q.next(); - - Ice.Instrumentation.Observer observer = null; - if(obsv != null) - { - observer = obsv.getConnectionEstablishmentObserver(ci.endpoint, ci.connector.toString()); - if(observer != null) + for(Ice.ConnectionI connection : connectionList) { - observer.attach(); + try + { + connection.waitUntilFinished(); + } + catch(InterruptedException e) + { + // + // Force close all of the connections. + // + for(java.util.List<Ice.ConnectionI> l : connections.values()) + { + for(Ice.ConnectionI c : l) + { + c.close(true); + } + } + throw e; + } } } - try + synchronized(this) { - connection = createConnection(ci.connector.connect(), ci); - connection.start(null); - - if(observer != null) + // Ensure all the connections are finished and reapable at this point. + java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); + if(cons != null) { - observer.detach(); - } - - if(defaultsAndOverrides.overrideCompress) - { - compress.value = defaultsAndOverrides.overrideCompressValue; + int size = 0; + for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) + { + size += connectionList.size(); + } + assert(cons.size() == size); + _connections.clear(); + _connectionsByEndpoint.clear(); } else { - compress.value = ci.endpoint.compress(); + assert(_connections.isEmpty()); + assert(_connectionsByEndpoint.isEmpty()); } - connection.activate(); - break; - } - catch(Ice.CommunicatorDestroyedException ex) - { - if(observer != null) - { - observer.failed(ex.ice_name()); - observer.detach(); - } - exception = ex; - handleConnectionException(exception, hasMore || p.hasNext()); - connection = null; - break; // No need to continue - } - catch(Ice.LocalException ex) - { - if(observer != null) - { - observer.failed(ex.ice_name()); - observer.detach(); - } - exception = ex; - handleConnectionException(exception, hasMore || p.hasNext()); - connection = null; + _monitor.destroy(); } } - - // - // Finish creating the connection (this removes the connectors from the _pending - // list and notifies any waiting threads). - // - if(connection != null) + catch(InterruptedException ex) { - finishGetConnection(connectors, ci, connection, null); - } - else - { - finishGetConnection(connectors, exception, null); - } - - if(connection == null) - { - assert(exception != null); - throw exception; + // Here wait() or waitUntilFinished() were interrupted. Clear the connections + // and such and continue along. + _connections.clear(); + _connectionsByEndpoint.clear(); + _monitor.destroy(); + throw ex; } - - return connection; } public void @@ -611,6 +478,7 @@ public final class OutgoingConnectionFactory private Ice.ConnectionI getConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.BooleanHolder compress) { + assert(cb != null); synchronized(this) { if(_destroyed) @@ -655,26 +523,7 @@ public final class OutgoingConnectionFactory if(addToPending(cb, connectors)) { - // - // If a callback is not specified we wait until another thread notifies us about a - // change to the pending list. Otherwise, if a callback is provided we're done: - // when the pending list changes the callback will be notified and will try to - // get the connection again. - // - if(cb == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - else - { - return null; - } + return null; } else { @@ -1104,7 +953,7 @@ public final class OutgoingConnectionFactory } } - public void + void setConnection(Ice.ConnectionI connection, boolean compress) { // @@ -1115,7 +964,7 @@ public final class OutgoingConnectionFactory _factory.decPendingConnectCount(); // Must be called last. } - public void + void setException(Ice.LocalException ex) { // @@ -1125,13 +974,13 @@ public final class OutgoingConnectionFactory _factory.decPendingConnectCount(); // Must be called last. } - public boolean + boolean hasConnector(ConnectorInfo ci) { return _connectors.contains(ci); } - public boolean + boolean removeConnectors(java.util.List<ConnectorInfo> connectors) { _connectors.removeAll(connectors); @@ -1139,13 +988,13 @@ public final class OutgoingConnectionFactory return _connectors.isEmpty(); } - public void + void removeFromPending() { _factory.removeFromPending(this, _connectors); } - void + private void getConnectors() { try @@ -1166,7 +1015,7 @@ public final class OutgoingConnectionFactory nextEndpoint(); } - void + private void nextEndpoint() { try @@ -1181,7 +1030,7 @@ public final class OutgoingConnectionFactory } } - void + private void getConnection() { try @@ -1196,7 +1045,7 @@ public final class OutgoingConnectionFactory { // // A null return value from getConnection indicates that the connection - // is being established and that everthing has been done to ensure that + // is being established and that everything has been done to ensure that // the callback will be notified when the connection establishment is // done. // @@ -1213,7 +1062,7 @@ public final class OutgoingConnectionFactory } } - void + private void nextConnector() { Ice.ConnectionI connection = null; |