diff options
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 135 |
1 files changed, 128 insertions, 7 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index b7361dce4a1..60a5a1b6688 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -172,15 +172,16 @@ LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, const LocatorIPtr& locator, const string& id, bool replicaGroup, + bool roundRobin, const LocatorAdapterInfoSeq& adapters, - int count, - const TraceLevelsPtr& traceLevels) : + int count) : _amdCB(amdCB), _locator(locator), _id(id), _replicaGroup(replicaGroup), + _roundRobin(roundRobin), _adapters(adapters), - _traceLevels(traceLevels), + _traceLevels(locator->getTraceLevels()), _count(count), _lastAdapter(_adapters.begin()) { @@ -207,7 +208,7 @@ LocatorI::Request::execute() { Lock sync(*this); assert(_count > 0 && _lastAdapter != _adapters.end()); - for(unsigned int i = 0; i < _count; ++i) + for(unsigned int i = _proxies.size(); i < _count; ++i) { if(_lastAdapter == _adapters.end()) { @@ -334,8 +335,24 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) void LocatorI::Request::sendResponse() { + int roundRobinCount = 0; if(_proxies.size() == 1) { + if(_roundRobin) + { + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + if(_proxies.find(p->id) != _proxies.end()) + { + break; + } + // + // We count the number of object adapters which are inactive until we find + // one active. This count will be used to update the round robin counter. + // + ++roundRobinCount; + } + } _amdCB->ice_response(_proxies.begin()->second); } else if(_proxies.empty()) @@ -367,6 +384,14 @@ LocatorI::Request::sendResponse() Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); } + else if(_roundRobin && endpoints.empty()) + { + // + // We count the number of object adapters which are inactive until we find + // one active. This count will be used to update the round robin counter. + // + ++roundRobinCount; + } } for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) @@ -377,6 +402,11 @@ LocatorI::Request::sendResponse() Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } + + if(_roundRobin) + { + _locator->removePendingResolve(_id, roundRobinCount); + } } LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, @@ -440,6 +470,21 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const string& id, const Ice::Current&) const { + LocatorIPtr self = const_cast<LocatorI*>(this); + if(self->addPendingResolve(id, cb)) + { + // + // Another request is currently resolving the adapter endpoints. We'll + // answer this request once it's done. + // + return; + } + + // + // If no other request is resolving the adapter endpoints, resolve + // the endpoints now. + // + bool replicaGroup = false; try { @@ -450,10 +495,22 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, // int count; LocatorAdapterInfoSeq adapters; - _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup); + bool roundRobin; + _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin); + + // + // Round robin replica group requests are serialized. This is + // required to make sure the round robin counter is accurate + // even if some adapters are unreachable (bug 2576). For + // adapters, and replica groups, there's no need to serialize + // the requests. + // + if(!roundRobin) + { + self->removePendingResolve(id, 0); + } - LocatorIPtr self = const_cast<LocatorI*>(this); - RequestPtr request = new Request(cb, self, id, replicaGroup, adapters, count, _database->getTraceLevels()); + RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); request->execute(); } catch(const AdapterNotExistException&) @@ -466,6 +523,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { cb->ice_exception(Ice::AdapterNotFoundException()); } + self->removePendingResolve(id, 0); return; } catch(const Ice::Exception& ex) @@ -484,6 +542,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } } cb->ice_response(0); + self->removePendingResolve(id, 0); return; } } @@ -512,6 +571,12 @@ LocatorI::getCommunicator() const return _communicator; } +const TraceLevelsPtr& +LocatorI::getTraceLevels() const +{ + return _database->getTraceLevels(); +} + void LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) { @@ -583,3 +648,59 @@ LocatorI::activateException(const string& id, const Ice::Exception& ex) (*q)->exception(id, ex); } } + +bool +LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb) +{ + Lock sync(*this); + map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); + if(p == _resolves.end()) + { + p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first; + } + else if(p->second.front().get() == cb.get()) + { + return false; + } + + p->second.push_back(cb); + return p->second.size() > 1; +} + +void +LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount) +{ + Ice::AMD_Locator_findAdapterByIdPtr cb; + { + Lock sync(*this); + + // + // Bump the round robin counter. We bump the round robin counter by + // the number of inactive adapters. This ensures that if the first + // adapters are inactive, if the first adapter to be inactive is the + // Nth adapter, the next adapter to be returned will be the Nth + 1. + // + if(roundRobinCount > 0) + { + _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount); + } + + map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); + assert(p != _resolves.end()); + + p->second.pop_front(); + if(p->second.empty()) + { + _resolves.erase(p); + } + else + { + cb = p->second.front(); + } + } + + if(cb) + { + findAdapterById_async(cb, adapterId); + } +} |