diff options
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 356 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Reference.cpp | 94 | ||||
-rw-r--r-- | cpp/test/Ice/background/AllTests.cpp | 46 | ||||
-rw-r--r-- | cpp/test/Ice/binding/AllTests.cpp | 105 | ||||
-rwxr-xr-x | cpp/test/Ice/gc/run.py | 5 | ||||
-rw-r--r-- | cs/src/Ice/ConnectionFactory.cs | 325 | ||||
-rw-r--r-- | cs/test/Ice/binding/AllTests.cs | 114 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 338 | ||||
-rw-r--r-- | java/test/Ice/background/Connector.java | 2 | ||||
-rw-r--r-- | java/test/Ice/binding/AllTests.java | 118 |
11 files changed, 1141 insertions, 380 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index a275f61e292..107887e6501 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -55,6 +55,12 @@ IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const Connector return connector < other.connector; } +bool +IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const +{ + return connector == other.connector; +} + void IceInternal::OutgoingConnectionFactory::destroy() { @@ -193,7 +199,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Try to establish the connection to the connectors. // DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - for(vector<ConnectorInfo>::const_iterator q = connectors.begin(); q != connectors.end(); ++q) + vector<ConnectorInfo>::const_iterator q; + for(q = connectors.begin(); q != connectors.end(); ++q) { try { @@ -209,6 +216,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt compress = q->endpoint->compress(); } + connection->activate(); break; } catch(const Ice::CommunicatorDestroyedException& ex) @@ -230,7 +238,14 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, 0, connection); + if(connection) + { + finishGetConnection(connectors, *q, connection, 0); + } + else + { + finishGetConnection(connectors, *exception.get(), 0); + } if(!connection) { @@ -425,7 +440,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) { pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) { @@ -454,8 +469,13 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { + if(_pending.find(*p) != _pending.end()) + { + continue; + } + pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); if(pr.first == pr.second) { @@ -520,7 +540,8 @@ IceInternal::OutgoingConnectionFactory::decPendingConnectCount() ConnectionIPtr IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, - const ConnectCallbackPtr& cb, bool& compress) + const ConnectCallbackPtr& cb, + bool& compress) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -563,62 +584,27 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + // // Search for a matching connection. If we find one, we're done. // Ice::ConnectionIPtr connection = findConnection(connectors, compress); if(connection) { - if(cb) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) - { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); - if(q != _pending.end()) - { - q->second.erase(cb); - } - } - } return connection; } // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. + // Determine whether another thread/request is currently attempting to connect to + // one of our endpoints; if so we wait until it's done. // - bool found = false; - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) - { - map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); - if(q != _pending.end()) - { - found = true; - if(cb) - { - q->second.insert(cb); // Add the callback to each pending connector. - } - } - } - - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -635,23 +621,14 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo return 0; } } - } - - if(_destroyed) - { - throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) - { - if(_pending.find(*r) == _pending.end()) + else { - _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>())); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -709,47 +686,179 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t void IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, - const ConnectCallbackPtr& cb, - const ConnectionIPtr& connection) + const ConnectorInfo& ci, + const ConnectionIPtr& connection, + const ConnectCallbackPtr& cb) { + set<ConnectCallbackPtr> connectionCallbacks; + if(cb) + { + connectionCallbacks.insert(cb); + } + set<ConnectCallbackPtr> callbacks; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->hasConnector(ci)) + { + connectionCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } + _pending.erase(q); + } + } + + set<ConnectCallbackPtr>::iterator r; + for(r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r) + { + (*r)->removeFromPending(); + callbacks.erase(*r); + } + for(r = callbacks.begin(); r != callbacks.end(); ++r) + { + (*r)->removeFromPending(); + } + notifyAll(); + } + bool compress; + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = ci.endpoint->compress(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p) + { + (*p)->setConnection(connection, compress); + } +} + +void +IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, + const Ice::LocalException& ex, + const ConnectCallbackPtr& cb) +{ + set<ConnectCallbackPtr> failedCallbacks; + if(cb) + { + failedCallbacks.insert(cb); + } + + set<ConnectCallbackPtr> callbacks; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); if(q != _pending.end()) { - callbacks.insert(q->second.begin(), q->second.end()); + for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) + { + if((*r)->removeConnectors(connectors)) + { + failedCallbacks.insert(*r); + } + else + { + callbacks.insert(*r); + } + } _pending.erase(q); } } + + for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) + { + assert(failedCallbacks.find(*r) == failedCallbacks.end()); + (*r)->removeFromPending(); + } notifyAll(); + } + + set<ConnectCallbackPtr>::const_iterator p; + for(p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } + for(p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p) + { + (*p)->setException(ex); + } +} - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection && !_destroyed) +bool +IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + // + // Add the callback to each connector pending list. + // + bool found = false; + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) { - connection->activate(); + found = true; + if(cb) + { + q->second.insert(cb); + } } } + if(found) + { + return true; + } + // - // Notify any waiting callbacks. + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. // - for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) + for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) { - (*p)->getConnection(); + if(_pending.find(*r) == _pending.end()) + { + _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*r, set<ConnectCallbackPtr>())); + } + } + return false; +} + +void +IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, + const vector<ConnectorInfo>& connectors) +{ + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + q->second.erase(cb); + } } } @@ -790,7 +899,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, - multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p) { @@ -802,7 +911,7 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex } pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); + multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q) { @@ -864,20 +973,8 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O void IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) { - bool compress; - DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides(); - if(defaultsAndOverrides->overrideCompress) - { - compress = defaultsAndOverrides->overrideCompressValue; - } - else - { - compress = _iter->endpoint->compress(); - } - - _factory->finishGetConnection(_connectors, this, connection); - _callback->setConnection(connection, compress); - _factory->decPendingConnectCount(); // Must be called last. + connection->activate(); + _factory->finishGetConnection(_connectors, *_iter, connection, this); } void @@ -889,9 +986,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c _factory->handleException(ex, *_iter, connection, _hasMore || _iter != _connectors.end() - 1); if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. { - _factory->finishGetConnection(_connectors, this, 0); - _callback->setException(ex); - _factory->decPendingConnectCount(); // Must be called last. + _factory->finishGetConnection(_connectors, ex, this); } else if(++_iter != _connectors.end()) // Try the next connector. { @@ -899,9 +994,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c } else { - _factory->finishGetConnection(_connectors, this, 0); - _callback->setException(ex); - _factory->decPendingConnectCount(); // Must be called last. + _factory->finishGetConnection(_connectors, ex, this); } } @@ -1016,7 +1109,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() // A null return value from getConnection indicates that the connection // is being established and that everthing has been done to ensure that // the callback will be notified when the connection establishment is - // done. + // done or that the callback already obtain the connection. // return; } @@ -1050,6 +1143,55 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() } } +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, + bool compress) +{ + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback->setConnection(connection, compress); + _factory->decPendingConnectCount(); // Must be called last. +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex) +{ + // + // Callback from the factory: connection establishment failed. + // + _callback->setException(ex); + _factory->decPendingConnectCount(); // Must be called last. +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci) +{ + return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end(); +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors) +{ + // + // Callback from the factory: connecting to the given connectors + // failed, we remove the connectors and return true if there's + // no more connectors left to try. + // + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end()); + } + return _connectors.empty(); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending() +{ + _factory->removeFromPending(this, _connectors); +} + bool IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 7bd97ab70e1..89ef6bc514b 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -75,6 +75,7 @@ private: } bool operator<(const ConnectorInfo& other) const; + bool operator==(const ConnectorInfo& other) const; ConnectorPtr connector; EndpointIPtr endpoint; @@ -99,8 +100,15 @@ private: void getConnection(); void nextConnector(); + void setConnection(const Ice::ConnectionIPtr&, bool); + void setException(const Ice::LocalException&); + + bool hasConnector(const ConnectorInfo&); + bool removeConnectors(const std::vector<ConnectorInfo>&); + void removeFromPending(); + bool operator<(const ConnectCallback&) const; - + private: const OutgoingConnectionFactoryPtr _factory; @@ -120,7 +128,13 @@ private: void incPendingConnectCount(); void decPendingConnectCount(); Ice::ConnectionIPtr getConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, bool&); - void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, const Ice::ConnectionIPtr&); + void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectorInfo&, const Ice::ConnectionIPtr&, + const ConnectCallbackPtr&); + void finishGetConnection(const std::vector<ConnectorInfo>&, const Ice::LocalException&, const ConnectCallbackPtr&); + + bool addToPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); + void removeFromPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); + Ice::ConnectionIPtr findConnection(const std::vector<ConnectorInfo>&, bool&); Ice::ConnectionIPtr createConnection(const TransceiverPtr&, const ConnectorInfo&); diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp index d3cc01479d6..29c177567c0 100644 --- a/cpp/src/Ice/Reference.cpp +++ b/cpp/src/Ice/Reference.cpp @@ -1678,29 +1678,29 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all virtual void setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_routerInfo && _routerInfo->getAdapter()) { - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(_routerInfo && _routerInfo->getAdapter()) - { - connection->setAdapter(_routerInfo->getAdapter()); - } - _callback->setConnection(connection, compress); + connection->setAdapter(_routerInfo->getAdapter()); } + _callback->setConnection(connection, compress); + } virtual void setException(const Ice::LocalException& ex) - { - _callback->setException(ex); - } - + { + _callback->setException(ex); + } + CB1(const RouterInfoPtr& routerInfo, const GetConnectionCallbackPtr& callback) : - _routerInfo(routerInfo), _callback(callback) - { - } + _routerInfo(routerInfo), _callback(callback) + { + } private: @@ -1723,49 +1723,49 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all virtual void setConnection(const Ice::ConnectionIPtr& connection, bool compress) + { + // + // If we have a router, set the object adapter for this router + // (if any) to the new connection, so that callbacks from the + // router can be received over this new connection. + // + if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter()) { - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(_reference->getRouterInfo() && _reference->getRouterInfo()->getAdapter()) - { - connection->setAdapter(_reference->getRouterInfo()->getAdapter()); - } - _callback->setConnection(connection, compress); + connection->setAdapter(_reference->getRouterInfo()->getAdapter()); } + _callback->setConnection(connection, compress); + } virtual void setException(const Ice::LocalException& ex) + { + if(!_exception.get()) { - if(!_exception.get()) - { - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - } - - if(++_i == _endpoints.size()) - { - _callback->setException(*_exception.get()); - return; - } - - const bool more = _i != _endpoints.size() - 1; - vector<EndpointIPtr> endpoint; - endpoint.push_back(_endpoints[_i]); - - OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory(); - factory->create(endpoint, more, _reference->getEndpointSelection(), this); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); } - + + if(++_i == _endpoints.size()) + { + _callback->setException(*_exception.get()); + return; + } + + const bool more = _i != _endpoints.size() - 1; + vector<EndpointIPtr> endpoint; + endpoint.push_back(_endpoints[_i]); + + OutgoingConnectionFactoryPtr factory = _reference->getInstance()->outgoingConnectionFactory(); + factory->create(endpoint, more, _reference->getEndpointSelection(), this); + } + CB2(const RoutableReferencePtr& reference, const vector<EndpointIPtr>& endpoints, const GetConnectionCallbackPtr& callback) : _reference(reference), _endpoints(endpoints), _callback(callback), _i(0) - { - } + { + } private: diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp index 5da5b39ee7b..1aa19dbefa8 100644 --- a/cpp/test/Ice/background/AllTests.cpp +++ b/cpp/test/Ice/background/AllTests.cpp @@ -701,6 +701,11 @@ validationTests(const ConfigurationPtr& configuration, { configuration->readException(0); } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } OpExAMICallbackPtr cbEx = new OpExAMICallback(); @@ -743,6 +748,11 @@ validationTests(const ConfigurationPtr& configuration, configuration->readException(0); configuration->readReady(true); } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } configuration->readReady(false); configuration->readException(new Ice::SocketException(__FILE__, __LINE__)); @@ -781,6 +791,11 @@ validationTests(const ConfigurationPtr& configuration, { ctl->writeException(false); } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } try { @@ -809,7 +824,11 @@ validationTests(const ConfigurationPtr& configuration, ctl->writeException(false); ctl->writeReady(true); } - + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } Ice::ByteSeq seq; seq.resize(512 * 1024); @@ -833,7 +852,15 @@ validationTests(const ConfigurationPtr& configuration, backgroundBatchOneway->op(); backgroundBatchOneway->op(); ctl->resumeAdapter(); - backgroundBatchOneway->ice_flushBatchRequests(); + try + { + backgroundBatchOneway->ice_flushBatchRequests(); + } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } // // Send bigger requests to test with auto-flushing. @@ -853,7 +880,15 @@ validationTests(const ConfigurationPtr& configuration, backgroundBatchOneway->opWithPayload(seq); backgroundBatchOneway->opWithPayload(seq); ctl->resumeAdapter(); - backgroundBatchOneway->ice_flushBatchRequests(); + try + { + backgroundBatchOneway->ice_flushBatchRequests(); + } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } // // Then try the same thing with async flush. @@ -874,7 +909,8 @@ validationTests(const ConfigurationPtr& configuration, backgroundBatchOneway->op(); backgroundBatchOneway->op(); ctl->resumeAdapter(); - backgroundBatchOneway->ice_flushBatchRequests_async(new FlushBatchRequestsCallback()); + FlushBatchRequestsCallbackPtr fcb = new FlushBatchRequestsCallback(); + backgroundBatchOneway->ice_flushBatchRequests_async(fcb); backgroundBatchOneway->ice_getConnection()->close(false); backgroundBatchOneway->ice_getConnection()->close(false); @@ -892,7 +928,7 @@ validationTests(const ConfigurationPtr& configuration, backgroundBatchOneway->opWithPayload(seq); backgroundBatchOneway->opWithPayload(seq); ctl->resumeAdapter(); - FlushBatchRequestsCallbackPtr fcb = new FlushBatchRequestsCallback(); + fcb = new FlushBatchRequestsCallback(); backgroundBatchOneway->ice_flushBatchRequests_async(fcb); // // We can't close the connection before ensuring all the batches have been sent since diff --git a/cpp/test/Ice/binding/AllTests.cpp b/cpp/test/Ice/binding/AllTests.cpp index eec35751912..1f9b133d7c9 100644 --- a/cpp/test/Ice/binding/AllTests.cpp +++ b/cpp/test/Ice/binding/AllTests.cpp @@ -62,6 +62,21 @@ private: }; typedef IceUtil::Handle<GetAdapterNameCB> GetAdapterNameCBPtr; +class NoOpGetAdapterNameCB : public AMI_TestIntf_getAdapterName +{ +public: + + virtual void + ice_response(const string&) + { + } + + virtual void + ice_exception(const Ice::Exception&) + { + } +}; + string getAdapterNameWithAMI(const TestIntfPrx& test) { @@ -220,6 +235,95 @@ allTests(const Ice::CommunicatorPtr& communicator) } cout << "ok" << endl; + cout << "testing binding with multiple random endpoints... " << flush; + { + vector<RemoteObjectAdapterPrx> adapters; + adapters.push_back(com->createObjectAdapter("AdapterRandom11", "default")); + adapters.push_back(com->createObjectAdapter("AdapterRandom12", "default")); + adapters.push_back(com->createObjectAdapter("AdapterRandom13", "default")); + adapters.push_back(com->createObjectAdapter("AdapterRandom14", "default")); + adapters.push_back(com->createObjectAdapter("AdapterRandom15", "default")); + +#ifdef _WIN32 + int count = 60; +#else + int count = 20; +#endif + int adapterCount = adapters.size(); + while(--count > 0) + { +#ifdef _WIN32 + if(count == 10) + { + com->deactivateObjectAdapter(adapters[4]); + --adapterCount; + } + vector<TestIntfPrx> proxies; + proxies.resize(10); +#else + if(count < 60 && count % 10 == 0) + { + com->deactivateObjectAdapter(adapters[count / 10 - 1]); + --adapterCount; + } + vector<TestIntfPrx> proxies; + proxies.resize(40); +#endif + unsigned int i; + for(i = 0; i < proxies.size(); ++i) + { + vector<RemoteObjectAdapterPrx> adpts; + adpts.resize(IceUtilInternal::random(static_cast<int>(adapters.size()))); + if(adpts.empty()) + { + adpts.resize(1); + } + for(vector<RemoteObjectAdapterPrx>::iterator p = adpts.begin(); p != adpts.end(); ++p) + { + *p = adapters[IceUtilInternal::random(static_cast<int>(adapters.size()))]; + } + proxies[i] = createTestIntfPrx(adpts); + } + + for(i = 0; i < proxies.size(); i++) + { + proxies[i]->getAdapterName_async(new NoOpGetAdapterNameCB()); + } + for(i = 0; i < proxies.size(); i++) + { + try + { + proxies[i]->ice_ping(); + } + catch(const Ice::LocalException&) + { + } + } + set<Ice::ConnectionPtr> connections; + for(i = 0; i < proxies.size(); i++) + { + if(proxies[i]->ice_getCachedConnection()) + { + connections.insert(proxies[i]->ice_getCachedConnection()); + } + } + test(static_cast<int>(connections.size()) <= adapterCount); + + for(vector<RemoteObjectAdapterPrx>::const_iterator q = adapters.begin(); q != adapters.end(); ++q) + { + try + { + (*q)->getTestIntf()->ice_getConnection()->close(false); + } + catch(const Ice::LocalException&) + { + // Expected if adapter is down. + } + } + } + } + cout << "ok" << endl; + cout << "testing binding with multiple endpoints and AMI... " << flush; { vector<RemoteObjectAdapterPrx> adapters; @@ -468,7 +572,6 @@ allTests(const Ice::CommunicatorPtr& communicator) com->deactivateObjectAdapter(adapters[2]); - test(test->getAdapterName() == "Adapter52"); deactivate(com, adapters); diff --git a/cpp/test/Ice/gc/run.py b/cpp/test/Ice/gc/run.py index 03a703b953c..ced3120bc64 100755 --- a/cpp/test/Ice/gc/run.py +++ b/cpp/test/Ice/gc/run.py @@ -25,5 +25,8 @@ client = os.path.join(os.getcwd(), "client") seedfile = os.path.join(os.getcwd(), "seed") TestUtil.simpleTest(client, seedfile) -TestUtil.startClient(client, seedfile) + +clientProc = TestUtil.startClient(client, seedfile) +clientProc.waitTestSuccess() + os.remove(seedfile) diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs index b671ede01d9..564d718f971 100644 --- a/cs/src/Ice/ConnectionFactory.cs +++ b/cs/src/Ice/ConnectionFactory.cs @@ -183,9 +183,10 @@ namespace IceInternal // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); + ConnectorInfo ci = null; for(int i = 0; i < connectors.Count; ++i) { - ConnectorInfo ci = connectors[i]; + ci = connectors[i]; try { connection = createConnection(ci.connector.connect(), ci); @@ -199,7 +200,7 @@ namespace IceInternal { compress = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -221,7 +222,14 @@ namespace IceInternal // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -454,6 +462,11 @@ namespace IceInternal DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); foreach(ConnectorInfo ci in connectors) { + if(_pending.ContainsKey(ci)) + { + continue; + } + LinkedList connectionList = null; if(!_connections.TryGetValue(ci, out connectionList)) { @@ -588,62 +601,23 @@ namespace IceInternal // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, out compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - foreach(ConnectorInfo ci in connectors) - { - Set cbs = null; - if(_pending.TryGetValue(ci, out cbs)) - { - cbs.Remove(cb); - } - } - } return connection; } - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - bool found = false; - foreach(ConnectorInfo ci in connectors) - { - Set cbs = null; - if(_pending.TryGetValue(ci, out cbs)) - { - found = true; - if(cb != null) - { - cbs.Add(cb); // Add the callback to each pending connector. - } - } - } - - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -660,23 +634,14 @@ namespace IceInternal return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - foreach(ConnectorInfo ci in connectors) - { - if(!_pending.ContainsKey(ci)) + else { - _pending.Add(ci, new Set()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -724,6 +689,7 @@ namespace IceInternal _connections.Add(ci, connectionList); } connectionList.Add(connection); + connectionList = null; if(!_connectionsByEndpoint.TryGetValue(ci.endpoint, out connectionList)) { connectionList = new LinkedList(); @@ -747,49 +713,120 @@ namespace IceInternal } } - private void finishGetConnection(List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + private void finishGetConnection(List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - Set callbacks = new Set(); + Set connectionCallbacks = new Set(); + if(cb != null) + { + connectionCallbacks.Add(cb); + } + Set callbacks = new Set(); lock(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - - foreach(ConnectorInfo ci in connectors) + foreach(ConnectorInfo c in connectors) { Set s = null; - if(_pending.TryGetValue(ci, out s)) + if(_pending.TryGetValue(c, out s)) { - foreach(ConnectCallback c in s) + foreach(ConnectCallback cc in s) { - callbacks.Add(c); + if(cc.hasConnector(ci)) + { + connectionCallbacks.Add(cc); + } + else + { + callbacks.Add(cc); + } } - _pending.Remove(ci); + _pending.Remove(c); } } + + foreach(ConnectCallback cc in connectionCallbacks) + { + cc.removeFromPending(); + callbacks.Remove(cc); + } + foreach(ConnectCallback cc in callbacks) + { + cc.removeFromPending(); + } Monitor.PulseAll(this); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + bool compress; + DefaultsAndOverrides defaultsAndOverrides = instance_.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } + + foreach(ConnectCallback cc in callbacks) + { + cc.getConnection(); + } + foreach(ConnectCallback cc in connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + Set failedCallbacks = new Set(); + if(cb != null) + { + failedCallbacks.Add(cb); + } + + Set callbacks = new Set(); + lock(this) + { + foreach(ConnectorInfo c in connectors) { - connection.activate(); + Set s = null; + if(_pending.TryGetValue(c, out s)) + { + foreach(ConnectCallback cc in s) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.Add(cc); + } + else + { + callbacks.Add(cc); + } + } + _pending.Remove(c); + } } + + foreach(ConnectCallback cc in callbacks) + { + Debug.Assert(!failedCallbacks.Contains(cc)); + cc.removeFromPending(); + } + Monitor.PulseAll(this); } - // - // Notify any waiting callbacks. - // foreach(ConnectCallback cc in callbacks) { cc.getConnection(); } + foreach(ConnectCallback cc in failedCallbacks) + { + cc.setException(ex); + } } private void handleException(Ice.LocalException ex, ConnectorInfo ci, Ice.ConnectionI connection, @@ -854,6 +891,59 @@ namespace IceInternal } } + private bool + addToPending(ConnectCallback cb, List<ConnectorInfo> connectors) + { + // + // Add the callback to each connector pending list. + // + bool found = false; + foreach(ConnectorInfo ci in connectors) + { + Set cbs = null; + if(_pending.TryGetValue(ci, out cbs)) + { + found = true; + if(cb != null) + { + cbs.Add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + foreach(ConnectorInfo ci in connectors) + { + if(!_pending.ContainsKey(ci)) + { + _pending.Add(ci, new Set()); + } + } + return false; + } + + private void + removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors) + { + foreach(ConnectorInfo ci in connectors) + { + Set cbs = null; + if(_pending.TryGetValue(ci, out cbs)) + { + cbs.Remove(cb); + } + } + } + internal void handleException(Ice.LocalException ex, bool hasMore) { TraceLevels traceLevels = instance_.traceLevels(); @@ -922,20 +1012,8 @@ namespace IceInternal // public void connectionStartCompleted(Ice.ConnectionI connection) { - bool compress; - DefaultsAndOverrides defaultsAndOverrides = _factory.instance_.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) @@ -943,9 +1021,7 @@ namespace IceInternal _factory.handleException(ex, _current, connection, _hasMore || _iter < _connectors.Count); if(ex is Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter < _connectors.Count) // Try the next connector. { @@ -953,9 +1029,7 @@ namespace IceInternal } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1027,6 +1101,43 @@ namespace IceInternal } } + public void setConnection(Ice.ConnectionI connection, bool compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public bool hasConnector(ConnectorInfo ci) + { + return _connectors.Contains(ci); + } + public bool removeConnectors(List<ConnectorInfo> connectors) + { + foreach(ConnectorInfo ci in connectors) + { + while(_connectors.Remove(ci)); // Remove all of them. + } + return _connectors.Count == 0; + } + + public void removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + public void getConnectors() { try diff --git a/cs/test/Ice/binding/AllTests.cs b/cs/test/Ice/binding/AllTests.cs index b6fdbe989dc..f38e3ccbd5b 100644 --- a/cs/test/Ice/binding/AllTests.cs +++ b/cs/test/Ice/binding/AllTests.cs @@ -54,6 +54,17 @@ public class AllTests private string _name = null; }; + private class NoOpGetAdapterNameCB : AMI_TestIntf_getAdapterName + { + public override void ice_response(string name) + { + } + + public override void ice_exception(Ice.Exception ex) + { + } + }; + private static string getAdapterNameWithAMI(TestIntfPrx test) { GetAdapterNameCB cb = new GetAdapterNameCB(); @@ -112,6 +123,8 @@ public class AllTests string @ref = "communicator:default -p 12010 -t 10000"; RemoteCommunicatorPrx com = RemoteCommunicatorPrxHelper.uncheckedCast(communicator.stringToProxy(@ref)); + System.Random rand = new System.Random(unchecked((int)System.DateTime.Now.Ticks)); + Console.Out.Write("testing binding with single endpoint... "); Console.Out.Flush(); { @@ -233,6 +246,107 @@ public class AllTests } Console.Out.WriteLine("ok"); + Console.Out.Write("testing binding with multiple random endpoints... "); + Console.Out.Flush(); + { + RemoteObjectAdapterPrx[] adapters = new RemoteObjectAdapterPrx[5]; + adapters[0] = com.createObjectAdapter("AdapterRandom11", "default"); + adapters[1] = com.createObjectAdapter("AdapterRandom12", "default"); + adapters[2] = com.createObjectAdapter("AdapterRandom13", "default"); + adapters[3] = com.createObjectAdapter("AdapterRandom14", "default"); + adapters[4] = com.createObjectAdapter("AdapterRandom15", "default"); + + int count; + if(IceInternal.AssemblyUtil.platform_ == IceInternal.AssemblyUtil.Platform.Windows) + { + count = 20; + } + else + { + count = 60; + } + + int adapterCount = adapters.Length; + while(--count > 0) + { + TestIntfPrx[] proxies; + if(IceInternal.AssemblyUtil.platform_ == IceInternal.AssemblyUtil.Platform.Windows) + { + if(count == 10) + { + com.deactivateObjectAdapter(adapters[4]); + --adapterCount; + } + proxies = new TestIntfPrx[10]; + } + else + { + if(count < 60 && count % 10 == 0) + { + com.deactivateObjectAdapter(adapters[count / 10 - 1]); + --adapterCount; + } + proxies = new TestIntfPrx[40]; + } + + int i; + for(i = 0; i < proxies.Length; ++i) + { + RemoteObjectAdapterPrx[] adpts = new RemoteObjectAdapterPrx[rand.Next(adapters.Length)]; + if(adpts.Length == 0) + { + adpts = new RemoteObjectAdapterPrx[1]; + } + for(int j = 0; j < adpts.Length; ++j) + { + adpts[j] = adapters[rand.Next(adapters.Length)]; + } + proxies[i] = createTestIntfPrx(new ArrayList(adpts)); + } + + for(i = 0; i < proxies.Length; i++) + { + proxies[i].getAdapterName_async(new NoOpGetAdapterNameCB()); + } + for(i = 0; i < proxies.Length; i++) + { + try + { + proxies[i].ice_ping(); + } + catch(Ice.LocalException) + { + } + } + + ArrayList connections = new ArrayList(); + for(i = 0; i < proxies.Length; i++) + { + if(proxies[i].ice_getCachedConnection() != null) + { + if(!connections.Contains(proxies[i].ice_getCachedConnection())) + { + connections.Add(proxies[i].ice_getCachedConnection()); + } + } + } + test(connections.Count <= adapterCount); + + foreach(RemoteObjectAdapterPrx a in adapters) + { + try + { + a.getTestIntf().ice_getConnection().close(false); + } + catch(Ice.LocalException) + { + // Expected if adapter is down. + } + } + } + } + Console.Out.WriteLine("ok"); + Console.Out.Write("testing binding with multiple endpoints and AMI... "); Console.Out.Flush(); { diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 51881390f20..563368f9f22 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -188,9 +188,10 @@ public final class OutgoingConnectionFactory // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); + ConnectorInfo ci = null; while(q.hasNext()) { - ConnectorInfo ci = q.next(); + ci = q.next(); try { connection = createConnection(ci.connector.connect(), ci); @@ -204,7 +205,7 @@ public final class OutgoingConnectionFactory { compress.value = ci.endpoint.compress(); } - + connection.activate(); break; } catch(Ice.CommunicatorDestroyedException ex) @@ -226,7 +227,14 @@ public final class OutgoingConnectionFactory // Finish creating the connection (this removes the connectors from the _pending // list and notifies any waiting threads). // - finishGetConnection(connectors, null, connection); + if(connection != null) + { + finishGetConnection(connectors, ci, connection, null); + } + else + { + finishGetConnection(connectors, exception, null); + } if(connection == null) { @@ -489,6 +497,11 @@ public final class OutgoingConnectionFactory while(p.hasNext()) { ConnectorInfo ci = p.next(); + if(_pending.containsKey(ci)) + { + continue; + } + java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); if(connectionList == null) { @@ -619,64 +632,23 @@ public final class OutgoingConnectionFactory // finish if one of them is currently establishing a connection to one // of our connectors. // - while(!_destroyed) + while(true) { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + // // Search for a matching connection. If we find one, we're done. // Ice.ConnectionI connection = findConnection(connectors, compress); if(connection != null) { - if(cb != null) - { - // - // This might not be the first getConnection call for the callback. We need - // to ensure that the callback isn't registered with any other pending - // connectors since we just found a connection and therefore don't need to - // wait anymore for other pending connectors. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - cbs.remove(cb); - } - } - } return connection; } - - // - // Determine whether another thread is currently attempting to connect to one of our endpoints; - // if so we wait until it's done. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - boolean found = false; - while(p.hasNext()) - { - java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); - if(cbs != null) - { - found = true; - if(cb != null) - { - cbs.add(cb); // Add the callback to each pending connector. - } - } - } - if(!found) - { - // - // If no thread is currently establishing a connection to one of our connectors, - // we get out of this loop and start the connection establishment to one of the - // given connectors. - // - break; - } - else + if(addToPending(cb, connectors)) { // // If a callback is not specified we wait until another thread notifies us about a @@ -699,25 +671,14 @@ public final class OutgoingConnectionFactory return null; } } - } - - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - // - // No connection to any of our endpoints exists yet; we add the given connectors to - // the _pending set to indicate that we're attempting connection establishment to - // these connectors. We might attempt to connect to the same connector multiple times. - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); - while(p.hasNext()) - { - ConnectorInfo obj = p.next(); - if(!_pending.containsKey(obj)) + else { - _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; } } } @@ -786,46 +747,180 @@ public final class OutgoingConnectionFactory } private void - finishGetConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.ConnectionI connection) + finishGetConnection(java.util.List<ConnectorInfo> connectors, + ConnectorInfo ci, + Ice.ConnectionI connection, + ConnectCallback cb) { - java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + java.util.Set<ConnectCallback> connectionCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + connectionCallbacks.add(cb); + } + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); synchronized(this) { - // - // We're done trying to connect to the given connectors so we remove the - // connectors from the pending list and notify waiting threads. We also - // notify the pending connect callbacks (outside the synchronization). - // - java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - java.util.Set<ConnectCallback> cbs = _pending.remove(p.next()); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); if(cbs != null) { - callbacks.addAll(cbs); + for(ConnectCallback cc : cbs) + { + if(cc.hasConnector(ci)) + { + connectionCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } } } + + for(ConnectCallback cc : connectionCallbacks) + { + cc.removeFromPending(); + callbacks.remove(cc); + } + for(ConnectCallback cc : callbacks) + { + cc.removeFromPending(); + } notifyAll(); + } + + boolean compress; + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress = ci.endpoint.compress(); + } - // - // If the connect attempt succeeded and the communicator is not destroyed, - // activate the connection! - // - if(connection != null && !_destroyed) + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : connectionCallbacks) + { + cc.setConnection(connection, compress); + } + } + + private void + finishGetConnection(java.util.List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) + { + java.util.Set<ConnectCallback> failedCallbacks = new java.util.HashSet<ConnectCallback>(); + if(cb != null) + { + failedCallbacks.add(cb); + } + + java.util.Set<ConnectCallback> callbacks = new java.util.HashSet<ConnectCallback>(); + synchronized(this) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + while(p.hasNext()) { - connection.activate(); + ConnectorInfo c = p.next(); + java.util.Set<ConnectCallback> cbs = _pending.remove(c); + if(cbs != null) + { + for(ConnectCallback cc : cbs) + { + if(cc.removeConnectors(connectors)) + { + failedCallbacks.add(cc); + } + else + { + callbacks.add(cc); + } + } + } + } + + for(ConnectCallback cc : callbacks) + { + assert(!failedCallbacks.contains(cc)); + cc.removeFromPending(); } + notifyAll(); } + for(ConnectCallback cc : callbacks) + { + cc.getConnection(); + } + for(ConnectCallback cc : failedCallbacks) + { + cc.setException(ex); + } + } + + private boolean + addToPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { // - // Notify any waiting callbacks. + // Add the callback to each connector pending list. // - java.util.Iterator<ConnectCallback> p = callbacks.iterator(); + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); + boolean found = false; + while(p.hasNext()) + { + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + found = true; + if(cb != null) + { + cbs.add(cb); // Add the callback to each pending connector. + } + } + } + + if(found) + { + return true; + } + + // + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, + // other callbacks to the same connectors will be queued. + // + p = connectors.iterator(); + while(p.hasNext()) + { + ConnectorInfo obj = p.next(); + if(!_pending.containsKey(obj)) + { + _pending.put(obj, new java.util.HashSet<ConnectCallback>()); + } + } + + return false; + } + + private void + removeFromPending(ConnectCallback cb, java.util.List<ConnectorInfo> connectors) + { + java.util.Iterator<ConnectorInfo> p = connectors.iterator(); while(p.hasNext()) { - p.next().getConnection(); + java.util.Set<ConnectCallback> cbs = _pending.get(p.next()); + if(cbs != null) + { + cbs.remove(cb); + } } } @@ -954,27 +1049,15 @@ public final class OutgoingConnectionFactory _selType = selType; _endpointsIter = _endpoints.iterator(); } - + // // Methods from ConnectionI.StartCallback // public void connectionStartCompleted(Ice.ConnectionI connection) { - boolean compress; - DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress = _current.endpoint.compress(); - } - - _factory.finishGetConnection(_connectors, this, connection); - _callback.setConnection(connection, compress); - _factory.decPendingConnectCount(); // Must be called last. + connection.activate(); + _factory.finishGetConnection(_connectors, _current, connection, this); } public void @@ -985,9 +1068,7 @@ public final class OutgoingConnectionFactory _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext()); if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } else if(_iter.hasNext()) // Try the next connector. { @@ -995,9 +1076,7 @@ public final class OutgoingConnectionFactory } else { - _factory.finishGetConnection(_connectors, this, null); - _callback.setException(ex); - _factory.decPendingConnectCount(); // Must be called last. + _factory.finishGetConnection(_connectors, ex, this); } } @@ -1062,6 +1141,47 @@ public final class OutgoingConnectionFactory } } + public void + setConnection(Ice.ConnectionI connection, boolean compress) + { + // + // Callback from the factory: the connection to one of the callback + // connectors has been established. + // + _callback.setConnection(connection, compress); + _factory.decPendingConnectCount(); // Must be called last. + } + + public void + setException(Ice.LocalException ex) + { + // + // Callback from the factory: connection establishment failed. + // + _callback.setException(ex); + _factory.decPendingConnectCount(); // Must be called last. + } + + public boolean + hasConnector(ConnectorInfo ci) + { + return _connectors.contains(ci); + } + + public boolean + removeConnectors(java.util.List<ConnectorInfo> connectors) + { + _connectors.removeAll(connectors); + _iter = _connectors.iterator(); + return _connectors.isEmpty(); + } + + public void + removeFromPending() + { + _factory.removeFromPending(this, _connectors); + } + void getConnectors() { diff --git a/java/test/Ice/background/Connector.java b/java/test/Ice/background/Connector.java index 4a386fb3e0c..b737a1400ef 100644 --- a/java/test/Ice/background/Connector.java +++ b/java/test/Ice/background/Connector.java @@ -59,7 +59,7 @@ final class Connector implements IceInternal.Connector if(this == p) { - return false; + return true; } return _connector.equals(p._connector); diff --git a/java/test/Ice/binding/AllTests.java b/java/test/Ice/binding/AllTests.java index 242c25093f4..c6e4e31b4f9 100644 --- a/java/test/Ice/binding/AllTests.java +++ b/java/test/Ice/binding/AllTests.java @@ -20,6 +20,24 @@ public class AllTests } } + static class NoOpGetAdapterNameCB extends AMI_TestIntf_getAdapterName + { + public + void ice_response(String name) + { + } + + public void + ice_exception(Ice.LocalException ex) + { + } + + public void + ice_exception(Ice.UserException ex) + { + } + }; + static class GetAdapterNameCB extends AMI_TestIntf_getAdapterName { synchronized public void @@ -225,6 +243,106 @@ public class AllTests } System.out.println("ok"); + System.out.print("testing binding with multiple random endpoints... "); + System.out.flush(); + { + java.util.Random rand = new java.util.Random(); + + RemoteObjectAdapterPrx[] adapters = new RemoteObjectAdapterPrx[5]; + adapters[0] = com.createObjectAdapter("AdapterRandom11", "default"); + adapters[1] = com.createObjectAdapter("AdapterRandom12", "default"); + adapters[2] = com.createObjectAdapter("AdapterRandom13", "default"); + adapters[3] = com.createObjectAdapter("AdapterRandom14", "default"); + adapters[4] = com.createObjectAdapter("AdapterRandom15", "default"); + + int count; + if(System.getProperty("os.name").startsWith("Windows")) + { + count = 20; + } + else + { + count = 60; + } + + int adapterCount = adapters.length; + while(--count > 0) + { + TestIntfPrx[] proxies; + if(System.getProperty("os.name").startsWith("Windows")) + { + if(count == 10) + { + com.deactivateObjectAdapter(adapters[4]); + --adapterCount; + } + proxies = new TestIntfPrx[10]; + } + else + { + if(count < 60 && count % 10 == 0) + { + com.deactivateObjectAdapter(adapters[count / 10 - 1]); + --adapterCount; + } + proxies = new TestIntfPrx[40]; + } + + int i; + for(i = 0; i < proxies.length; ++i) + { + RemoteObjectAdapterPrx[] adpts = new RemoteObjectAdapterPrx[rand.nextInt(adapters.length)]; + if(adpts.length == 0) + { + adpts = new RemoteObjectAdapterPrx[1]; + } + for(int j = 0; j < adpts.length; ++j) + { + adpts[j] = adapters[rand.nextInt(adapters.length)]; + } + proxies[i] = createTestIntfPrx(java.util.Arrays.asList((adpts))); + } + + for(i = 0; i < proxies.length; i++) + { + proxies[i].getAdapterName_async(new NoOpGetAdapterNameCB()); + } + for(i = 0; i < proxies.length; i++) + { + try + { + proxies[i].ice_ping(); + } + catch(Ice.LocalException ex) + { + } + } + + java.util.Set<Ice.Connection> connections = new java.util.HashSet<Ice.Connection>(); + for(i = 0; i < proxies.length; i++) + { + if(proxies[i].ice_getCachedConnection() != null) + { + connections.add(proxies[i].ice_getCachedConnection()); + } + } + test(connections.size() <= adapterCount); + + for(RemoteObjectAdapterPrx a : adapters) + { + try + { + a.getTestIntf().ice_getConnection().close(false); + } + catch(Ice.LocalException ex) + { + // Expected if adapter is down. + } + } + } + } + System.out.println("ok"); + System.out.print("testing binding with multiple endpoints and AMI... "); System.out.flush(); { |