summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp356
-rw-r--r--cpp/src/Ice/ConnectionFactory.h18
-rw-r--r--cpp/src/Ice/Reference.cpp94
-rw-r--r--cpp/test/Ice/background/AllTests.cpp46
-rw-r--r--cpp/test/Ice/binding/AllTests.cpp105
-rwxr-xr-xcpp/test/Ice/gc/run.py5
-rw-r--r--cs/src/Ice/ConnectionFactory.cs325
-rw-r--r--cs/test/Ice/binding/AllTests.cs114
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java338
-rw-r--r--java/test/Ice/background/Connector.java2
-rw-r--r--java/test/Ice/binding/AllTests.java118
11 files changed, 1141 insertions, 380 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index a275f61e292..107887e6501 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -55,6 +55,12 @@ IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const Connector
return connector < other.connector;
}
+bool
+IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const
+{
+ return connector == other.connector;
+}
+
void
IceInternal::OutgoingConnectionFactory::destroy()
{
@@ -193,7 +199,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// Try to establish the connection to the connectors.
//
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- for(vector<ConnectorInfo>::const_iterator q = connectors.begin(); q != connectors.end(); ++q)
+ vector<ConnectorInfo>::const_iterator q;
+ for(q = connectors.begin(); q != connectors.end(); ++q)
{
try
{
@@ -209,6 +216,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
compress = q->endpoint->compress();
}
+ connection->activate();
break;
}
catch(const Ice::CommunicatorDestroyedException& ex)
@@ -230,7 +238,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, 0, connection);
+ if(connection)
+ {
+ finishGetConnection(connectors, *q, connection, 0);
+ }
+ else
+ {
+ finishGetConnection(connectors, *exception.get(), 0);
+ }
if(!connection)
{
@@ -425,7 +440,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr
for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p);
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p);
for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
{
@@ -454,8 +469,13 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
+ if(_pending.find(*p) != _pending.end())
+ {
+ continue;
+ }
+
pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
- multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p);
+ multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p);
if(pr.first == pr.second)
{
@@ -520,7 +540,8 @@ IceInternal::OutgoingConnectionFactory::decPendingConnectCount()
ConnectionIPtr
IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors,
- const ConnectCallbackPtr& cb, bool& compress)
+ const ConnectCallbackPtr& cb,
+ bool& compress)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -563,62 +584,27 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice::ConnectionIPtr connection = findConnection(connectors, compress);
if(connection)
{
- if(cb)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
- {
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
- if(q != _pending.end())
- {
- q->second.erase(cb);
- }
- }
- }
return connection;
}
//
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
+ // Determine whether another thread/request is currently attempting to connect to
+ // one of our endpoints; if so we wait until it's done.
//
- bool found = false;
- for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
- {
- map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
- if(q != _pending.end())
- {
- found = true;
- if(cb)
- {
- q->second.insert(cb); // Add the callback to each pending connector.
- }
- }
- }
-
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -635,23 +621,14 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo
return 0;
}
}
- }
-
- if(_destroyed)
- {
- throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r)
- {
- if(_pending.find(*r) == _pending.end())
+ else
{
- _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>()));
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -709,47 +686,179 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t
void
IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors,
- const ConnectCallbackPtr& cb,
- const ConnectionIPtr& connection)
+ const ConnectorInfo& ci,
+ const ConnectionIPtr& connection,
+ const ConnectCallbackPtr& cb)
{
+ set<ConnectCallbackPtr> connectionCallbacks;
+ if(cb)
+ {
+ connectionCallbacks.insert(cb);
+ }
+
set<ConnectCallbackPtr> callbacks;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ if(q != _pending.end())
+ {
+ for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
+ {
+ if((*r)->hasConnector(ci))
+ {
+ connectionCallbacks.insert(*r);
+ }
+ else
+ {
+ callbacks.insert(*r);
+ }
+ }
+ _pending.erase(q);
+ }
+ }
+
+ set<ConnectCallbackPtr>::iterator r;
+ for(r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r)
+ {
+ (*r)->removeFromPending();
+ callbacks.erase(*r);
+ }
+ for(r = callbacks.begin(); r != callbacks.end(); ++r)
+ {
+ (*r)->removeFromPending();
+ }
+ notifyAll();
+ }
+ bool compress;
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint->compress();
+ }
+
+ set<ConnectCallbackPtr>::const_iterator p;
+ for(p = callbacks.begin(); p != callbacks.end(); ++p)
+ {
+ (*p)->getConnection();
+ }
+ for(p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p)
+ {
+ (*p)->setConnection(connection, compress);
+ }
+}
+
+void
+IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors,
+ const Ice::LocalException& ex,
+ const ConnectCallbackPtr& cb)
+{
+ set<ConnectCallbackPtr> failedCallbacks;
+ if(cb)
+ {
+ failedCallbacks.insert(cb);
+ }
+
+ set<ConnectCallbackPtr> callbacks;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
{
map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
if(q != _pending.end())
{
- callbacks.insert(q->second.begin(), q->second.end());
+ for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
+ {
+ if((*r)->removeConnectors(connectors))
+ {
+ failedCallbacks.insert(*r);
+ }
+ else
+ {
+ callbacks.insert(*r);
+ }
+ }
_pending.erase(q);
}
}
+
+ for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r)
+ {
+ assert(failedCallbacks.find(*r) == failedCallbacks.end());
+ (*r)->removeFromPending();
+ }
notifyAll();
+ }
+
+ set<ConnectCallbackPtr>::const_iterator p;
+ for(p = callbacks.begin(); p != callbacks.end(); ++p)
+ {
+ (*p)->getConnection();
+ }
+ for(p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p)
+ {
+ (*p)->setException(ex);
+ }
+}
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection && !_destroyed)
+bool
+IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb,
+ const vector<ConnectorInfo>& connectors)
+{
+ //
+ // Add the callback to each connector pending list.
+ //
+ bool found = false;
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ if(q != _pending.end())
{
- connection->activate();
+ found = true;
+ if(cb)
+ {
+ q->second.insert(cb);
+ }
}
}
+ if(found)
+ {
+ return true;
+ }
+
//
- // Notify any waiting callbacks.
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
//
- for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
+ for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r)
{
- (*p)->getConnection();
+ if(_pending.find(*r) == _pending.end())
+ {
+ _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>()));
+ }
+ }
+ return false;
+}
+
+void
+IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb,
+ const vector<ConnectorInfo>& connectors)
+{
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p);
+ if(q != _pending.end())
+ {
+ q->second.erase(cb);
+ }
}
}
@@ -790,7 +899,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
- multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci);
+ multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci);
for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p)
{
@@ -802,7 +911,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex
}
pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint);
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint);
for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q)
{
@@ -864,20 +973,8 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O
void
IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection)
{
- bool compress;
- DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides();
- if(defaultsAndOverrides->overrideCompress)
- {
- compress = defaultsAndOverrides->overrideCompressValue;
- }
- else
- {
- compress = _iter->endpoint->compress();
- }
-
- _factory->finishGetConnection(_connectors, this, connection);
- _callback->setConnection(connection, compress);
- _factory->decPendingConnectCount(); // Must be called last.
+ connection->activate();
+ _factory->finishGetConnection(_connectors, *_iter, connection, this);
}
void
@@ -889,9 +986,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c
_factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1);
if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue.
{
- _factory->finishGetConnection(_connectors, this, 0);
- _callback->setException(ex);
- _factory->decPendingConnectCount(); // Must be called last.
+ _factory->finishGetConnection(_connectors, ex, this);
}
else if(++_iter != _connectors.end()) // Try the next connector.
{
@@ -899,9 +994,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c
}
else
{
- _factory->finishGetConnection(_connectors, this, 0);
- _callback->setException(ex);
- _factory->decPendingConnectCount(); // Must be called last.
+ _factory->finishGetConnection(_connectors, ex, this);
}
}
@@ -1016,7 +1109,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
// A null return value from getConnection indicates that the connection
// is being established and that everthing has been done to ensure that
// the callback will be notified when the connection establishment is
- // done.
+ // done or that the callback already obtain the connection.
//
return;
}
@@ -1050,6 +1143,55 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
}
}
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection,
+ bool compress)
+{
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback->setConnection(connection, compress);
+ _factory->decPendingConnectCount(); // Must be called last.
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex)
+{
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback->setException(ex);
+ _factory->decPendingConnectCount(); // Must be called last.
+}
+
+bool
+IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci)
+{
+ return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end();
+}
+
+bool
+IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors)
+{
+ //
+ // Callback from the factory: connecting to the given connectors
+ // failed, we remove the connectors and return true if there's
+ // no more connectors left to try.
+ //
+ for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
+ {
+ _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end());
+ }
+ return _connectors.empty();
+}
+
+void
+IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending()
+{
+ _factory->removeFromPending(this, _connectors);
+}
+
bool
IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 7bd97ab70e1..89ef6bc514b 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -75,6 +75,7 @@ private:
}
bool operator<(const ConnectorInfo& other) const;
+ bool operator==(const ConnectorInfo& other) const;
ConnectorPtr connector;
EndpointIPtr endpoint;
@@ -99,8 +100,15 @@ private:
void getConnection();
void nextConnector();
+ void setConnection(const Ice::ConnectionIPtr&, bool);
+ void setException(const Ice::LocalException&);
+
+ bool hasConnector(const ConnectorInfo&);
+ bool removeConnectors(const std::vector<ConnectorInfo>&);
+ void removeFromPending();
+
bool operator<(const ConnectCallback&) const;
-
+
private:
const OutgoingConnectionFactoryPtr _factory;
@@ -120,7 +128,13 @@ private:
void incPendingConnectCount();
void decPendingConnectCount();
Ice::ConnectionIPtr getConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, bool&);
- void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, const Ice::ConnectionIPtr&);
+ void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectorInfo&, const Ice::ConnectionIPtr&,
+ const ConnectCallbackPtr&);
+ void finishGetConnection(const std::vector<ConnectorInfo>&, const Ice::LocalException&, const ConnectCallbackPtr&);
+
+ bool addToPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&);
+ void removeFromPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&);
+
Ice::ConnectionIPtr findConnection(const std::vector<ConnectorInfo>&, bool&);
Ice::ConnectionIPtr createConnection(const TransceiverPtr&, const ConnectorInfo&);
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp
index d3cc01479d6..29c177567c0 100644
--- a/cpp/src/Ice/Reference.cpp
+++ b/cpp/src/Ice/Reference.cpp
@@ -1678,29 +1678,29 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all
virtual void
setConnection(const Ice::ConnectionIPtr& connection, bool compress)
+ {
+ //
+ // If we have a router, set the object adapter for this router
+ // (if any) to the new connection, so that callbacks from the
+ // router can be received over this new connection.
+ //
+ if(_routerInfo && _routerInfo->getAdapter())
{
- //
- // If we have a router, set the object adapter for this router
- // (if any) to the new connection, so that callbacks from the
- // router can be received over this new connection.
- //
- if(_routerInfo && _routerInfo->getAdapter())
- {
- connection->setAdapter(_routerInfo->getAdapter());
- }
- _callback->setConnection(connection, compress);
+ connection->setAdapter(_routerInfo->getAdapter());
}
+ _callback->setConnection(connection, compress);
+ }
virtual void
setException(const Ice::LocalException& ex)
- {
- _callback->setException(ex);
- }
-
+ {
+ _callback->setException(ex);
+ }
+
CB1(const RouterInfoPtr& routerInfo, const GetConnectionCallbackPtr& callback) :
- _routerInfo(routerInfo), _callback(callback)
- {
- }
+ _routerInfo(routerInfo), _callback(callback)
+ {
+ }
private:
@@ -1723,49 +1723,49 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all
virtual void
setConnection(const Ice::ConnectionIPtr& connection, bool compress)
+ {
+ //
+ // If we have a router, set the object adapter for this router
+ // (if any) to the new connection, so that callbacks from the
+ // router can be received over this new connection.
+ //
+ if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter())
{
- //
- // If we have a router, set the object adapter for this router
- // (if any) to the new connection, so that callbacks from the
- // router can be received over this new connection.
- //
- if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter())
- {
- connection->setAdapter(_reference->getRouterInfo()->getAdapter());
- }
- _callback->setConnection(connection, compress);
+ connection->setAdapter(_reference->getRouterInfo()->getAdapter());
}
+ _callback->setConnection(connection, compress);
+ }
virtual void
setException(const Ice::LocalException& ex)
+ {
+ if(!_exception.get())
{
- if(!_exception.get())
- {
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- }
-
- if(++_i == _endpoints.size())
- {
- _callback->setException(*_exception.get());
- return;
- }
-
- const bool more = _i != _endpoints.size() - 1;
- vector<EndpointIPtr> endpoint;
- endpoint.push_back(_endpoints[_i]);
-
- OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory();
- factory->create(endpoint, more, _reference->getEndpointSelection(), this);
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
}
-
+
+ if(++_i == _endpoints.size())
+ {
+ _callback->setException(*_exception.get());
+ return;
+ }
+
+ const bool more = _i != _endpoints.size() - 1;
+ vector<EndpointIPtr> endpoint;
+ endpoint.push_back(_endpoints[_i]);
+
+ OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory();
+ factory->create(endpoint, more, _reference->getEndpointSelection(), this);
+ }
+
CB2(const RoutableReferencePtr& reference, const vector<EndpointIPtr>& endpoints,
const GetConnectionCallbackPtr& callback) :
_reference(reference),
_endpoints(endpoints),
_callback(callback),
_i(0)
- {
- }
+ {
+ }
private:
diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp
index 5da5b39ee7b..1aa19dbefa8 100644
--- a/cpp/test/Ice/background/AllTests.cpp
+++ b/cpp/test/Ice/background/AllTests.cpp
@@ -701,6 +701,11 @@ validationTests(const ConfigurationPtr& configuration,
{
configuration->readException(0);
}
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
OpExAMICallbackPtr cbEx = new OpExAMICallback();
@@ -743,6 +748,11 @@ validationTests(const ConfigurationPtr& configuration,
configuration->readException(0);
configuration->readReady(true);
}
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
configuration->readReady(false);
configuration->readException(new Ice::SocketException(__FILE__, __LINE__));
@@ -781,6 +791,11 @@ validationTests(const ConfigurationPtr& configuration,
{
ctl->writeException(false);
}
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
try
{
@@ -809,7 +824,11 @@ validationTests(const ConfigurationPtr& configuration,
ctl->writeException(false);
ctl->writeReady(true);
}
-
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
Ice::ByteSeq seq;
seq.resize(512 * 1024);
@@ -833,7 +852,15 @@ validationTests(const ConfigurationPtr& configuration,
backgroundBatchOneway->op();
backgroundBatchOneway->op();
ctl->resumeAdapter();
- backgroundBatchOneway->ice_flushBatchRequests();
+ try
+ {
+ backgroundBatchOneway->ice_flushBatchRequests();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
//
// Send bigger requests to test with auto-flushing.
@@ -853,7 +880,15 @@ validationTests(const ConfigurationPtr& configuration,
backgroundBatchOneway->opWithPayload(seq);
backgroundBatchOneway->opWithPayload(seq);
ctl->resumeAdapter();
- backgroundBatchOneway->ice_flushBatchRequests();
+ try
+ {
+ backgroundBatchOneway->ice_flushBatchRequests();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ cerr << ex << endl;
+ test(false);
+ }
//
// Then try the same thing with async flush.
@@ -874,7 +909,8 @@ validationTests(const ConfigurationPtr& configuration,
backgroundBatchOneway->op();
backgroundBatchOneway->op();
ctl->resumeAdapter();
- backgroundBatchOneway->ice_flushBatchRequests_async(new FlushBatchRequestsCallback());
+ FlushBatchRequestsCallbackPtr fcb = new FlushBatchRequestsCallback();
+ backgroundBatchOneway->ice_flushBatchRequests_async(fcb);
backgroundBatchOneway->ice_getConnection()->close(false);
backgroundBatchOneway->ice_getConnection()->close(false);
@@ -892,7 +928,7 @@ validationTests(const ConfigurationPtr& configuration,
backgroundBatchOneway->opWithPayload(seq);
backgroundBatchOneway->opWithPayload(seq);
ctl->resumeAdapter();
- FlushBatchRequestsCallbackPtr fcb = new FlushBatchRequestsCallback();
+ fcb = new FlushBatchRequestsCallback();
backgroundBatchOneway->ice_flushBatchRequests_async(fcb);
//
// We can't close the connection before ensuring all the batches have been sent since
diff --git a/cpp/test/Ice/binding/AllTests.cpp b/cpp/test/Ice/binding/AllTests.cpp
index eec35751912..1f9b133d7c9 100644
--- a/cpp/test/Ice/binding/AllTests.cpp
+++ b/cpp/test/Ice/binding/AllTests.cpp
@@ -62,6 +62,21 @@ private:
};
typedef IceUtil::Handle<GetAdapterNameCB> GetAdapterNameCBPtr;
+class NoOpGetAdapterNameCB : public AMI_TestIntf_getAdapterName
+{
+public:
+
+ virtual void
+ ice_response(const string&)
+ {
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ }
+};
+
string
getAdapterNameWithAMI(const TestIntfPrx& test)
{
@@ -220,6 +235,95 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
cout << "ok" << endl;
+ cout << "testing binding with multiple random endpoints... " << flush;
+ {
+ vector<RemoteObjectAdapterPrx> adapters;
+ adapters.push_back(com->createObjectAdapter("AdapterRandom11", "default"));
+ adapters.push_back(com->createObjectAdapter("AdapterRandom12", "default"));
+ adapters.push_back(com->createObjectAdapter("AdapterRandom13", "default"));
+ adapters.push_back(com->createObjectAdapter("AdapterRandom14", "default"));
+ adapters.push_back(com->createObjectAdapter("AdapterRandom15", "default"));
+
+#ifdef _WIN32
+ int count = 60;
+#else
+ int count = 20;
+#endif
+ int adapterCount = adapters.size();
+ while(--count > 0)
+ {
+#ifdef _WIN32
+ if(count == 10)
+ {
+ com->deactivateObjectAdapter(adapters[4]);
+ --adapterCount;
+ }
+ vector<TestIntfPrx> proxies;
+ proxies.resize(10);
+#else
+ if(count < 60 && count % 10 == 0)
+ {
+ com->deactivateObjectAdapter(adapters[count / 10 - 1]);
+ --adapterCount;
+ }
+ vector<TestIntfPrx> proxies;
+ proxies.resize(40);
+#endif
+ unsigned int i;
+ for(i = 0; i < proxies.size(); ++i)
+ {
+ vector<RemoteObjectAdapterPrx> adpts;
+ adpts.resize(IceUtilInternal::random(static_cast<int>(adapters.size())));
+ if(adpts.empty())
+ {
+ adpts.resize(1);
+ }
+ for(vector<RemoteObjectAdapterPrx>::iterator p = adpts.begin(); p != adpts.end(); ++p)
+ {
+ *p = adapters[IceUtilInternal::random(static_cast<int>(adapters.size()))];
+ }
+ proxies[i] = createTestIntfPrx(adpts);
+ }
+
+ for(i = 0; i < proxies.size(); i++)
+ {
+ proxies[i]->getAdapterName_async(new NoOpGetAdapterNameCB());
+ }
+ for(i = 0; i < proxies.size(); i++)
+ {
+ try
+ {
+ proxies[i]->ice_ping();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ set<Ice::ConnectionPtr> connections;
+ for(i = 0; i < proxies.size(); i++)
+ {
+ if(proxies[i]->ice_getCachedConnection())
+ {
+ connections.insert(proxies[i]->ice_getCachedConnection());
+ }
+ }
+ test(static_cast<int>(connections.size()) <= adapterCount);
+
+ for(vector<RemoteObjectAdapterPrx>::const_iterator q = adapters.begin(); q != adapters.end(); ++q)
+ {
+ try
+ {
+ (*q)->getTestIntf()->ice_getConnection()->close(false);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Expected if adapter is down.
+ }
+ }
+ }
+ }
+ cout << "ok" << endl;
+
cout << "testing binding with multiple endpoints and AMI... " << flush;
{
vector<RemoteObjectAdapterPrx> adapters;
@@ -468,7 +572,6 @@ allTests(const Ice::CommunicatorPtr& communicator)
com->deactivateObjectAdapter(adapters[2]);
-
test(test->getAdapterName() == "Adapter52");
deactivate(com, adapters);
diff --git a/cpp/test/Ice/gc/run.py b/cpp/test/Ice/gc/run.py
index 03a703b953c..ced3120bc64 100755
--- a/cpp/test/Ice/gc/run.py
+++ b/cpp/test/Ice/gc/run.py
@@ -25,5 +25,8 @@ client = os.path.join(os.getcwd(), "client")
seedfile = os.path.join(os.getcwd(), "seed")
TestUtil.simpleTest(client, seedfile)
-TestUtil.startClient(client, seedfile)
+
+clientProc = TestUtil.startClient(client, seedfile)
+clientProc.waitTestSuccess()
+
os.remove(seedfile)
diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs
index b671ede01d9..564d718f971 100644
--- a/cs/src/Ice/ConnectionFactory.cs
+++ b/cs/src/Ice/ConnectionFactory.cs
@@ -183,9 +183,10 @@ namespace IceInternal
// Try to establish the connection to the connectors.
//
DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
+ ConnectorInfo ci = null;
for(int i = 0; i < connectors.Count; ++i)
{
- ConnectorInfo ci = connectors[i];
+ ci = connectors[i];
try
{
connection = createConnection(ci.connector.connect(), ci);
@@ -199,7 +200,7 @@ namespace IceInternal
{
compress = ci.endpoint.compress();
}
-
+ connection.activate();
break;
}
catch(Ice.CommunicatorDestroyedException ex)
@@ -221,7 +222,14 @@ namespace IceInternal
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, null, connection);
+ if(connection != null)
+ {
+ finishGetConnection(connectors, ci, connection, null);
+ }
+ else
+ {
+ finishGetConnection(connectors, exception, null);
+ }
if(connection == null)
{
@@ -454,6 +462,11 @@ namespace IceInternal
DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
foreach(ConnectorInfo ci in connectors)
{
+ if(_pending.ContainsKey(ci))
+ {
+ continue;
+ }
+
LinkedList connectionList = null;
if(!_connections.TryGetValue(ci, out connectionList))
{
@@ -588,62 +601,23 @@ namespace IceInternal
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice.ConnectionI connection = findConnection(connectors, out compress);
if(connection != null)
{
- if(cb != null)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- foreach(ConnectorInfo ci in connectors)
- {
- Set cbs = null;
- if(_pending.TryGetValue(ci, out cbs))
- {
- cbs.Remove(cb);
- }
- }
- }
return connection;
}
- //
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
- //
- bool found = false;
- foreach(ConnectorInfo ci in connectors)
- {
- Set cbs = null;
- if(_pending.TryGetValue(ci, out cbs))
- {
- found = true;
- if(cb != null)
- {
- cbs.Add(cb); // Add the callback to each pending connector.
- }
- }
- }
-
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -660,23 +634,14 @@ namespace IceInternal
return null;
}
}
- }
-
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- foreach(ConnectorInfo ci in connectors)
- {
- if(!_pending.ContainsKey(ci))
+ else
{
- _pending.Add(ci, new Set());
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -724,6 +689,7 @@ namespace IceInternal
_connections.Add(ci, connectionList);
}
connectionList.Add(connection);
+ connectionList = null;
if(!_connectionsByEndpoint.TryGetValue(ci.endpoint, out connectionList))
{
connectionList = new LinkedList();
@@ -747,49 +713,120 @@ namespace IceInternal
}
}
- private void finishGetConnection(List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection)
+ private void finishGetConnection(List<ConnectorInfo> connectors,
+ ConnectorInfo ci,
+ Ice.ConnectionI connection,
+ ConnectCallback cb)
{
- Set callbacks = new Set();
+ Set connectionCallbacks = new Set();
+ if(cb != null)
+ {
+ connectionCallbacks.Add(cb);
+ }
+ Set callbacks = new Set();
lock(this)
{
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
- foreach(ConnectorInfo ci in connectors)
+ foreach(ConnectorInfo c in connectors)
{
Set s = null;
- if(_pending.TryGetValue(ci, out s))
+ if(_pending.TryGetValue(c, out s))
{
- foreach(ConnectCallback c in s)
+ foreach(ConnectCallback cc in s)
{
- callbacks.Add(c);
+ if(cc.hasConnector(ci))
+ {
+ connectionCallbacks.Add(cc);
+ }
+ else
+ {
+ callbacks.Add(cc);
+ }
}
- _pending.Remove(ci);
+ _pending.Remove(c);
}
}
+
+ foreach(ConnectCallback cc in connectionCallbacks)
+ {
+ cc.removeFromPending();
+ callbacks.Remove(cc);
+ }
+ foreach(ConnectCallback cc in callbacks)
+ {
+ cc.removeFromPending();
+ }
Monitor.PulseAll(this);
+ }
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection != null && !_destroyed)
+ bool compress;
+ DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint.compress();
+ }
+
+ foreach(ConnectCallback cc in callbacks)
+ {
+ cc.getConnection();
+ }
+ foreach(ConnectCallback cc in connectionCallbacks)
+ {
+ cc.setConnection(connection, compress);
+ }
+ }
+
+ private void finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)
+ {
+ Set failedCallbacks = new Set();
+ if(cb != null)
+ {
+ failedCallbacks.Add(cb);
+ }
+
+ Set callbacks = new Set();
+ lock(this)
+ {
+ foreach(ConnectorInfo c in connectors)
{
- connection.activate();
+ Set s = null;
+ if(_pending.TryGetValue(c, out s))
+ {
+ foreach(ConnectCallback cc in s)
+ {
+ if(cc.removeConnectors(connectors))
+ {
+ failedCallbacks.Add(cc);
+ }
+ else
+ {
+ callbacks.Add(cc);
+ }
+ }
+ _pending.Remove(c);
+ }
}
+
+ foreach(ConnectCallback cc in callbacks)
+ {
+ Debug.Assert(!failedCallbacks.Contains(cc));
+ cc.removeFromPending();
+ }
+ Monitor.PulseAll(this);
}
- //
- // Notify any waiting callbacks.
- //
foreach(ConnectCallback cc in callbacks)
{
cc.getConnection();
}
+ foreach(ConnectCallback cc in failedCallbacks)
+ {
+ cc.setException(ex);
+ }
}
private void handleException(Ice.LocalException ex, ConnectorInfo ci, Ice.ConnectionI connection,
@@ -854,6 +891,59 @@ namespace IceInternal
}
}
+ private bool
+ addToPending(ConnectCallback cb, List<ConnectorInfo> connectors)
+ {
+ //
+ // Add the callback to each connector pending list.
+ //
+ bool found = false;
+ foreach(ConnectorInfo ci in connectors)
+ {
+ Set cbs = null;
+ if(_pending.TryGetValue(ci, out cbs))
+ {
+ found = true;
+ if(cb != null)
+ {
+ cbs.Add(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(found)
+ {
+ return true;
+ }
+
+ //
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
+ //
+ foreach(ConnectorInfo ci in connectors)
+ {
+ if(!_pending.ContainsKey(ci))
+ {
+ _pending.Add(ci, new Set());
+ }
+ }
+ return false;
+ }
+
+ private void
+ removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors)
+ {
+ foreach(ConnectorInfo ci in connectors)
+ {
+ Set cbs = null;
+ if(_pending.TryGetValue(ci, out cbs))
+ {
+ cbs.Remove(cb);
+ }
+ }
+ }
+
internal void handleException(Ice.LocalException ex, bool hasMore)
{
TraceLevels traceLevels = instance_.traceLevels();
@@ -922,20 +1012,8 @@ namespace IceInternal
//
public void connectionStartCompleted(Ice.ConnectionI connection)
{
- bool compress;
- DefaultsAndOverrides defaultsAndOverrides = _factory.instance_.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress = _current.endpoint.compress();
- }
-
- _factory.finishGetConnection(_connectors, this, connection);
- _callback.setConnection(connection, compress);
- _factory.decPendingConnectCount(); // Must be called last.
+ connection.activate();
+ _factory.finishGetConnection(_connectors, _current, connection, this);
}
public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
@@ -943,9 +1021,7 @@ namespace IceInternal
_factory.handleException(ex, _current, connection, _hasMore || _iter < _connectors.Count);
if(ex is Ice.CommunicatorDestroyedException) // No need to continue.
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
else if(_iter < _connectors.Count) // Try the next connector.
{
@@ -953,9 +1029,7 @@ namespace IceInternal
}
else
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
}
@@ -1027,6 +1101,43 @@ namespace IceInternal
}
}
+ public void setConnection(Ice.ConnectionI connection, bool compress)
+ {
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public void setException(Ice.LocalException ex)
+ {
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public bool hasConnector(ConnectorInfo ci)
+ {
+ return _connectors.Contains(ci);
+ }
+ public bool removeConnectors(List<ConnectorInfo> connectors)
+ {
+ foreach(ConnectorInfo ci in connectors)
+ {
+ while(_connectors.Remove(ci)); // Remove all of them.
+ }
+ return _connectors.Count == 0;
+ }
+
+ public void removeFromPending()
+ {
+ _factory.removeFromPending(this, _connectors);
+ }
+
public void getConnectors()
{
try
diff --git a/cs/test/Ice/binding/AllTests.cs b/cs/test/Ice/binding/AllTests.cs
index b6fdbe989dc..f38e3ccbd5b 100644
--- a/cs/test/Ice/binding/AllTests.cs
+++ b/cs/test/Ice/binding/AllTests.cs
@@ -54,6 +54,17 @@ public class AllTests
private string _name = null;
};
+ private class NoOpGetAdapterNameCB : AMI_TestIntf_getAdapterName
+ {
+ public override void ice_response(string name)
+ {
+ }
+
+ public override void ice_exception(Ice.Exception ex)
+ {
+ }
+ };
+
private static string getAdapterNameWithAMI(TestIntfPrx test)
{
GetAdapterNameCB cb = new GetAdapterNameCB();
@@ -112,6 +123,8 @@ public class AllTests
string @ref = "communicator:default -p 12010 -t 10000";
RemoteCommunicatorPrx com = RemoteCommunicatorPrxHelper.uncheckedCast(communicator.stringToProxy(@ref));
+ System.Random rand = new System.Random(unchecked((int)System.DateTime.Now.Ticks));
+
Console.Out.Write("testing binding with single endpoint... ");
Console.Out.Flush();
{
@@ -233,6 +246,107 @@ public class AllTests
}
Console.Out.WriteLine("ok");
+ Console.Out.Write("testing binding with multiple random endpoints... ");
+ Console.Out.Flush();
+ {
+ RemoteObjectAdapterPrx[] adapters = new RemoteObjectAdapterPrx[5];
+ adapters[0] = com.createObjectAdapter("AdapterRandom11", "default");
+ adapters[1] = com.createObjectAdapter("AdapterRandom12", "default");
+ adapters[2] = com.createObjectAdapter("AdapterRandom13", "default");
+ adapters[3] = com.createObjectAdapter("AdapterRandom14", "default");
+ adapters[4] = com.createObjectAdapter("AdapterRandom15", "default");
+
+ int count;
+ if(IceInternal.AssemblyUtil.platform_ == IceInternal.AssemblyUtil.Platform.Windows)
+ {
+ count = 20;
+ }
+ else
+ {
+ count = 60;
+ }
+
+ int adapterCount = adapters.Length;
+ while(--count > 0)
+ {
+ TestIntfPrx[] proxies;
+ if(IceInternal.AssemblyUtil.platform_ == IceInternal.AssemblyUtil.Platform.Windows)
+ {
+ if(count == 10)
+ {
+ com.deactivateObjectAdapter(adapters[4]);
+ --adapterCount;
+ }
+ proxies = new TestIntfPrx[10];
+ }
+ else
+ {
+ if(count < 60 && count % 10 == 0)
+ {
+ com.deactivateObjectAdapter(adapters[count / 10 - 1]);
+ --adapterCount;
+ }
+ proxies = new TestIntfPrx[40];
+ }
+
+ int i;
+ for(i = 0; i < proxies.Length; ++i)
+ {
+ RemoteObjectAdapterPrx[] adpts = new RemoteObjectAdapterPrx[rand.Next(adapters.Length)];
+ if(adpts.Length == 0)
+ {
+ adpts = new RemoteObjectAdapterPrx[1];
+ }
+ for(int j = 0; j < adpts.Length; ++j)
+ {
+ adpts[j] = adapters[rand.Next(adapters.Length)];
+ }
+ proxies[i] = createTestIntfPrx(new ArrayList(adpts));
+ }
+
+ for(i = 0; i < proxies.Length; i++)
+ {
+ proxies[i].getAdapterName_async(new NoOpGetAdapterNameCB());
+ }
+ for(i = 0; i < proxies.Length; i++)
+ {
+ try
+ {
+ proxies[i].ice_ping();
+ }
+ catch(Ice.LocalException)
+ {
+ }
+ }
+
+ ArrayList connections = new ArrayList();
+ for(i = 0; i < proxies.Length; i++)
+ {
+ if(proxies[i].ice_getCachedConnection() != null)
+ {
+ if(!connections.Contains(proxies[i].ice_getCachedConnection()))
+ {
+ connections.Add(proxies[i].ice_getCachedConnection());
+ }
+ }
+ }
+ test(connections.Count <= adapterCount);
+
+ foreach(RemoteObjectAdapterPrx a in adapters)
+ {
+ try
+ {
+ a.getTestIntf().ice_getConnection().close(false);
+ }
+ catch(Ice.LocalException)
+ {
+ // Expected if adapter is down.
+ }
+ }
+ }
+ }
+ Console.Out.WriteLine("ok");
+
Console.Out.Write("testing binding with multiple endpoints and AMI... ");
Console.Out.Flush();
{
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 51881390f20..563368f9f22 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory
//
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
java.util.Iterator<ConnectorInfo> q = connectors.iterator();
+ ConnectorInfo ci = null;
while(q.hasNext())
{
- ConnectorInfo ci = q.next();
+ ci = q.next();
try
{
connection = createConnection(ci.connector.connect(), ci);
@@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory
{
compress.value = ci.endpoint.compress();
}
-
+ connection.activate();
break;
}
catch(Ice.CommunicatorDestroyedException ex)
@@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory
// Finish creating the connection (this removes the connectors from the _pending
// list and notifies any waiting threads).
//
- finishGetConnection(connectors, null, connection);
+ if(connection != null)
+ {
+ finishGetConnection(connectors, ci, connection, null);
+ }
+ else
+ {
+ finishGetConnection(connectors, exception, null);
+ }
if(connection == null)
{
@@ -489,6 +497,11 @@ public final class OutgoingConnectionFactory
while(p.hasNext())
{
ConnectorInfo ci = p.next();
+ if(_pending.containsKey(ci))
+ {
+ continue;
+ }
+
java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
if(connectionList == null)
{
@@ -619,64 +632,23 @@ public final class OutgoingConnectionFactory
// finish if one of them is currently establishing a connection to one
// of our connectors.
//
- while(!_destroyed)
+ while(true)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
//
// Search for a matching connection. If we find one, we're done.
//
Ice.ConnectionI connection = findConnection(connectors, compress);
if(connection != null)
{
- if(cb != null)
- {
- //
- // This might not be the first getConnection call for the callback. We need
- // to ensure that the callback isn't registered with any other pending
- // connectors since we just found a connection and therefore don't need to
- // wait anymore for other pending connectors.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- cbs.remove(cb);
- }
- }
- }
return connection;
}
-
- //
- // Determine whether another thread is currently attempting to connect to one of our endpoints;
- // if so we wait until it's done.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- boolean found = false;
- while(p.hasNext())
- {
- java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
- if(cbs != null)
- {
- found = true;
- if(cb != null)
- {
- cbs.add(cb); // Add the callback to each pending connector.
- }
- }
- }
- if(!found)
- {
- //
- // If no thread is currently establishing a connection to one of our connectors,
- // we get out of this loop and start the connection establishment to one of the
- // given connectors.
- //
- break;
- }
- else
+ if(addToPending(cb, connectors))
{
//
// If a callback is not specified we wait until another thread notifies us about a
@@ -699,25 +671,14 @@ public final class OutgoingConnectionFactory
return null;
}
}
- }
-
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- //
- // No connection to any of our endpoints exists yet; we add the given connectors to
- // the _pending set to indicate that we're attempting connection establishment to
- // these connectors. We might attempt to connect to the same connector multiple times.
- //
- java.util.Iterator<ConnectorInfo> p = connectors.iterator();
- while(p.hasNext())
- {
- ConnectorInfo obj = p.next();
- if(!_pending.containsKey(obj))
+ else
{
- _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ //
+ // If no thread is currently establishing a connection to one of our connectors,
+ // we get out of this loop and start the connection establishment to one of the
+ // given connectors.
+ //
+ break;
}
}
}
@@ -786,46 +747,180 @@ public final class OutgoingConnectionFactory
}
private void
- finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection)
+ finishGetConnection(java.util.List<ConnectorInfo> connectors,
+ ConnectorInfo ci,
+ Ice.ConnectionI connection,
+ ConnectCallback cb)
{
- java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ connectionCallbacks.add(cb);
+ }
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
synchronized(this)
{
- //
- // We're done trying to connect to the given connectors so we remove the
- // connectors from the pending list and notify waiting threads. We also
- // notify the pending connect callbacks (outside the synchronization).
- //
-
java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- java.util.Set<ConnectCallback> cbs = _pending.remove(p.next());
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
if(cbs != null)
{
- callbacks.addAll(cbs);
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.hasConnector(ci))
+ {
+ connectionCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
}
}
+
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.removeFromPending();
+ callbacks.remove(cc);
+ }
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.removeFromPending();
+ }
notifyAll();
+ }
+
+ boolean compress;
+ DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress = ci.endpoint.compress();
+ }
- //
- // If the connect attempt succeeded and the communicator is not destroyed,
- // activate the connection!
- //
- if(connection != null && !_destroyed)
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : connectionCallbacks)
+ {
+ cc.setConnection(connection, compress);
+ }
+ }
+
+ private void
+ finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)
+ {
+ java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>();
+ if(cb != null)
+ {
+ failedCallbacks.add(cb);
+ }
+
+ java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>();
+ synchronized(this)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ while(p.hasNext())
{
- connection.activate();
+ ConnectorInfo c = p.next();
+ java.util.Set<ConnectCallback> cbs = _pending.remove(c);
+ if(cbs != null)
+ {
+ for(ConnectCallback cc : cbs)
+ {
+ if(cc.removeConnectors(connectors))
+ {
+ failedCallbacks.add(cc);
+ }
+ else
+ {
+ callbacks.add(cc);
+ }
+ }
+ }
+ }
+
+ for(ConnectCallback cc : callbacks)
+ {
+ assert(!failedCallbacks.contains(cc));
+ cc.removeFromPending();
}
+ notifyAll();
}
+ for(ConnectCallback cc : callbacks)
+ {
+ cc.getConnection();
+ }
+ for(ConnectCallback cc : failedCallbacks)
+ {
+ cc.setException(ex);
+ }
+ }
+
+ private boolean
+ addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
//
- // Notify any waiting callbacks.
+ // Add the callback to each connector pending list.
//
- java.util.Iterator<ConnectCallback> p = callbacks.iterator();
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
+ boolean found = false;
+ while(p.hasNext())
+ {
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ found = true;
+ if(cb != null)
+ {
+ cbs.add(cb); // Add the callback to each pending connector.
+ }
+ }
+ }
+
+ if(found)
+ {
+ return true;
+ }
+
+ //
+ // If there's no pending connection for the given connectors, we're
+ // responsible for its establishment. We add empty pending lists,
+ // other callbacks to the same connectors will be queued.
+ //
+ p = connectors.iterator();
+ while(p.hasNext())
+ {
+ ConnectorInfo obj = p.next();
+ if(!_pending.containsKey(obj))
+ {
+ _pending.put(obj, new java.util.HashSet<ConnectCallback>());
+ }
+ }
+
+ return false;
+ }
+
+ private void
+ removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors)
+ {
+ java.util.Iterator<ConnectorInfo> p = connectors.iterator();
while(p.hasNext())
{
- p.next().getConnection();
+ java.util.Set<ConnectCallback> cbs = _pending.get(p.next());
+ if(cbs != null)
+ {
+ cbs.remove(cb);
+ }
}
}
@@ -954,27 +1049,15 @@ public final class OutgoingConnectionFactory
_selType = selType;
_endpointsIter = _endpoints.iterator();
}
-
+
//
// Methods from ConnectionI.StartCallback
//
public void
connectionStartCompleted(Ice.ConnectionI connection)
{
- boolean compress;
- DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress = _current.endpoint.compress();
- }
-
- _factory.finishGetConnection(_connectors, this, connection);
- _callback.setConnection(connection, compress);
- _factory.decPendingConnectCount(); // Must be called last.
+ connection.activate();
+ _factory.finishGetConnection(_connectors, _current, connection, this);
}
public void
@@ -985,9 +1068,7 @@ public final class OutgoingConnectionFactory
_factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext());
if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue.
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
else if(_iter.hasNext()) // Try the next connector.
{
@@ -995,9 +1076,7 @@ public final class OutgoingConnectionFactory
}
else
{
- _factory.finishGetConnection(_connectors, this, null);
- _callback.setException(ex);
- _factory.decPendingConnectCount(); // Must be called last.
+ _factory.finishGetConnection(_connectors, ex, this);
}
}
@@ -1062,6 +1141,47 @@ public final class OutgoingConnectionFactory
}
}
+ public void
+ setConnection(Ice.ConnectionI connection, boolean compress)
+ {
+ //
+ // Callback from the factory: the connection to one of the callback
+ // connectors has been established.
+ //
+ _callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public void
+ setException(Ice.LocalException ex)
+ {
+ //
+ // Callback from the factory: connection establishment failed.
+ //
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+
+ public boolean
+ hasConnector(ConnectorInfo ci)
+ {
+ return _connectors.contains(ci);
+ }
+
+ public boolean
+ removeConnectors(java.util.List<ConnectorInfo> connectors)
+ {
+ _connectors.removeAll(connectors);
+ _iter = _connectors.iterator();
+ return _connectors.isEmpty();
+ }
+
+ public void
+ removeFromPending()
+ {
+ _factory.removeFromPending(this, _connectors);
+ }
+
void
getConnectors()
{
diff --git a/java/test/Ice/background/Connector.java b/java/test/Ice/background/Connector.java
index 4a386fb3e0c..b737a1400ef 100644
--- a/java/test/Ice/background/Connector.java
+++ b/java/test/Ice/background/Connector.java
@@ -59,7 +59,7 @@ final class Connector implements IceInternal.Connector
if(this == p)
{
- return false;
+ return true;
}
return _connector.equals(p._connector);
diff --git a/java/test/Ice/binding/AllTests.java b/java/test/Ice/binding/AllTests.java
index 242c25093f4..c6e4e31b4f9 100644
--- a/java/test/Ice/binding/AllTests.java
+++ b/java/test/Ice/binding/AllTests.java
@@ -20,6 +20,24 @@ public class AllTests
}
}
+ static class NoOpGetAdapterNameCB extends AMI_TestIntf_getAdapterName
+ {
+ public
+ void ice_response(String name)
+ {
+ }
+
+ public void
+ ice_exception(Ice.LocalException ex)
+ {
+ }
+
+ public void
+ ice_exception(Ice.UserException ex)
+ {
+ }
+ };
+
static class GetAdapterNameCB extends AMI_TestIntf_getAdapterName
{
synchronized public void
@@ -225,6 +243,106 @@ public class AllTests
}
System.out.println("ok");
+ System.out.print("testing binding with multiple random endpoints... ");
+ System.out.flush();
+ {
+ java.util.Random rand = new java.util.Random();
+
+ RemoteObjectAdapterPrx[] adapters = new RemoteObjectAdapterPrx[5];
+ adapters[0] = com.createObjectAdapter("AdapterRandom11", "default");
+ adapters[1] = com.createObjectAdapter("AdapterRandom12", "default");
+ adapters[2] = com.createObjectAdapter("AdapterRandom13", "default");
+ adapters[3] = com.createObjectAdapter("AdapterRandom14", "default");
+ adapters[4] = com.createObjectAdapter("AdapterRandom15", "default");
+
+ int count;
+ if(System.getProperty("os.name").startsWith("Windows"))
+ {
+ count = 20;
+ }
+ else
+ {
+ count = 60;
+ }
+
+ int adapterCount = adapters.length;
+ while(--count > 0)
+ {
+ TestIntfPrx[] proxies;
+ if(System.getProperty("os.name").startsWith("Windows"))
+ {
+ if(count == 10)
+ {
+ com.deactivateObjectAdapter(adapters[4]);
+ --adapterCount;
+ }
+ proxies = new TestIntfPrx[10];
+ }
+ else
+ {
+ if(count < 60 && count % 10 == 0)
+ {
+ com.deactivateObjectAdapter(adapters[count / 10 - 1]);
+ --adapterCount;
+ }
+ proxies = new TestIntfPrx[40];
+ }
+
+ int i;
+ for(i = 0; i < proxies.length; ++i)
+ {
+ RemoteObjectAdapterPrx[] adpts = new RemoteObjectAdapterPrx[rand.nextInt(adapters.length)];
+ if(adpts.length == 0)
+ {
+ adpts = new RemoteObjectAdapterPrx[1];
+ }
+ for(int j = 0; j < adpts.length; ++j)
+ {
+ adpts[j] = adapters[rand.nextInt(adapters.length)];
+ }
+ proxies[i] = createTestIntfPrx(java.util.Arrays.asList((adpts)));
+ }
+
+ for(i = 0; i < proxies.length; i++)
+ {
+ proxies[i].getAdapterName_async(new NoOpGetAdapterNameCB());
+ }
+ for(i = 0; i < proxies.length; i++)
+ {
+ try
+ {
+ proxies[i].ice_ping();
+ }
+ catch(Ice.LocalException ex)
+ {
+ }
+ }
+
+ java.util.Set<Ice.Connection> connections = new java.util.HashSet<Ice.Connection>();
+ for(i = 0; i < proxies.length; i++)
+ {
+ if(proxies[i].ice_getCachedConnection() != null)
+ {
+ connections.add(proxies[i].ice_getCachedConnection());
+ }
+ }
+ test(connections.size() <= adapterCount);
+
+ for(RemoteObjectAdapterPrx a : adapters)
+ {
+ try
+ {
+ a.getTestIntf().ice_getConnection().close(false);
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Expected if adapter is down.
+ }
+ }
+ }
+ }
+ System.out.println("ok");
+
System.out.print("testing binding with multiple endpoints and AMI... ");
System.out.flush();
{