diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-03-24 11:45:18 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-03-24 11:45:18 +0100 |
commit | 06a08ecf28e205277336a97a6173db7ccbed1adc (patch) | |
tree | a369a5044a63f8cdba9e7c0a461e24ae344486b4 /java/src/IceInternal/OutgoingConnectionFactory.java | |
parent | Merge branch 'R3_3_branch' (diff) | |
parent | Bug 3924: slice2py missing from VC60 installer (diff) | |
download | ice-06a08ecf28e205277336a97a6173db7ccbed1adc.tar.bz2 ice-06a08ecf28e205277336a97a6173db7ccbed1adc.tar.xz ice-06a08ecf28e205277336a97a6173db7ccbed1adc.zip |
Merge commit 'origin/R3_3_branch'
Conflicts:
CHANGES
cpp/demo/Freeze/backup/.depend
cpp/demo/Freeze/bench/.depend
cpp/demo/Freeze/casino/.depend
cpp/demo/Freeze/customEvictor/.depend
cpp/demo/Freeze/library/.depend
cpp/demo/Freeze/phonebook/.depend
cpp/demo/Freeze/transform/.depend
cpp/demo/Glacier2/callback/.depend
cpp/demo/Glacier2/chat/.depend
cpp/demo/Ice/async/.depend
cpp/demo/Ice/bidir/.depend
cpp/demo/Ice/callback/.depend
cpp/demo/Ice/converter/.depend
cpp/demo/Ice/hello/.depend
cpp/demo/Ice/invoke/.depend
cpp/demo/Ice/latency/.depend
cpp/demo/Ice/minimal/.depend
cpp/demo/Ice/multicast/.depend
cpp/demo/Ice/nested/.depend
cpp/demo/Ice/nrvo/.depend
cpp/demo/Ice/session/.depend
cpp/demo/Ice/throughput/.depend
cpp/demo/Ice/value/.depend
cpp/demo/IceBox/hello/.depend
cpp/demo/IceGrid/allocate/.depend
cpp/demo/IceGrid/icebox/.depend
cpp/demo/IceGrid/replication/.depend
cpp/demo/IceGrid/sessionActivation/.depend
cpp/demo/IceGrid/simple/.depend
cpp/demo/IceStorm/clock/.depend
cpp/demo/IceStorm/counter/.depend
cpp/demo/IceStorm/replicated/.depend
cpp/demo/IceStorm/replicated2/.depend
cpp/demo/book/freeze_filesystem/.depend
cpp/demo/book/lifecycle/.depend
cpp/demo/book/printer/.depend
cpp/demo/book/simple_filesystem/.depend
cpp/src/Freeze/.depend
cpp/src/FreezeScript/.depend
cpp/src/Ice/.depend
cpp/src/Ice/UdpTransceiver.cpp
cpp/src/Ice/UdpTransceiver.h
cpp/src/IceBox/.depend
cpp/src/IceGrid/.depend
cpp/src/IceGridLib/.depend
cpp/src/IcePatch2/.depend
cpp/src/IceStorm/.depend
cpp/src/slice2freeze/.depend
cpp/test/Freeze/complex/.depend
cpp/test/Freeze/dbmap/.depend
cpp/test/Freeze/evictor/.depend
cpp/test/Freeze/oldevictor/.depend
cpp/test/FreezeScript/dbmap/.depend
cpp/test/FreezeScript/evictor/.depend
cpp/test/Glacier2/attack/.depend
cpp/test/Glacier2/dynamicFiltering/.depend
cpp/test/Glacier2/router/.depend
cpp/test/Glacier2/sessionControl/.depend
cpp/test/Glacier2/ssl/.depend
cpp/test/Glacier2/staticFiltering/.depend
cpp/test/Ice/adapterDeactivation/.depend
cpp/test/Ice/background/.depend
cpp/test/Ice/binding/.depend
cpp/test/Ice/checksum/.depend
cpp/test/Ice/checksum/server/.depend
cpp/test/Ice/custom/.depend
cpp/test/Ice/exceptions/.depend
cpp/test/Ice/facets/.depend
cpp/test/Ice/faultTolerance/.depend
cpp/test/Ice/gc/.depend
cpp/test/Ice/hold/.depend
cpp/test/Ice/inheritance/.depend
cpp/test/Ice/interceptor/.depend
cpp/test/Ice/location/.depend
cpp/test/Ice/objects/.depend
cpp/test/Ice/operations/.depend
cpp/test/Ice/proxy/.depend
cpp/test/Ice/retry/.depend
cpp/test/Ice/servantLocator/.depend
cpp/test/Ice/slicing/exceptions/.depend
cpp/test/Ice/slicing/objects/.depend
cpp/test/Ice/stream/.depend
cpp/test/Ice/stringConverter/.depend
cpp/test/Ice/timeout/.depend
cpp/test/Ice/udp/.depend
cpp/test/IceBox/configuration/.depend
cpp/test/IceGrid/activation/.depend
cpp/test/IceGrid/allocation/.depend
cpp/test/IceGrid/deployer/.depend
cpp/test/IceGrid/distribution/.depend
cpp/test/IceGrid/replicaGroup/.depend
cpp/test/IceGrid/replication/.depend
cpp/test/IceGrid/session/.depend
cpp/test/IceGrid/simple/.depend
cpp/test/IceGrid/update/.depend
cpp/test/IceSSL/configuration/.depend
cpp/test/IceStorm/federation/.depend
cpp/test/IceStorm/federation2/.depend
cpp/test/IceStorm/rep1/.depend
cpp/test/IceStorm/repgrid/.depend
cpp/test/IceStorm/repstress/.depend
cpp/test/IceStorm/single/.depend
cpp/test/IceStorm/stress/.depend
cpp/test/Slice/keyword/.depend
cs/src/Ice/Instance.cs
cs/src/IceSSL/ConnectorI.cs
java/demo/book/simple_filesystem/Filesystem/DirectoryI.java
java/demo/book/simple_filesystem/Filesystem/FileI.java
java/src/IceInternal/TcpConnector.java
java/src/IceSSL/ConnectorI.java
py/modules/IcePy/.depend
rb/src/IceRuby/.depend
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 364 |
1 files changed, 233 insertions, 131 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 0a5bdec8c86..563368f9f22 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); + ConnectorInfo ci = null; while(q.hasNext()) { - ConnectorInfo ci = q.next(); + ci = q.next(); try { connection = createConnection(ci.connector.connect(), ci); @@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory { compress.value = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -324,16 +332,7 @@ public final class OutgoingConnectionFactory Ice.ConnectionI connection = q.next(); if(connection.endpoint() == endpoint) { - try - { - connection.setAdapter(adapter); - } - catch(Ice.LocalException ex) - { - // - // Ignore, the connection is being closed or closed. - // - } + connection.setAdapter(adapter); } } } @@ -359,16 +358,7 @@ public final class OutgoingConnectionFactory Ice.ConnectionI connection = q.next(); if(connection.getAdapter() == adapter) { - try - { - connection.setAdapter(null); - } - catch(Ice.LocalException ex) - { - // - // Ignore, the connection is being closed or closed. - // - } + connection.setAdapter(null); } } } @@ -507,6 +497,11 @@ public final class OutgoingConnectionFactory while(p.hasNext()) { ConnectorInfo ci = p.next(); + if(_pending.containsKey(ci)) + { + continue; + } + java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); if(connectionList == null) { @@ -637,64 +632,23 @@ public final class OutgoingConnectionFactory // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - cbs.remove(cb); - } - } - } return connection; } - - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - boolean found = false; - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - found = true; - if(cb != null) - { - cbs.add(cb); // Add the callback to each pending connector. - } - } - } - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -717,25 +671,14 @@ public final class OutgoingConnectionFactory return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - ConnectorInfo obj = p.next(); - if(!_pending.containsKey(obj)) + else { - _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -804,46 +747,180 @@ public final class OutgoingConnectionFactory } private void - finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + finishGetConnection(java.util.List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + connectionCallbacks.add(cb); + } + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); synchronized(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - java.util.Set<ConnectCallback> cbs = _pending.remove(p.next()); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); if(cbs != null) { - callbacks.addAll(cbs); + for(ConnectCallback cc : cbs) + { + if(cc.hasConnector(ci)) + { + connectionCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } } } + + for(ConnectCallback cc : connectionCallbacks) + { + cc.removeFromPending(); + callbacks.remove(cc); + } + for(ConnectCallback cc : callbacks) + { + cc.removeFromPending(); + } notifyAll(); + } + + boolean compress; + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void + finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + failedCallbacks.add(cb); + } + + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + synchronized(this) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + while(p.hasNext()) { - connection.activate(); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); + if(cbs != null) + { + for(ConnectCallback cc : cbs) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } + } + } + + for(ConnectCallback cc : callbacks) + { + assert(!failedCallbacks.contains(cc)); + cc.removeFromPending(); } + notifyAll(); } + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : failedCallbacks) + { + cc.setException(ex); + } + } + + private boolean + addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { + // + // Add the callback to each connector pending list. + // + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + boolean found = false; + while(p.hasNext()) + { + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + found = true; + if(cb != null) + { + cbs.add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + // - // Notify any waiting callbacks. + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. // - java.util.Iterator<ConnectCallback> p = callbacks.iterator(); + p = connectors.iterator(); + while(p.hasNext()) + { + ConnectorInfo obj = p.next(); + if(!_pending.containsKey(obj)) + { + _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + } + } + + return false; + } + + private void + removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - p.next().getConnection(); + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + cbs.remove(cb); + } } } @@ -853,7 +930,7 @@ public final class OutgoingConnectionFactory TraceLevels traceLevels = _instance.traceLevels(); if(traceLevels.retry >= 2) { - StringBuffer s = new StringBuffer(); + StringBuilder s = new StringBuilder(128); s.append("connection to endpoint failed"); if(ex instanceof Ice.CommunicatorDestroyedException) { @@ -913,7 +990,7 @@ public final class OutgoingConnectionFactory TraceLevels traceLevels = _instance.traceLevels(); if(traceLevels.retry >= 2) { - StringBuffer s = new StringBuffer(); + StringBuilder s = new StringBuilder(128); s.append("couldn't resolve endpoint host"); if(ex instanceof Ice.CommunicatorDestroyedException) { @@ -972,27 +1049,15 @@ public final class OutgoingConnectionFactory _selType = selType; _endpointsIter = _endpoints.iterator(); } - + // // Methods from ConnectionI.StartCallback // public void connectionStartCompleted(Ice.ConnectionI connection) { - boolean compress; - DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void @@ -1003,9 +1068,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter.hasNext()) // Try the next connector. { @@ -1013,9 +1076,7 @@ public final class OutgoingConnectionFactory } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1080,6 +1141,47 @@ public final class OutgoingConnectionFactory } } + public void + setConnection(Ice.ConnectionI connection, boolean compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void + setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public boolean + hasConnector(ConnectorInfo ci) + { + return _connectors.contains(ci); + } + + public boolean + removeConnectors(java.util.List<ConnectorInfo> connectors) + { + _connectors.removeAll(connectors); + _iter = _connectors.iterator(); + return _connectors.isEmpty(); + } + + public void + removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + void getConnectors() { |