diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-01-07 19:37:17 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-01-07 19:37:17 +0100 |
commit | b5042ce24aaa2dcff2092046b322ff61c3d9ef8c (patch) | |
tree | dcdca6930377ad9098eeb9996ce1f7663c79e5db /cpp/src/IceGrid/LocatorI.cpp | |
parent | Other fix for 3601 - plugins can be destroyed twice (diff) | |
download | ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.bz2 ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.xz ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.zip |
Squashed commit of the following:
commit 8019e6de4480f361a83d8944afec60793454c322
Author: Benoit Foucher <benoit@zeroc.com>
Date: Wed Jan 7 17:16:40 2009 +0100
Fixed bug 3516 - Fixed scaling issue when using round-robin replica groups
commit 6c36afb32dda8b37b7d5330ed51a439bc73b17db
Author: Benoit Foucher <benoit@zeroc.com>
Date: Wed Jan 7 17:16:36 2009 +0100
Fixed bug 3230 - IceGrid node leak
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 694 |
1 files changed, 428 insertions, 266 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 4e354c5d9bf..d038ccae641 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -150,80 +150,144 @@ private: const Ice::ObjectPrx _obj; }; -} - -LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, - const string& id, - bool replicaGroup, - bool roundRobin, - const LocatorAdapterInfoSeq& adapters, - int count) : - _amdCB(amdCB), - _locator(locator), - _id(id), - _replicaGroup(replicaGroup), - _roundRobin(roundRobin), - _adapters(adapters), - _traceLevels(locator->getTraceLevels()), - _count(count), - _lastAdapter(_adapters.begin()) +class AdapterRequest : public LocatorI::Request { - assert((_count == 0 && _adapters.empty()) || _count > 0); -} +public: -void -LocatorI::Request::execute() + AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const LocatorAdapterInfo& adapter) : + _amdCB(amdCB), + _locator(locator), + _adapter(adapter), + _traceLevels(locator->getTraceLevels()) + { + assert(_adapter.proxy); + } + + virtual void + execute() + { + _locator->getDirectProxy(_adapter, this); + } + + virtual void + activating(const string&) + { + // Nothing to do. + } + + virtual void + response(const std::string&, const Ice::ObjectPrx& proxy) + { + assert(proxy); + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + + virtual void + exception(const std::string&, const Ice::Exception& ex) + { + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + } + +private: + + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const LocatorAdapterInfo _adapter; + const TraceLevelsPtr _traceLevels; +}; + +class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex { - // - // If there's no adapters to request, we're done, send the - // response. - // - if(_adapters.empty()) +public: + + ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count, + Ice::ObjectPrx firstProxy) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _lastAdapter(_adapters.begin()) { - sendResponse(); - return; + assert(_adapters.empty() || _count > 0); + + if(_adapters.empty()) + { + _count = 0; + } + + // + // If the first adapter proxy is provided, store it in _proxies. + // + if(firstProxy) + { + assert(!_adapters.empty()); + _proxies[_adapters[0].id] = firstProxy; + ++_lastAdapter; + } } - // - // Otherwise, request as many adapters as required. - // - LocatorAdapterInfoSeq adapters; + virtual void + execute() { - Lock sync(*this); - assert(_count > 0 && _lastAdapter != _adapters.end()); - for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) + // + // Otherwise, request as many adapters as required. + // + LocatorAdapterInfoSeq adapters; { - if(_lastAdapter == _adapters.end()) + Lock sync(*this); + for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) { - _count = i; - break; + if(_lastAdapter == _adapters.end()) + { + _count = i; + break; + } + assert(_lastAdapter->proxy); + adapters.push_back(*_lastAdapter); + ++_lastAdapter; + } + + // + // If there's no adapters to request, we're done, send the + // response. + // + if(_proxies.size() == _count) + { + sendResponse(); + return; } - assert(_lastAdapter->proxy); - adapters.push_back(*_lastAdapter); - ++_lastAdapter; } - } + - for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) - { - if(_locator->getDirectProxy(*p, this)) + for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - activating(); + if(_locator->getDirectProxy(*p, this)) + { + activating(p->id); + } } } -} -void -LocatorI::Request::activating() -{ - // - // An adapter is being activated. If this is a request for a replica group, don't - // wait for the activation to complete. Instead, we query the next adapter which - // might be already active. - // - if(_replicaGroup) + virtual void + activating(const string&) { + // + // An adapter is being activated. Don't wait for the activation to complete. Instead, + // we query the next adapter which might be already active. + // LocatorAdapterInfo adapter; do { @@ -237,150 +301,332 @@ LocatorI::Request::activating() } while(_locator->getDirectProxy(adapter, this)); } -} + + virtual void + exception(const string& id, const Ice::Exception& ex) + { + LocatorAdapterInfo adapter; + { + Lock sync(*this); + if(_proxies.size() == _count) // Nothing to do if we already sent the response. + { + return; + } + + if(!_exception.get()) + { + _exception.reset(ex.ice_clone()); + } + + if(_lastAdapter == _adapters.end()) + { + --_count; // Expect one less adapter proxy if there's no more adapters to query. + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_count == _proxies.size()) + { + sendResponse(); + } + } + else + { + adapter = *_lastAdapter; + ++_lastAdapter; + } + } -void -LocatorI::Request::exception(const string& id, const Ice::Exception& ex) -{ - LocatorAdapterInfo adapter; + if(adapter.proxy) + { + if(_locator->getDirectProxy(adapter, this)) + { + activating(adapter.id); + } + } + } + + virtual void + response(const string& id, const Ice::ObjectPrx& proxy) { Lock sync(*this); + assert(proxy); if(_proxies.size() == _count) // Nothing to do if we already sent the response. { return; } - if(!_exception.get()) + + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_proxies.size() == _count) { - _exception.reset(ex.ice_clone()); + sendResponse(); } - - if(_lastAdapter == _adapters.end()) + } + +private: + + void + sendResponse() + { + if(_proxies.size() == 1) + { + _amdCB->ice_response(_proxies.begin()->second); + } + else if(_proxies.empty()) { - --_count; // Expect one less adapter proxy if there's no more adapters to query. - // - // If we received all the required proxies, it's time to send the - // answer back to the client. + // If there's no proxies, it's either because we couldn't contact the adapters or + // because the replica group has no members. // - if(_count == _proxies.size()) + assert(_exception.get() || _adapters.empty()); + if(_traceLevels->locator > 0) { - sendResponse(); + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); } + _amdCB->ice_response(0); } - else + else if(_proxies.size() > 1) { - adapter = *_lastAdapter; - ++_lastAdapter; + Ice::EndpointSeq endpoints; + endpoints.reserve(_proxies.size()); + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + if(q != _proxies.end()) + { + Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } + } + + Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); + _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } } - if(adapter.proxy) + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + unsigned int _count; + LocatorAdapterInfoSeq::const_iterator _lastAdapter; + std::map<std::string, Ice::ObjectPrx> _proxies; + std::auto_ptr<Ice::Exception> _exception; +}; + +class RoundRobinRequest : public LocatorI::Request, public IceUtil::Mutex +{ +public: + + RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _waitForActivation(false) { + assert(_adapters.empty() || _count > 0); + } + + virtual void + execute() + { + if(_adapters.empty()) + { + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty"; + } + _amdCB->ice_response(0); + return; + } + + LocatorAdapterInfo adapter = _adapters[0]; + assert(adapter.proxy); if(_locator->getDirectProxy(adapter, this)) { - activating(); + activating(adapter.id); } } -} -void -LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) -{ - if(!proxy) + virtual void + activating(const string& id) { - exception(id, AdapterNotActiveException()); - return; + LocatorAdapterInfo adapter; + adapter.id = id; + do + { + Lock sync(*this); + if(_adapters.empty() || _waitForActivation) + { + return; + } + _activatingOrFailed.insert(adapter.id); + adapter = nextAdapter(); + } + while(adapter.proxy && _locator->getDirectProxy(adapter, this)); } - Lock sync(*this); - if(_proxies.size() == _count) // Nothing to do if we already sent the response. + virtual void + response(const std::string& id, const Ice::ObjectPrx& proxy) { - return; - } - - _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + Lock sync(*this); + assert(proxy); + if(_adapters.empty() || id != _adapters[0].id) + { + return; + } - // - // If we received all the required proxies, it's time to send the - // answer back to the client. - // - if(_proxies.size() == _count) - { - sendResponse(); + if(_count > 1) + { + Ice::ObjectPrx p = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + LocatorI::RequestPtr request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, p); + request->execute(); + } + else + { + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + _adapters.clear(); } -} -void -LocatorI::Request::sendResponse() -{ - int roundRobinCount = 0; - if(_proxies.size() == 1) + virtual void + exception(const std::string& id, const Ice::Exception& ex) { - if(_roundRobin) + LocatorAdapterInfo adapter; { - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + Lock sync(*this); + _failed.insert(id); + _activatingOrFailed.insert(id); + + if(!_exception.get()) { - if(_proxies.find(p->id) != _proxies.end()) - { - break; - } - // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. - // - ++roundRobinCount; + _exception.reset(ex.ice_clone()); } + + if(_adapters.empty() || id != _adapters[0].id) + { + return; + } + + adapter = nextAdapter(); } - _amdCB->ice_response(_proxies.begin()->second); - } - else if(_proxies.empty()) - { - // - // If there's no proxies, it's either because we couldn't - // contact the adapters or because the replica group has - // no members. - // - assert(_exception.get() || (_replicaGroup && _adapters.empty())); - if(_traceLevels->locator > 0) + if(adapter.proxy && _locator->getDirectProxy(adapter, this)) { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "couldn't resolve " << (_replicaGroup ? "replica group `" : "adapter `") << _id << "' endpoints:\n"; - out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + activating(adapter.id); } - _amdCB->ice_response(0); } - else if(_proxies.size() > 1) + +private: + + LocatorAdapterInfo + nextAdapter() { - Ice::EndpointSeq endpoints; - endpoints.reserve(_proxies.size()); - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + bool replicaGroup; + bool roundRobin; + + _adapters.clear(); + + try { - map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); - if(q != _proxies.end()) + if(!_waitForActivation) { - Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); - endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _activatingOrFailed); } - else if(_roundRobin && endpoints.empty()) + + if(_waitForActivation || (_adapters.empty() && _activatingOrFailed.size() > _failed.size())) { // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. + // If there are no more adapters to try and some servers were being activated, we + // try again but this time we wait for the server activation. // - ++roundRobinCount; + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _failed); + _waitForActivation = true; } - } - Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); - _amdCB->ice_response(proxy->ice_endpoints(endpoints)); + if(!roundRobin) + { + LocatorI::RequestPtr request; + if(replicaGroup) + { + request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, 0); + } + else + { + assert(!_adapters.empty()); + request = new AdapterRequest(_amdCB, _locator, _adapters[0]); + } + request->execute(); + _adapters.clear(); + return LocatorAdapterInfo(); + } + else if(!_adapters.empty()) + { + return _adapters[0]; + } + else + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } + } + catch(const AdapterNotExistException&) + { + assert(_adapters.empty()); + _amdCB->ice_exception(Ice::AdapterNotFoundException()); + return LocatorAdapterInfo(); + } + catch(const Ice::Exception& ex) + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } } - if(_roundRobin) - { - _locator->removePendingRoundRobinRequest(_id, roundRobinCount); - } -} + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + int _count; + bool _waitForActivation; + set<string> _failed; + set<string> _activatingOrFailed; + std::auto_ptr<Ice::Exception> _exception; +}; + +}; + LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, const DatabasePtr& database, @@ -444,22 +690,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const Ice::Current&) const { LocatorIPtr self = const_cast<LocatorI*>(this); - bool pending = false; - if(self->addPendingRoundRobinRequest(id, cb, true, pending)) // Add only if there's already round robin requests - // pending. - { - // - // Another request is currently resolving the adapter endpoints. We'll - // answer this request once it's done. - // - return; - } - - // - // If no other request is resolving the adapter endpoints, resolve - // the endpoints now. - // - bool replicaGroup = false; try { @@ -472,36 +702,24 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, LocatorAdapterInfoSeq adapters; bool roundRobin; _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin); - - // - // Round robin replica group requests are serialized. This is - // required to make sure the round robin counter is accurate - // even if some adapters are unreachable (bug 2576). For - // adapters, and replica groups, there's no need to serialize - // the requests. - // + RequestPtr request; if(roundRobin) { - if(self->addPendingRoundRobinRequest(id, cb, false, pending)) - { - return; - } + request = new RoundRobinRequest(cb, self, id, adapters, count); } - else if(pending) + else if(replicaGroup) { - self->removePendingRoundRobinRequest(id, 0); + request = new ReplicaGroupRequest(cb, self, id, adapters, count, 0); + } + else + { + assert(adapters.size() == 1); + request = new AdapterRequest(cb, self, adapters[0]); } - - RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); request->execute(); } catch(const AdapterNotExistException&) { - if(pending) - { - self->removePendingRoundRobinRequest(id, 0); - } - try { cb->ice_response(_database->getAdapterDirectProxy(id)); @@ -514,11 +732,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } catch(const Ice::Exception& ex) { - if(pending) - { - self->removePendingRoundRobinRequest(id, 0); - } - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { @@ -568,78 +781,6 @@ LocatorI::getTraceLevels() const } bool -LocatorI::addPendingRoundRobinRequest(const string& adapterId, - const Ice::AMD_Locator_findAdapterByIdPtr& cb, - bool addIfExists, - bool& pending) -{ - Lock sync(*this); - pending = false; - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - if(p == _resolves.end()) - { - if(addIfExists) - { - return false; - } - p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first; - } - else if(p->second.front().get() == cb.get()) - { - pending = true; - return false; - } - - p->second.push_back(cb); - return p->second.size() > 1; -} - -void -LocatorI::removePendingRoundRobinRequest(const string& adapterId, int roundRobinCount) -{ - Ice::AMD_Locator_findAdapterByIdPtr cb; - { - Lock sync(*this); - - // - // Bump the round robin counter. We bump the round robin counter by - // the number of inactive adapters. This ensures that if the first - // adapters are inactive, if the first adapter to be inactive is the - // Nth adapter, the next adapter to be returned will be the Nth + 1. - // - if(roundRobinCount > 0) - { - try - { - _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount); - } - catch(const Ice::Exception&) - { - // Ignore. - } - } - - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - assert(p != _resolves.end()); - - p->second.pop_front(); - if(p->second.empty()) - { - _resolves.erase(p); - } - else - { - cb = p->second.front(); - } - } - - if(cb) - { - findAdapterById_async(cb, adapterId); - } -} - -bool LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request) { { @@ -673,9 +814,19 @@ LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::O _activating.erase(adapter.id); } - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + if(proxy) { - (*q)->response(adapter.id, proxy); + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->response(adapter.id, proxy); + } + } + else + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->exception(adapter.id, AdapterNotActiveException()); + } } } @@ -717,7 +868,7 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: { for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - (*q)->activating(); + (*q)->activating(adapter.id); } AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter); @@ -732,3 +883,14 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: } } } + +void +LocatorI::getAdapterInfo(const string& id, + LocatorAdapterInfoSeq& adapters, + int& count, + bool& replicaGroup, + bool& roundRobin, + const set<string>& excludes) +{ + _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, excludes); +} |