diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-01-07 19:37:17 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-01-07 19:37:17 +0100 |
commit | b5042ce24aaa2dcff2092046b322ff61c3d9ef8c (patch) | |
tree | dcdca6930377ad9098eeb9996ce1f7663c79e5db /cpp | |
parent | Other fix for 3601 - plugins can be destroyed twice (diff) | |
download | ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.bz2 ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.xz ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.zip |
Squashed commit of the following:
commit 8019e6de4480f361a83d8944afec60793454c322
Author: Benoit Foucher <benoit@zeroc.com>
Date: Wed Jan 7 17:16:40 2009 +0100
Fixed bug 3516 - Fixed scaling issue when using round-robin replica groups
commit 6c36afb32dda8b37b7d5330ed51a439bc73b17db
Author: Benoit Foucher <benoit@zeroc.com>
Date: Wed Jan 7 17:16:36 2009 +0100
Fixed bug 3230 - IceGrid node leak
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 47 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.h | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 12 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 694 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.h | 43 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 10 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.h | 1 | ||||
-rw-r--r-- | cpp/test/IceGrid/replicaGroup/AllTests.cpp | 104 |
10 files changed, 590 insertions, 344 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index f6970f4e09e..af24d9da061 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -231,7 +231,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache, void ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup, - bool& roundRobin) + bool& roundRobin, const set<string>&) { nReplicas = 1; replicaGroup = false; @@ -242,11 +242,6 @@ ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& adapters.push_back(info); } -void -ServerAdapterEntry::increaseRoundRobinCount(int roundRobinCount) -{ -} - float ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { @@ -383,7 +378,7 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy) void ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup, - bool& roundRobin) + bool& roundRobin, const set<string>& excludes) { vector<ServerAdapterEntryPtr> replicas; bool adaptive = false; @@ -453,32 +448,28 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n // for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - try - { - int dummy; - bool dummy2; - bool dummy3; - (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3); - } - catch(const AdapterNotExistException&) - { - } - catch(const NodeUnreachableException&) - { - } - catch(const DeploymentException&) + if(!roundRobin || excludes.find((*p)->getId()) == excludes.end()) { + try + { + int dummy; + bool dummy2; + bool dummy3; + (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, set<string>()); + } + catch(const AdapterNotExistException&) + { + } + catch(const NodeUnreachableException&) + { + } + catch(const DeploymentException&) + { + } } } } -void -ReplicaGroupEntry::increaseRoundRobinCount(int count) -{ - Lock sync(*this); - _lastReplica = (_lastReplica + count) % static_cast<int>(_replicas.size()); -} - float ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index 3a862d12d97..8ee42bc1531 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -43,8 +43,11 @@ public: AdapterEntry(AdapterCache&, const std::string&, const std::string&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&) = 0; - virtual void increaseRoundRobinCount(int) = 0; + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&) = 0; + void getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& count, bool& replicaGroup, bool& roundRobin) + { + getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, std::set<std::string>()); + } virtual float getLeastLoadedNodeLoad(LoadSample) const = 0; virtual AdapterInfoSeq getAdapterInfo() const = 0; @@ -68,8 +71,7 @@ public: ServerAdapterEntry(AdapterCache&, const std::string&, const std::string&, const std::string&, int, const ServerEntryPtr&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&); - virtual void increaseRoundRobinCount(int); + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&); virtual float getLeastLoadedNodeLoad(LoadSample) const; virtual AdapterInfoSeq getAdapterInfo() const; virtual const std::string& getReplicaGroupId() const { return _replicaGroupId; } @@ -91,8 +93,7 @@ public: ReplicaGroupEntry(AdapterCache&, const std::string&, const std::string&, const LoadBalancingPolicyPtr&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&); - virtual void increaseRoundRobinCount(int); + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&); virtual float getLeastLoadedNodeLoad(LoadSample) const; virtual AdapterInfoSeq getAdapterInfo() const; diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 1b55a7e2066..4dc9a62e978 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -718,12 +718,6 @@ NodeService::stop() _node->getPlatformInfo().stop(); // - // Break cylic reference counts. - // - _node->destroy(); - _node = 0; - - // // We can now safely shutdown the communicator. // try @@ -739,6 +733,12 @@ NodeService::stop() } // + // Break cylic reference counts. + // + _node->shutdown(); + _node = 0; + + // // And shutdown the collocated registry. // if(_registry) diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 4e354c5d9bf..d038ccae641 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -150,80 +150,144 @@ private: const Ice::ObjectPrx _obj; }; -} - -LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, - const string& id, - bool replicaGroup, - bool roundRobin, - const LocatorAdapterInfoSeq& adapters, - int count) : - _amdCB(amdCB), - _locator(locator), - _id(id), - _replicaGroup(replicaGroup), - _roundRobin(roundRobin), - _adapters(adapters), - _traceLevels(locator->getTraceLevels()), - _count(count), - _lastAdapter(_adapters.begin()) +class AdapterRequest : public LocatorI::Request { - assert((_count == 0 && _adapters.empty()) || _count > 0); -} +public: -void -LocatorI::Request::execute() + AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const LocatorAdapterInfo& adapter) : + _amdCB(amdCB), + _locator(locator), + _adapter(adapter), + _traceLevels(locator->getTraceLevels()) + { + assert(_adapter.proxy); + } + + virtual void + execute() + { + _locator->getDirectProxy(_adapter, this); + } + + virtual void + activating(const string&) + { + // Nothing to do. + } + + virtual void + response(const std::string&, const Ice::ObjectPrx& proxy) + { + assert(proxy); + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + + virtual void + exception(const std::string&, const Ice::Exception& ex) + { + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + } + +private: + + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const LocatorAdapterInfo _adapter; + const TraceLevelsPtr _traceLevels; +}; + +class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex { - // - // If there's no adapters to request, we're done, send the - // response. - // - if(_adapters.empty()) +public: + + ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count, + Ice::ObjectPrx firstProxy) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _lastAdapter(_adapters.begin()) { - sendResponse(); - return; + assert(_adapters.empty() || _count > 0); + + if(_adapters.empty()) + { + _count = 0; + } + + // + // If the first adapter proxy is provided, store it in _proxies. + // + if(firstProxy) + { + assert(!_adapters.empty()); + _proxies[_adapters[0].id] = firstProxy; + ++_lastAdapter; + } } - // - // Otherwise, request as many adapters as required. - // - LocatorAdapterInfoSeq adapters; + virtual void + execute() { - Lock sync(*this); - assert(_count > 0 && _lastAdapter != _adapters.end()); - for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) + // + // Otherwise, request as many adapters as required. + // + LocatorAdapterInfoSeq adapters; { - if(_lastAdapter == _adapters.end()) + Lock sync(*this); + for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) { - _count = i; - break; + if(_lastAdapter == _adapters.end()) + { + _count = i; + break; + } + assert(_lastAdapter->proxy); + adapters.push_back(*_lastAdapter); + ++_lastAdapter; + } + + // + // If there's no adapters to request, we're done, send the + // response. + // + if(_proxies.size() == _count) + { + sendResponse(); + return; } - assert(_lastAdapter->proxy); - adapters.push_back(*_lastAdapter); - ++_lastAdapter; } - } + - for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) - { - if(_locator->getDirectProxy(*p, this)) + for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - activating(); + if(_locator->getDirectProxy(*p, this)) + { + activating(p->id); + } } } -} -void -LocatorI::Request::activating() -{ - // - // An adapter is being activated. If this is a request for a replica group, don't - // wait for the activation to complete. Instead, we query the next adapter which - // might be already active. - // - if(_replicaGroup) + virtual void + activating(const string&) { + // + // An adapter is being activated. Don't wait for the activation to complete. Instead, + // we query the next adapter which might be already active. + // LocatorAdapterInfo adapter; do { @@ -237,150 +301,332 @@ LocatorI::Request::activating() } while(_locator->getDirectProxy(adapter, this)); } -} + + virtual void + exception(const string& id, const Ice::Exception& ex) + { + LocatorAdapterInfo adapter; + { + Lock sync(*this); + if(_proxies.size() == _count) // Nothing to do if we already sent the response. + { + return; + } + + if(!_exception.get()) + { + _exception.reset(ex.ice_clone()); + } + + if(_lastAdapter == _adapters.end()) + { + --_count; // Expect one less adapter proxy if there's no more adapters to query. + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_count == _proxies.size()) + { + sendResponse(); + } + } + else + { + adapter = *_lastAdapter; + ++_lastAdapter; + } + } -void -LocatorI::Request::exception(const string& id, const Ice::Exception& ex) -{ - LocatorAdapterInfo adapter; + if(adapter.proxy) + { + if(_locator->getDirectProxy(adapter, this)) + { + activating(adapter.id); + } + } + } + + virtual void + response(const string& id, const Ice::ObjectPrx& proxy) { Lock sync(*this); + assert(proxy); if(_proxies.size() == _count) // Nothing to do if we already sent the response. { return; } - if(!_exception.get()) + + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_proxies.size() == _count) { - _exception.reset(ex.ice_clone()); + sendResponse(); } - - if(_lastAdapter == _adapters.end()) + } + +private: + + void + sendResponse() + { + if(_proxies.size() == 1) + { + _amdCB->ice_response(_proxies.begin()->second); + } + else if(_proxies.empty()) { - --_count; // Expect one less adapter proxy if there's no more adapters to query. - // - // If we received all the required proxies, it's time to send the - // answer back to the client. + // If there's no proxies, it's either because we couldn't contact the adapters or + // because the replica group has no members. // - if(_count == _proxies.size()) + assert(_exception.get() || _adapters.empty()); + if(_traceLevels->locator > 0) { - sendResponse(); + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); } + _amdCB->ice_response(0); } - else + else if(_proxies.size() > 1) { - adapter = *_lastAdapter; - ++_lastAdapter; + Ice::EndpointSeq endpoints; + endpoints.reserve(_proxies.size()); + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + if(q != _proxies.end()) + { + Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } + } + + Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); + _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } } - if(adapter.proxy) + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + unsigned int _count; + LocatorAdapterInfoSeq::const_iterator _lastAdapter; + std::map<std::string, Ice::ObjectPrx> _proxies; + std::auto_ptr<Ice::Exception> _exception; +}; + +class RoundRobinRequest : public LocatorI::Request, public IceUtil::Mutex +{ +public: + + RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _waitForActivation(false) { + assert(_adapters.empty() || _count > 0); + } + + virtual void + execute() + { + if(_adapters.empty()) + { + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty"; + } + _amdCB->ice_response(0); + return; + } + + LocatorAdapterInfo adapter = _adapters[0]; + assert(adapter.proxy); if(_locator->getDirectProxy(adapter, this)) { - activating(); + activating(adapter.id); } } -} -void -LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) -{ - if(!proxy) + virtual void + activating(const string& id) { - exception(id, AdapterNotActiveException()); - return; + LocatorAdapterInfo adapter; + adapter.id = id; + do + { + Lock sync(*this); + if(_adapters.empty() || _waitForActivation) + { + return; + } + _activatingOrFailed.insert(adapter.id); + adapter = nextAdapter(); + } + while(adapter.proxy && _locator->getDirectProxy(adapter, this)); } - Lock sync(*this); - if(_proxies.size() == _count) // Nothing to do if we already sent the response. + virtual void + response(const std::string& id, const Ice::ObjectPrx& proxy) { - return; - } - - _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + Lock sync(*this); + assert(proxy); + if(_adapters.empty() || id != _adapters[0].id) + { + return; + } - // - // If we received all the required proxies, it's time to send the - // answer back to the client. - // - if(_proxies.size() == _count) - { - sendResponse(); + if(_count > 1) + { + Ice::ObjectPrx p = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + LocatorI::RequestPtr request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, p); + request->execute(); + } + else + { + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + _adapters.clear(); } -} -void -LocatorI::Request::sendResponse() -{ - int roundRobinCount = 0; - if(_proxies.size() == 1) + virtual void + exception(const std::string& id, const Ice::Exception& ex) { - if(_roundRobin) + LocatorAdapterInfo adapter; { - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + Lock sync(*this); + _failed.insert(id); + _activatingOrFailed.insert(id); + + if(!_exception.get()) { - if(_proxies.find(p->id) != _proxies.end()) - { - break; - } - // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. - // - ++roundRobinCount; + _exception.reset(ex.ice_clone()); } + + if(_adapters.empty() || id != _adapters[0].id) + { + return; + } + + adapter = nextAdapter(); } - _amdCB->ice_response(_proxies.begin()->second); - } - else if(_proxies.empty()) - { - // - // If there's no proxies, it's either because we couldn't - // contact the adapters or because the replica group has - // no members. - // - assert(_exception.get() || (_replicaGroup && _adapters.empty())); - if(_traceLevels->locator > 0) + if(adapter.proxy && _locator->getDirectProxy(adapter, this)) { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "couldn't resolve " << (_replicaGroup ? "replica group `" : "adapter `") << _id << "' endpoints:\n"; - out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + activating(adapter.id); } - _amdCB->ice_response(0); } - else if(_proxies.size() > 1) + +private: + + LocatorAdapterInfo + nextAdapter() { - Ice::EndpointSeq endpoints; - endpoints.reserve(_proxies.size()); - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + bool replicaGroup; + bool roundRobin; + + _adapters.clear(); + + try { - map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); - if(q != _proxies.end()) + if(!_waitForActivation) { - Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); - endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _activatingOrFailed); } - else if(_roundRobin && endpoints.empty()) + + if(_waitForActivation || (_adapters.empty() && _activatingOrFailed.size() > _failed.size())) { // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. + // If there are no more adapters to try and some servers were being activated, we + // try again but this time we wait for the server activation. // - ++roundRobinCount; + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _failed); + _waitForActivation = true; } - } - Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); - _amdCB->ice_response(proxy->ice_endpoints(endpoints)); + if(!roundRobin) + { + LocatorI::RequestPtr request; + if(replicaGroup) + { + request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, 0); + } + else + { + assert(!_adapters.empty()); + request = new AdapterRequest(_amdCB, _locator, _adapters[0]); + } + request->execute(); + _adapters.clear(); + return LocatorAdapterInfo(); + } + else if(!_adapters.empty()) + { + return _adapters[0]; + } + else + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } + } + catch(const AdapterNotExistException&) + { + assert(_adapters.empty()); + _amdCB->ice_exception(Ice::AdapterNotFoundException()); + return LocatorAdapterInfo(); + } + catch(const Ice::Exception& ex) + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } } - if(_roundRobin) - { - _locator->removePendingRoundRobinRequest(_id, roundRobinCount); - } -} + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + int _count; + bool _waitForActivation; + set<string> _failed; + set<string> _activatingOrFailed; + std::auto_ptr<Ice::Exception> _exception; +}; + +}; + LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, const DatabasePtr& database, @@ -444,22 +690,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const Ice::Current&) const { LocatorIPtr self = const_cast<LocatorI*>(this); - bool pending = false; - if(self->addPendingRoundRobinRequest(id, cb, true, pending)) // Add only if there's already round robin requests - // pending. - { - // - // Another request is currently resolving the adapter endpoints. We'll - // answer this request once it's done. - // - return; - } - - // - // If no other request is resolving the adapter endpoints, resolve - // the endpoints now. - // - bool replicaGroup = false; try { @@ -472,36 +702,24 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, LocatorAdapterInfoSeq adapters; bool roundRobin; _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin); - - // - // Round robin replica group requests are serialized. This is - // required to make sure the round robin counter is accurate - // even if some adapters are unreachable (bug 2576). For - // adapters, and replica groups, there's no need to serialize - // the requests. - // + RequestPtr request; if(roundRobin) { - if(self->addPendingRoundRobinRequest(id, cb, false, pending)) - { - return; - } + request = new RoundRobinRequest(cb, self, id, adapters, count); } - else if(pending) + else if(replicaGroup) { - self->removePendingRoundRobinRequest(id, 0); + request = new ReplicaGroupRequest(cb, self, id, adapters, count, 0); + } + else + { + assert(adapters.size() == 1); + request = new AdapterRequest(cb, self, adapters[0]); } - - RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); request->execute(); } catch(const AdapterNotExistException&) { - if(pending) - { - self->removePendingRoundRobinRequest(id, 0); - } - try { cb->ice_response(_database->getAdapterDirectProxy(id)); @@ -514,11 +732,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } catch(const Ice::Exception& ex) { - if(pending) - { - self->removePendingRoundRobinRequest(id, 0); - } - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { @@ -568,78 +781,6 @@ LocatorI::getTraceLevels() const } bool -LocatorI::addPendingRoundRobinRequest(const string& adapterId, - const Ice::AMD_Locator_findAdapterByIdPtr& cb, - bool addIfExists, - bool& pending) -{ - Lock sync(*this); - pending = false; - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - if(p == _resolves.end()) - { - if(addIfExists) - { - return false; - } - p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first; - } - else if(p->second.front().get() == cb.get()) - { - pending = true; - return false; - } - - p->second.push_back(cb); - return p->second.size() > 1; -} - -void -LocatorI::removePendingRoundRobinRequest(const string& adapterId, int roundRobinCount) -{ - Ice::AMD_Locator_findAdapterByIdPtr cb; - { - Lock sync(*this); - - // - // Bump the round robin counter. We bump the round robin counter by - // the number of inactive adapters. This ensures that if the first - // adapters are inactive, if the first adapter to be inactive is the - // Nth adapter, the next adapter to be returned will be the Nth + 1. - // - if(roundRobinCount > 0) - { - try - { - _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount); - } - catch(const Ice::Exception&) - { - // Ignore. - } - } - - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - assert(p != _resolves.end()); - - p->second.pop_front(); - if(p->second.empty()) - { - _resolves.erase(p); - } - else - { - cb = p->second.front(); - } - } - - if(cb) - { - findAdapterById_async(cb, adapterId); - } -} - -bool LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request) { { @@ -673,9 +814,19 @@ LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::O _activating.erase(adapter.id); } - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + if(proxy) { - (*q)->response(adapter.id, proxy); + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->response(adapter.id, proxy); + } + } + else + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->exception(adapter.id, AdapterNotActiveException()); + } } } @@ -717,7 +868,7 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: { for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - (*q)->activating(); + (*q)->activating(adapter.id); } AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter); @@ -732,3 +883,14 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: } } } + +void +LocatorI::getAdapterInfo(const string& id, + LocatorAdapterInfoSeq& adapters, + int& count, + bool& replicaGroup, + bool& roundRobin, + const set<string>& excludes) +{ + _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, excludes); +} diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h index 51befd24e85..124b62b7dc1 100644 --- a/cpp/src/IceGrid/LocatorI.h +++ b/cpp/src/IceGrid/LocatorI.h @@ -34,40 +34,14 @@ class LocatorI : public Locator, public IceUtil::Mutex { public: - class Request : public IceUtil::Mutex, public IceUtil::Shared + class Request : public IceUtil::Shared { public: - Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool, bool, - const LocatorAdapterInfoSeq&, int); - - void execute(); - void response(const std::string&, const Ice::ObjectPrx&); - void activating(); - void exception(const std::string&, const Ice::Exception&); - - virtual bool - operator<(const Request& r) const - { - return this < &r; - } - - private: - - void requestAdapter(const LocatorAdapterInfo&); - void sendResponse(); - - const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; - const LocatorIPtr _locator; - const std::string _id; - const bool _replicaGroup; - const bool _roundRobin; - LocatorAdapterInfoSeq _adapters; - const TraceLevelsPtr _traceLevels; - unsigned int _count; - LocatorAdapterInfoSeq::const_iterator _lastAdapter; - std::map<std::string, Ice::ObjectPrx> _proxies; - std::auto_ptr<Ice::Exception> _exception; + virtual void execute() = 0; + virtual void activating(const std::string&) = 0; + virtual void response(const std::string&, const Ice::ObjectPrx&) = 0; + virtual void exception(const std::string&, const Ice::Exception&) = 0; }; typedef IceUtil::Handle<Request> RequestPtr; @@ -87,13 +61,12 @@ public: const Ice::CommunicatorPtr& getCommunicator() const; const TraceLevelsPtr& getTraceLevels() const; - bool addPendingRoundRobinRequest(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&); - void removePendingRoundRobinRequest(const std::string&, int); - bool getDirectProxy(const LocatorAdapterInfo&, const RequestPtr&); void getDirectProxyResponse(const LocatorAdapterInfo&, const Ice::ObjectPrx&); void getDirectProxyException(const LocatorAdapterInfo&, const Ice::Exception&); + void getAdapterInfo(const std::string&, LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&); + protected: const Ice::CommunicatorPtr _communicator; @@ -106,8 +79,6 @@ protected: typedef std::map<std::string, PendingRequests> PendingRequestsMap; PendingRequestsMap _pendingRequests; std::set<std::string> _activating; - - std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves; }; } diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 6381b39760e..77ed21db79e 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -831,9 +831,17 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, } void -NodeI::destroy() +NodeI::shutdown() { IceUtil::Mutex::Lock sync(_serversLock); + for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin(); + p != _serversByApplication.end(); ++p) + { + for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->shutdown(); + } + } _serversByApplication.clear(); } diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index 087d661a49d..46c6470bb83 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -92,7 +92,7 @@ public: virtual Ice::Long getOffsetFromEnd(const std::string&, int, const Ice::Current&) const; virtual bool read(const std::string&, Ice::Long, int, Ice::Long&, Ice::StringSeq&, const Ice::Current&) const; - void destroy(); + void shutdown(); IceUtil::TimerPtr getTimer() const; Ice::CommunicatorPtr getCommunicator() const; diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 79b5cd80b67..b01219c4e46 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -1694,6 +1694,14 @@ ServerI::terminated(const string& msg, int status) } void +ServerI::shutdown() +{ + Lock sync(*this); + assert(_state == ServerI::Inactive); + _timerTask = 0; +} + +void ServerI::update() { ServerCommandPtr command; diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h index 375a5c1fc80..9d04b017b7f 100644 --- a/cpp/src/IceGrid/ServerI.h +++ b/cpp/src/IceGrid/ServerI.h @@ -109,6 +109,7 @@ public: void update(); void destroy(); void terminated(const std::string&, int); + void shutdown(); // // A proxy to the Process facet of the real Admin object; called by the AdminFacade servant implementation diff --git a/cpp/test/IceGrid/replicaGroup/AllTests.cpp b/cpp/test/IceGrid/replicaGroup/AllTests.cpp index 3cb2e93e487..2484c62dc1f 100644 --- a/cpp/test/IceGrid/replicaGroup/AllTests.cpp +++ b/cpp/test/IceGrid/replicaGroup/AllTests.cpp @@ -230,17 +230,73 @@ allTests(const Ice::CommunicatorPtr& comm) params["id"] = "Server3"; instantiateServer(admin, "Server", "localnode", params); TestIntfPrx obj = TestIntfPrx::uncheckedCast(comm->stringToProxy("RoundRobin")); + obj = TestIntfPrx::uncheckedCast(obj->ice_locatorCacheTimeout(0)); + obj = TestIntfPrx::uncheckedCast(obj->ice_connectionCached(false)); try { test(obj->getReplicaIdAndShutdown() == "Server1.ReplicatedAdapter"); test(obj->getReplicaIdAndShutdown() == "Server2.ReplicatedAdapter"); test(obj->getReplicaIdAndShutdown() == "Server3.ReplicatedAdapter"); + + admin->enableServer("Server1", false); + admin->enableServer("Server2", false); + admin->enableServer("Server3", false); + + try + { + obj->getReplicaId(); + test(false); + } + catch(const Ice::NoEndpointException&) + { + } + + admin->enableServer("Server1", true); + admin->enableServer("Server2", true); + admin->enableServer("Server3", true); + + set<string> adapterIds; + string previousId; + while(adapterIds.size() != 3) + { + string id = obj->getReplicaId(); + adapterIds.insert(id); + + if(adapterIds.size() == 1) + { + previousId = id; + } + else + { + test(previousId != id); + previousId = id; + } + } + + int i; + for(i = 0; i < 3; i++) + { + if(obj->getReplicaId() == "Server3.ReplicatedAdapter") + { + break; + } + } + test(i != 3); + + test(obj->getReplicaId() == "Server1.ReplicatedAdapter"); + test(obj->getReplicaId() == "Server2.ReplicatedAdapter"); + test(obj->getReplicaId() == "Server3.ReplicatedAdapter"); + + test(obj->getReplicaIdAndShutdown() == "Server1.ReplicatedAdapter"); + test(obj->getReplicaIdAndShutdown() == "Server2.ReplicatedAdapter"); + test(obj->getReplicaIdAndShutdown() == "Server3.ReplicatedAdapter"); } catch(const Ice::LocalException& ex) { cerr << ex << endl; test(false); } + removeServer(admin, "Server1"); removeServer(admin, "Server2"); removeServer(admin, "Server3"); @@ -344,6 +400,42 @@ allTests(const Ice::CommunicatorPtr& comm) test(false); } } + + admin->stopServer("Server1"); + admin->stopServer("Server2"); + admin->stopServer("Server3"); + + admin->enableServer("Server1", false); + admin->enableServer("Server2", false); + admin->enableServer("Server3", false); + + try + { + obj->getReplicaId(); + test(false); + } + catch(const Ice::NoEndpointException&) + { + } + + admin->enableServer("Server1", true); + admin->enableServer("Server2", true); + admin->enableServer("Server3", true); + + replicaIds = serverReplicaIds; + while(!replicaIds.empty()) + { + try + { + replicaIds.erase(obj->getReplicaIdAndShutdown()); + } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } + } + removeServer(admin, "Server1"); removeServer(admin, "Server2"); removeServer(admin, "Server3"); @@ -489,6 +581,18 @@ allTests(const Ice::CommunicatorPtr& comm) test(expected.find(replicaId) != expected.end()); replicaIds.erase(replicaId); } + + admin->stopServer("Server1"); + admin->stopServer("Server2"); + admin->stopServer("Server3"); + + obj->ice_locatorCacheTimeout(0)->ice_ping(); + int nRetry = 500; + while(replicaIds.size() != 2 && --nRetry > 0) + { + replicaIds.insert(obj->getReplicaId()); + } + test(replicaIds.size() == 2); } catch(const Ice::LocalException& ex) { |