summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingConnectionFactory.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-03-24 11:45:18 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-03-24 11:45:18 +0100
commit06a08ecf28e205277336a97a6173db7ccbed1adc (patch)
treea369a5044a63f8cdba9e7c0a461e24ae344486b4 /java/src/IceInternal/OutgoingConnectionFactory.java
parentMerge branch 'R3_3_branch' (diff)
parentBug 3924: slice2py missing from VC60 installer (diff)
downloadice-06a08ecf28e205277336a97a6173db7ccbed1adc.tar.bz2
ice-06a08ecf28e205277336a97a6173db7ccbed1adc.tar.xz
ice-06a08ecf28e205277336a97a6173db7ccbed1adc.zip
Merge commit 'origin/R3_3_branch'
Conflicts: CHANGES cpp/demo/Freeze/backup/.depend cpp/demo/Freeze/bench/.depend cpp/demo/Freeze/casino/.depend cpp/demo/Freeze/customEvictor/.depend cpp/demo/Freeze/library/.depend cpp/demo/Freeze/phonebook/.depend cpp/demo/Freeze/transform/.depend cpp/demo/Glacier2/callback/.depend cpp/demo/Glacier2/chat/.depend cpp/demo/Ice/async/.depend cpp/demo/Ice/bidir/.depend cpp/demo/Ice/callback/.depend cpp/demo/Ice/converter/.depend cpp/demo/Ice/hello/.depend cpp/demo/Ice/invoke/.depend cpp/demo/Ice/latency/.depend cpp/demo/Ice/minimal/.depend cpp/demo/Ice/multicast/.depend cpp/demo/Ice/nested/.depend cpp/demo/Ice/nrvo/.depend cpp/demo/Ice/session/.depend cpp/demo/Ice/throughput/.depend cpp/demo/Ice/value/.depend cpp/demo/IceBox/hello/.depend cpp/demo/IceGrid/allocate/.depend cpp/demo/IceGrid/icebox/.depend cpp/demo/IceGrid/replication/.depend cpp/demo/IceGrid/sessionActivation/.depend cpp/demo/IceGrid/simple/.depend cpp/demo/IceStorm/clock/.depend cpp/demo/IceStorm/counter/.depend cpp/demo/IceStorm/replicated/.depend cpp/demo/IceStorm/replicated2/.depend cpp/demo/book/freeze_filesystem/.depend cpp/demo/book/lifecycle/.depend cpp/demo/book/printer/.depend cpp/demo/book/simple_filesystem/.depend cpp/src/Freeze/.depend cpp/src/FreezeScript/.depend cpp/src/Ice/.depend cpp/src/Ice/UdpTransceiver.cpp cpp/src/Ice/UdpTransceiver.h cpp/src/IceBox/.depend cpp/src/IceGrid/.depend cpp/src/IceGridLib/.depend cpp/src/IcePatch2/.depend cpp/src/IceStorm/.depend cpp/src/slice2freeze/.depend cpp/test/Freeze/complex/.depend cpp/test/Freeze/dbmap/.depend cpp/test/Freeze/evictor/.depend cpp/test/Freeze/oldevictor/.depend cpp/test/FreezeScript/dbmap/.depend cpp/test/FreezeScript/evictor/.depend cpp/test/Glacier2/attack/.depend cpp/test/Glacier2/dynamicFiltering/.depend cpp/test/Glacier2/router/.depend cpp/test/Glacier2/sessionControl/.depend cpp/test/Glacier2/ssl/.depend cpp/test/Glacier2/staticFiltering/.depend cpp/test/Ice/adapterDeactivation/.depend cpp/test/Ice/background/.depend cpp/test/Ice/binding/.depend cpp/test/Ice/checksum/.depend cpp/test/Ice/checksum/server/.depend cpp/test/Ice/custom/.depend cpp/test/Ice/exceptions/.depend cpp/test/Ice/facets/.depend cpp/test/Ice/faultTolerance/.depend cpp/test/Ice/gc/.depend cpp/test/Ice/hold/.depend cpp/test/Ice/inheritance/.depend cpp/test/Ice/interceptor/.depend cpp/test/Ice/location/.depend cpp/test/Ice/objects/.depend cpp/test/Ice/operations/.depend cpp/test/Ice/proxy/.depend cpp/test/Ice/retry/.depend cpp/test/Ice/servantLocator/.depend cpp/test/Ice/slicing/exceptions/.depend cpp/test/Ice/slicing/objects/.depend cpp/test/Ice/stream/.depend cpp/test/Ice/stringConverter/.depend cpp/test/Ice/timeout/.depend cpp/test/Ice/udp/.depend cpp/test/IceBox/configuration/.depend cpp/test/IceGrid/activation/.depend cpp/test/IceGrid/allocation/.depend cpp/test/IceGrid/deployer/.depend cpp/test/IceGrid/distribution/.depend cpp/test/IceGrid/replicaGroup/.depend cpp/test/IceGrid/replication/.depend cpp/test/IceGrid/session/.depend cpp/test/IceGrid/simple/.depend cpp/test/IceGrid/update/.depend cpp/test/IceSSL/configuration/.depend cpp/test/IceStorm/federation/.depend cpp/test/IceStorm/federation2/.depend cpp/test/IceStorm/rep1/.depend cpp/test/IceStorm/repgrid/.depend cpp/test/IceStorm/repstress/.depend cpp/test/IceStorm/single/.depend cpp/test/IceStorm/stress/.depend cpp/test/Slice/keyword/.depend cs/src/Ice/Instance.cs cs/src/IceSSL/ConnectorI.cs java/demo/book/simple_filesystem/Filesystem/DirectoryI.java java/demo/book/simple_filesystem/Filesystem/FileI.java java/src/IceInternal/TcpConnector.java java/src/IceSSL/ConnectorI.java py/modules/IcePy/.depend rb/src/IceRuby/.depend
Diffstat (limited to 'java/src/IceInternal/OutgoingConnectionFactory.java')
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java364
1 files changed, 233 insertions, 131 deletions
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 0a5bdec8c86..563368f9f22 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory
//
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
java.util.Iterator<ConnectorInfo> q = connectors.iterator();
+ ConnectorInfo ci = null;
while(q.hasNext())
{
- ConnectorInfo ci = q.next();
+ ci = q.next();
try
{
connection = createConnection(ci.connector.connect(), ci);
@@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory
{
compress.value = ci.endpoint.compress();
}
-
+ connection.activate();
break;
}
catch(Ice.CommunicatorDestroyedException ex)
@@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, null, connection);
+ if(connection != null)
+ {
+ finishGetConnection(connectors, ci, connection, null);
+ }
+ else
+ {
+ finishGetConnection(connectors, exception, null);
+ }
if(connection == null)
{
@@ -324,16 +332,7 @@ public final class OutgoingConnectionFactory
Ice.ConnectionI connection = q.next();
if(connection.endpoint() == endpoint)
{
- try
- {
- connection.setAdapter(adapter);
- }
- catch(Ice.LocalException ex)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
- }
+ connection.setAdapter(adapter);
}
}
}
@@ -359,16 +358,7 @@ public final class OutgoingConnectionFactory
Ice.ConnectionI connection = q.next();
if(connection.getAdapter() == adapter)
{
- try
- {
- connection.setAdapter(null);
- }
- catch(Ice.LocalException ex)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
- }
+ connection.setAdapter(null);
}
}
}
@@ -507,6 +497,11 @@ public final class OutgoingConnectionFactory
while(p.hasNext())
{
ConnectorInfo ci = p.next();
+ if(_pending.containsKey(ci))
+ {
+ continue;
+ }
+
java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
if(connectionList == null)
{
@@ -637,64 +632,23 @@ public final class OutgoingConnectionFactory
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice.ConnectionI connection = findConnection(connectors, compress);
if(connection != null)
{
- if(cb != null)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- cbs.remove(cb);
- }
- }
- }
return connection;
}
-
- //
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- boolean found = false;
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- found = true;
- if(cb != null)
- {
- cbs.add(cb); // Add the callback to each pending connector.
- }
- }
- }
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -717,25 +671,14 @@ public final class OutgoingConnectionFactory
return null;
}
}
- }
-
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- ConnectorInfo obj = p.next();
- if(!_pending.containsKey(obj))
+ else
{
- _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -804,46 +747,180 @@ public final class OutgoingConnectionFactory
}
private void
- finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection)
+ finishGetConnection(java.util.List<ConnectorInfo> connectors,
+ ConnectorInfo ci,
+ Ice.ConnectionI connection,
+ ConnectCallback cb)
{
- java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ connectionCallbacks.add(cb);
+ }
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
synchronized(this)
{
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- java.util.Set<ConnectCallback> cbs = _pending.remove(p.next());
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
if(cbs != null)
{
- callbacks.addAll(cbs);
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.hasConnector(ci))
+ {
+ connectionCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
}
}
+
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.removeFromPending();
+ callbacks.remove(cc);
+ }
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.removeFromPending();
+ }
notifyAll();
+ }
+
+ boolean compress;
+ DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint.compress();
+ }
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection != null && !_destroyed)
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.setConnection(connection, compress);
+ }
+ }
+
+ private void
+ finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)
+ {
+ java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ failedCallbacks.add(cb);
+ }
+
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ synchronized(this)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ while(p.hasNext())
{
- connection.activate();
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
+ if(cbs != null)
+ {
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.removeConnectors(connectors))
+ {
+ failedCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
+ }
+ }
+
+ for(ConnectCallback cc : callbacks)
+ {
+ assert(!failedCallbacks.contains(cc));
+ cc.removeFromPending();
}
+ notifyAll();
}
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : failedCallbacks)
+ {
+ cc.setException(ex);
+ }
+ }
+
+ private boolean
+ addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
+ //
+ // Add the callback to each connector pending list.
+ //
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ boolean found = false;
+ while(p.hasNext())
+ {
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ found = true;
+ if(cb != null)
+ {
+ cbs.add(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(found)
+ {
+ return true;
+ }
+
//
- // Notify any waiting callbacks.
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
//
- java.util.Iterator<ConnectCallback> p = callbacks.iterator();
+ p = connectors.iterator();
+ while(p.hasNext())
+ {
+ ConnectorInfo obj = p.next();
+ if(!_pending.containsKey(obj))
+ {
+ _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ }
+ }
+
+ return false;
+ }
+
+ private void
+ removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- p.next().getConnection();
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ cbs.remove(cb);
+ }
}
}
@@ -853,7 +930,7 @@ public final class OutgoingConnectionFactory
TraceLevels traceLevels = _instance.traceLevels();
if(traceLevels.retry >= 2)
{
- StringBuffer s = new StringBuffer();
+ StringBuilder s = new StringBuilder(128);
s.append("connection to endpoint failed");
if(ex instanceof Ice.CommunicatorDestroyedException)
{
@@ -913,7 +990,7 @@ public final class OutgoingConnectionFactory
TraceLevels traceLevels = _instance.traceLevels();
if(traceLevels.retry >= 2)
{
- StringBuffer s = new StringBuffer();
+ StringBuilder s = new StringBuilder(128);
s.append("couldn't resolve endpoint host");
if(ex instanceof Ice.CommunicatorDestroyedException)
{
@@ -972,27 +1049,15 @@ public final class OutgoingConnectionFactory
_selType = selType;
_endpointsIter = _endpoints.iterator();
}
-
+
//
// Methods from ConnectionI.StartCallback
//
public void
connectionStartCompleted(Ice.ConnectionI connection)
{
- boolean compress;
- DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress = _current.endpoint.compress();
- }
-
- _factory.finishGetConnection(_connectors, this, connection);
- _callback.setConnection(connection, compress);
- _factory.decPendingConnectCount(); // Must be called last.
+ connection.activate();
+ _factory.finishGetConnection(_connectors, _current, connection, this);
}
public void
@@ -1003,9 +1068,7 @@ public final class OutgoingConnectionFactory
_factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext());
if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue.
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
else if(_iter.hasNext()) // Try the next connector.
{
@@ -1013,9 +1076,7 @@ public final class OutgoingConnectionFactory
}
else
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
}
@@ -1080,6 +1141,47 @@ public final class OutgoingConnectionFactory
}
}
+ public void
+ setConnection(Ice.ConnectionI connection, boolean compress)
+ {
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public void
+ setException(Ice.LocalException ex)
+ {
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public boolean
+ hasConnector(ConnectorInfo ci)
+ {
+ return _connectors.contains(ci);
+ }
+
+ public boolean
+ removeConnectors(java.util.List<ConnectorInfo> connectors)
+ {
+ _connectors.removeAll(connectors);
+ _iter = _connectors.iterator();
+ return _connectors.isEmpty();
+ }
+
+ public void
+ removeFromPending()
+ {
+ _factory.removeFromPending(this, _connectors);
+ }
+
void
getConnectors()
{