diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-01-11 14:14:33 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-01-11 14:14:33 +0100 |
commit | 81255b93d09d5963eeb40dba272838650c939994 (patch) | |
tree | b977ac8538f1933f5ad7d7ff2a7229951108adfc /cpp/src | |
parent | Allow third party software locations to be set from environment (diff) | |
download | ice-81255b93d09d5963eeb40dba272838650c939994.tar.bz2 ice-81255b93d09d5963eeb40dba272838650c939994.tar.xz ice-81255b93d09d5963eeb40dba272838650c939994.zip |
Fixed bug 2576
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 24 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.h | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 135 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.h | 13 |
4 files changed, 165 insertions, 16 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index e3bf7562803..446696fdcdf 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -230,16 +230,23 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache, } void -ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup) +ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup, + bool& roundRobin) { nReplicas = 1; replicaGroup = false; + roundRobin = false; LocatorAdapterInfo info; info.id = _id; info.proxy = _server->getAdapter(info.activationTimeout, info.deactivationTimeout, _id, true); adapters.push_back(info); } +void +ServerAdapterEntry::increaseRoundRobinCount(int roundRobinCount) +{ +} + float ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { @@ -375,7 +382,8 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy) } void -ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup) +ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup, + bool& roundRobin) { vector<ServerAdapterEntryPtr> replicas; bool adaptive = false; @@ -383,6 +391,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n { Lock sync(*this); replicaGroup = true; + roundRobin = false; nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size()); if(_replicas.empty()) @@ -398,6 +407,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); } _lastReplica = (_lastReplica + 1) % static_cast<int>(_replicas.size()); + roundRobin = true; } else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { @@ -447,7 +457,8 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n { int dummy; bool dummy2; - (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2); + bool dummy3; + (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3); } catch(const AdapterNotExistException&) { @@ -461,6 +472,13 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n } } +void +ReplicaGroupEntry::increaseRoundRobinCount(int count) +{ + Lock sync(*this); + _lastReplica = (_lastReplica + count) % static_cast<int>(_replicas.size()); +} + float ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index cddd18b7f72..ac5440ca3ce 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -43,7 +43,8 @@ public: AdapterEntry(AdapterCache&, const std::string&, const std::string&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&) = 0; + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&) = 0; + virtual void increaseRoundRobinCount(int) = 0; virtual float getLeastLoadedNodeLoad(LoadSample) const = 0; virtual AdapterInfoSeq getAdapterInfo() const = 0; @@ -67,7 +68,8 @@ public: ServerAdapterEntry(AdapterCache&, const std::string&, const std::string&, const std::string&, int, const ServerEntryPtr&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&); + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&); + virtual void increaseRoundRobinCount(int); virtual float getLeastLoadedNodeLoad(LoadSample) const; virtual AdapterInfoSeq getAdapterInfo() const; virtual const std::string& getReplicaGroupId() const { return _replicaGroupId; } @@ -89,7 +91,8 @@ public: ReplicaGroupEntry(AdapterCache&, const std::string&, const std::string&, const LoadBalancingPolicyPtr&); - virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&); + virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&); + virtual void increaseRoundRobinCount(int); virtual float getLeastLoadedNodeLoad(LoadSample) const; virtual AdapterInfoSeq getAdapterInfo() const; diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index b7361dce4a1..60a5a1b6688 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -172,15 +172,16 @@ LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, const LocatorIPtr& locator, const string& id, bool replicaGroup, + bool roundRobin, const LocatorAdapterInfoSeq& adapters, - int count, - const TraceLevelsPtr& traceLevels) : + int count) : _amdCB(amdCB), _locator(locator), _id(id), _replicaGroup(replicaGroup), + _roundRobin(roundRobin), _adapters(adapters), - _traceLevels(traceLevels), + _traceLevels(locator->getTraceLevels()), _count(count), _lastAdapter(_adapters.begin()) { @@ -207,7 +208,7 @@ LocatorI::Request::execute() { Lock sync(*this); assert(_count > 0 && _lastAdapter != _adapters.end()); - for(unsigned int i = 0; i < _count; ++i) + for(unsigned int i = _proxies.size(); i < _count; ++i) { if(_lastAdapter == _adapters.end()) { @@ -334,8 +335,24 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) void LocatorI::Request::sendResponse() { + int roundRobinCount = 0; if(_proxies.size() == 1) { + if(_roundRobin) + { + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + if(_proxies.find(p->id) != _proxies.end()) + { + break; + } + // + // We count the number of object adapters which are inactive until we find + // one active. This count will be used to update the round robin counter. + // + ++roundRobinCount; + } + } _amdCB->ice_response(_proxies.begin()->second); } else if(_proxies.empty()) @@ -367,6 +384,14 @@ LocatorI::Request::sendResponse() Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); } + else if(_roundRobin && endpoints.empty()) + { + // + // We count the number of object adapters which are inactive until we find + // one active. This count will be used to update the round robin counter. + // + ++roundRobinCount; + } } for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) @@ -377,6 +402,11 @@ LocatorI::Request::sendResponse() Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } + + if(_roundRobin) + { + _locator->removePendingResolve(_id, roundRobinCount); + } } LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, @@ -440,6 +470,21 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const string& id, const Ice::Current&) const { + LocatorIPtr self = const_cast<LocatorI*>(this); + if(self->addPendingResolve(id, cb)) + { + // + // Another request is currently resolving the adapter endpoints. We'll + // answer this request once it's done. + // + return; + } + + // + // If no other request is resolving the adapter endpoints, resolve + // the endpoints now. + // + bool replicaGroup = false; try { @@ -450,10 +495,22 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, // int count; LocatorAdapterInfoSeq adapters; - _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup); + bool roundRobin; + _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin); + + // + // Round robin replica group requests are serialized. This is + // required to make sure the round robin counter is accurate + // even if some adapters are unreachable (bug 2576). For + // adapters, and replica groups, there's no need to serialize + // the requests. + // + if(!roundRobin) + { + self->removePendingResolve(id, 0); + } - LocatorIPtr self = const_cast<LocatorI*>(this); - RequestPtr request = new Request(cb, self, id, replicaGroup, adapters, count, _database->getTraceLevels()); + RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); request->execute(); } catch(const AdapterNotExistException&) @@ -466,6 +523,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { cb->ice_exception(Ice::AdapterNotFoundException()); } + self->removePendingResolve(id, 0); return; } catch(const Ice::Exception& ex) @@ -484,6 +542,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } } cb->ice_response(0); + self->removePendingResolve(id, 0); return; } } @@ -512,6 +571,12 @@ LocatorI::getCommunicator() const return _communicator; } +const TraceLevelsPtr& +LocatorI::getTraceLevels() const +{ + return _database->getTraceLevels(); +} + void LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) { @@ -583,3 +648,59 @@ LocatorI::activateException(const string& id, const Ice::Exception& ex) (*q)->exception(id, ex); } } + +bool +LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb) +{ + Lock sync(*this); + map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); + if(p == _resolves.end()) + { + p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first; + } + else if(p->second.front().get() == cb.get()) + { + return false; + } + + p->second.push_back(cb); + return p->second.size() > 1; +} + +void +LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount) +{ + Ice::AMD_Locator_findAdapterByIdPtr cb; + { + Lock sync(*this); + + // + // Bump the round robin counter. We bump the round robin counter by + // the number of inactive adapters. This ensures that if the first + // adapters are inactive, if the first adapter to be inactive is the + // Nth adapter, the next adapter to be returned will be the Nth + 1. + // + if(roundRobinCount > 0) + { + _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount); + } + + map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); + assert(p != _resolves.end()); + + p->second.pop_front(); + if(p->second.empty()) + { + _resolves.erase(p); + } + else + { + cb = p->second.front(); + } + } + + if(cb) + { + findAdapterById_async(cb, adapterId); + } +} diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h index f0f97239c83..778f2443f55 100644 --- a/cpp/src/IceGrid/LocatorI.h +++ b/cpp/src/IceGrid/LocatorI.h @@ -38,8 +38,8 @@ public: { public: - Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool, - const LocatorAdapterInfoSeq&, int, const TraceLevelsPtr&); + Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool, bool, + const LocatorAdapterInfoSeq&, int); void execute(); void response(const std::string&, const Ice::ObjectPrx&); @@ -61,6 +61,7 @@ public: const LocatorIPtr _locator; const std::string _id; const bool _replicaGroup; + const bool _roundRobin; LocatorAdapterInfoSeq _adapters; const TraceLevelsPtr _traceLevels; unsigned int _count; @@ -78,13 +79,14 @@ public: const Ice::Current&) const; virtual void findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr&, const ::std::string&, - const Ice::Current&) const; + const Ice::Current& = Ice::Current()) const; virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const; virtual RegistryPrx getLocalRegistry(const Ice::Current&) const; virtual QueryPrx getLocalQuery(const Ice::Current&) const; const Ice::CommunicatorPtr& getCommunicator() const; + const TraceLevelsPtr& getTraceLevels() const; void activate(const LocatorAdapterInfo&, const RequestPtr&); void cancelActivate(const std::string&, const RequestPtr&); @@ -92,6 +94,9 @@ public: void activateFinished(const std::string&, const Ice::ObjectPrx&); void activateException(const std::string&, const Ice::Exception&); + bool addPendingResolve(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&); + void removePendingResolve(const std::string&, int); + protected: const Ice::CommunicatorPtr _communicator; @@ -104,6 +109,8 @@ protected: typedef std::map<std::string, PendingRequests> PendingRequestsMap; PendingRequestsMap _pendingRequests; + + std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves; }; } |