diff options
Diffstat (limited to 'java')
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 338 | ||||
-rw-r--r-- | java/test/Ice/background/Connector.java | 2 | ||||
-rw-r--r-- | java/test/Ice/binding/AllTests.java | 118 |
3 files changed, 348 insertions, 110 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 51881390f20..563368f9f22 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); + ConnectorInfo ci = null; while(q.hasNext()) { - ConnectorInfo ci = q.next(); + ci = q.next(); try { connection = createConnection(ci.connector.connect(), ci); @@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory { compress.value = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -489,6 +497,11 @@ public final class OutgoingConnectionFactory while(p.hasNext()) { ConnectorInfo ci = p.next(); + if(_pending.containsKey(ci)) + { + continue; + } + java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); if(connectionList == null) { @@ -619,64 +632,23 @@ public final class OutgoingConnectionFactory // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - cbs.remove(cb); - } - } - } return connection; } - - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - boolean found = false; - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - found = true; - if(cb != null) - { - cbs.add(cb); // Add the callback to each pending connector. - } - } - } - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -699,25 +671,14 @@ public final class OutgoingConnectionFactory return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - ConnectorInfo obj = p.next(); - if(!_pending.containsKey(obj)) + else { - _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -786,46 +747,180 @@ public final class OutgoingConnectionFactory } private void - finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + finishGetConnection(java.util.List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + connectionCallbacks.add(cb); + } + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); synchronized(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - java.util.Set<ConnectCallback> cbs = _pending.remove(p.next()); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); if(cbs != null) { - callbacks.addAll(cbs); + for(ConnectCallback cc : cbs) + { + if(cc.hasConnector(ci)) + { + connectionCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } } } + + for(ConnectCallback cc : connectionCallbacks) + { + cc.removeFromPending(); + callbacks.remove(cc); + } + for(ConnectCallback cc : callbacks) + { + cc.removeFromPending(); + } notifyAll(); + } + + boolean compress; + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void + finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + failedCallbacks.add(cb); + } + + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + synchronized(this) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + while(p.hasNext()) { - connection.activate(); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); + if(cbs != null) + { + for(ConnectCallback cc : cbs) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } + } + } + + for(ConnectCallback cc : callbacks) + { + assert(!failedCallbacks.contains(cc)); + cc.removeFromPending(); } + notifyAll(); } + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : failedCallbacks) + { + cc.setException(ex); + } + } + + private boolean + addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { // - // Notify any waiting callbacks. + // Add the callback to each connector pending list. // - java.util.Iterator<ConnectCallback> p = callbacks.iterator(); + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + boolean found = false; + while(p.hasNext()) + { + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + found = true; + if(cb != null) + { + cbs.add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + p = connectors.iterator(); + while(p.hasNext()) + { + ConnectorInfo obj = p.next(); + if(!_pending.containsKey(obj)) + { + _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + } + } + + return false; + } + + private void + removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - p.next().getConnection(); + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + cbs.remove(cb); + } } } @@ -954,27 +1049,15 @@ public final class OutgoingConnectionFactory _selType = selType; _endpointsIter = _endpoints.iterator(); } - + // // Methods from ConnectionI.StartCallback // public void connectionStartCompleted(Ice.ConnectionI connection) { - boolean compress; - DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void @@ -985,9 +1068,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter.hasNext()) // Try the next connector. { @@ -995,9 +1076,7 @@ public final class OutgoingConnectionFactory } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1062,6 +1141,47 @@ public final class OutgoingConnectionFactory } } + public void + setConnection(Ice.ConnectionI connection, boolean compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void + setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public boolean + hasConnector(ConnectorInfo ci) + { + return _connectors.contains(ci); + } + + public boolean + removeConnectors(java.util.List<ConnectorInfo> connectors) + { + _connectors.removeAll(connectors); + _iter = _connectors.iterator(); + return _connectors.isEmpty(); + } + + public void + removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + void getConnectors() { diff --git a/java/test/Ice/background/Connector.java b/java/test/Ice/background/Connector.java index 4a386fb3e0c..b737a1400ef 100644 --- a/java/test/Ice/background/Connector.java +++ b/java/test/Ice/background/Connector.java @@ -59,7 +59,7 @@ final class Connector implements IceInternal.Connector if(this == p) { - return false; + return true; } return _connector.equals(p._connector); diff --git a/java/test/Ice/binding/AllTests.java b/java/test/Ice/binding/AllTests.java index 242c25093f4..c6e4e31b4f9 100644 --- a/java/test/Ice/binding/AllTests.java +++ b/java/test/Ice/binding/AllTests.java @@ -20,6 +20,24 @@ public class AllTests } } + static class NoOpGetAdapterNameCB extends AMI_TestIntf_getAdapterName + { + public + void ice_response(String name) + { + } + + public void + ice_exception(Ice.LocalException ex) + { + } + + public void + ice_exception(Ice.UserException ex) + { + } + }; + static class GetAdapterNameCB extends AMI_TestIntf_getAdapterName { synchronized public void @@ -225,6 +243,106 @@ public class AllTests } System.out.println("ok"); + System.out.print("testing binding with multiple random endpoints... "); + System.out.flush(); + { + java.util.Random rand = new java.util.Random(); + + RemoteObjectAdapterPrx[] adapters = new RemoteObjectAdapterPrx[5]; + adapters[0] = com.createObjectAdapter("AdapterRandom11", "default"); + adapters[1] = com.createObjectAdapter("AdapterRandom12", "default"); + adapters[2] = com.createObjectAdapter("AdapterRandom13", "default"); + adapters[3] = com.createObjectAdapter("AdapterRandom14", "default"); + adapters[4] = com.createObjectAdapter("AdapterRandom15", "default"); + + int count; + if(System.getProperty("os.name").startsWith("Windows")) + { + count = 20; + } + else + { + count = 60; + } + + int adapterCount = adapters.length; + while(--count > 0) + { + TestIntfPrx[] proxies; + if(System.getProperty("os.name").startsWith("Windows")) + { + if(count == 10) + { + com.deactivateObjectAdapter(adapters[4]); + --adapterCount; + } + proxies = new TestIntfPrx[10]; + } + else + { + if(count < 60 && count % 10 == 0) + { + com.deactivateObjectAdapter(adapters[count / 10 - 1]); + --adapterCount; + } + proxies = new TestIntfPrx[40]; + } + + int i; + for(i = 0; i < proxies.length; ++i) + { + RemoteObjectAdapterPrx[] adpts = new RemoteObjectAdapterPrx[rand.nextInt(adapters.length)]; + if(adpts.length == 0) + { + adpts = new RemoteObjectAdapterPrx[1]; + } + for(int j = 0; j < adpts.length; ++j) + { + adpts[j] = adapters[rand.nextInt(adapters.length)]; + } + proxies[i] = createTestIntfPrx(java.util.Arrays.asList((adpts))); + } + + for(i = 0; i < proxies.length; i++) + { + proxies[i].getAdapterName_async(new NoOpGetAdapterNameCB()); + } + for(i = 0; i < proxies.length; i++) + { + try + { + proxies[i].ice_ping(); + } + catch(Ice.LocalException ex) + { + } + } + + java.util.Set<Ice.Connection> connections = new java.util.HashSet<Ice.Connection>(); + for(i = 0; i < proxies.length; i++) + { + if(proxies[i].ice_getCachedConnection() != null) + { + connections.add(proxies[i].ice_getCachedConnection()); + } + } + test(connections.size() <= adapterCount); + + for(RemoteObjectAdapterPrx a : adapters) + { + try + { + a.getTestIntf().ice_getConnection().close(false); + } + catch(Ice.LocalException ex) + { + // Expected if adapter is down. + } + } + } + } + System.out.println("ok"); + System.out.print("testing binding with multiple endpoints and AMI... "); System.out.flush(); { |