diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-03 09:27:47 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-03 09:27:47 +0000 |
commit | e68a0e117ee67b3b0e3f64e14da73802e7877814 (patch) | |
tree | 62fc9806929cc447a694a1a5d094271766b998e4 /cpp/src/IceGrid/AdapterCache.cpp | |
parent | Added some IceVb stuff (diff) | |
download | ice-e68a0e117ee67b3b0e3f64e14da73802e7877814.tar.bz2 ice-e68a0e117ee67b3b0e3f64e14da73802e7877814.tar.xz ice-e68a0e117ee67b3b0e3f64e14da73802e7877814.zip |
Added ordered load balancing
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 176 |
1 files changed, 101 insertions, 75 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index 84776e93237..087049e83bc 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -26,25 +26,33 @@ namespace IceGrid struct ReplicaLoadComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool> { - typedef ReplicaGroupEntry::ReplicaSeq::value_type Replica; - typedef pair<float, Replica> ReplicaLoad; - - bool operator()(const ReplicaLoad& lhs, const ReplicaLoad& rhs) + bool operator()(const pair<float, ServerAdapterEntryPtr>& lhs, const pair<float, ServerAdapterEntryPtr>& rhs) { return lhs.first < rhs.first; } }; -struct ToReplicaLoad : public unary_function<const ReplicaLoadComp::Replica&, ReplicaLoadComp::ReplicaLoad> +struct ReplicaPriorityComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool> +{ + bool operator()(const ServerAdapterEntryPtr& lhs, const ServerAdapterEntryPtr& rhs) + { + return lhs->getPriority() < rhs->getPriority(); + } +}; + +struct TransformToReplicaLoad : + public unary_function<const ServerAdapterEntryPtr&, pair<float, ServerAdapterEntryPtr> > { - ToReplicaLoad(LoadSample loadSample) : _loadSample(loadSample) { } +public: - ReplicaLoadComp::ReplicaLoad - operator()(const ReplicaLoadComp::Replica& value) + TransformToReplicaLoad(LoadSample loadSample) : _loadSample(loadSample) { } + + pair<float, ServerAdapterEntryPtr> + operator()(const ServerAdapterEntryPtr& value) { try { - return make_pair(value.second->getLeastLoadedNodeLoad(_loadSample), value); + return make_pair(value->getLeastLoadedNodeLoad(_loadSample), value); } catch(const Ice::Exception&) { @@ -55,10 +63,10 @@ struct ToReplicaLoad : public unary_function<const ReplicaLoadComp::Replica&, Re LoadSample _loadSample; }; -struct ToReplica : public unary_function<const ReplicaLoadComp::ReplicaLoad&, ReplicaLoadComp::Replica> +struct TransformToReplica : public unary_function<const pair<string, ServerAdapterEntryPtr>&, ServerAdapterEntryPtr> { - ReplicaLoadComp::Replica - operator()(const ReplicaLoadComp::ReplicaLoad& value) + ServerAdapterEntryPtr + operator()(const pair<float, ServerAdapterEntryPtr>& value) { return value.second; } @@ -67,30 +75,35 @@ struct ToReplica : public unary_function<const ReplicaLoadComp::ReplicaLoad&, Re } ServerAdapterEntryPtr -AdapterCache::addServerAdapter(const string& id, const string& rgId, const ServerEntryPtr& server) +AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const ServerEntryPtr& server) { Lock sync(*this); - assert(!getImpl(id)); - ServerAdapterEntryPtr entry = new ServerAdapterEntry(*this, id, rgId, server); - addImpl(id, entry); + assert(!getImpl(desc.id)); + + istringstream is(desc.priority); + int priority = 0; + is >> priority; + + ServerAdapterEntryPtr entry = new ServerAdapterEntry(*this, desc.id, desc.replicaGroupId, priority, server); + addImpl(desc.id, entry); - if(!rgId.empty()) + if(!desc.replicaGroupId.empty()) { - ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(rgId)); + ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(desc.replicaGroupId)); assert(repEntry); - repEntry->addReplica(id, entry); + repEntry->addReplica(desc.id, entry); } return entry; } ReplicaGroupEntryPtr -AdapterCache::addReplicaGroup(const string& id, const string& app, const LoadBalancingPolicyPtr& loadBalancing) +AdapterCache::addReplicaGroup(const ReplicaGroupDescriptor& desc, const string& app) { Lock sync(*this); - assert(!getImpl(id)); - ReplicaGroupEntryPtr entry = new ReplicaGroupEntry(*this, id, app, loadBalancing); - addImpl(id, entry); + assert(!getImpl(desc.id)); + ReplicaGroupEntryPtr entry = new ReplicaGroupEntry(*this, desc.id, app, desc.loadBalancing); + addImpl(desc.id, entry); return entry; } @@ -189,12 +202,20 @@ AdapterEntry::canRemove() return true; } +string +AdapterEntry::getId() const +{ + return _id; +} + ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache, const string& id, const string& replicaGroupId, + int priority, const ServerEntryPtr& server) : AdapterEntry(cache, id), _replicaGroupId(replicaGroupId), + _priority(priority), _server(server) { } @@ -263,6 +284,12 @@ ServerAdapterEntry::getProxy(const string& replicaGroupId, bool upToDate) const } } +int +ServerAdapterEntry::getPriority() const +{ + return _priority; +} + ServerEntryPtr ServerAdapterEntry::getServer() const { @@ -286,16 +313,16 @@ void ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter) { Lock sync(*this); - _replicas.push_back(make_pair(replicaId, adapter)); + _replicas.push_back(adapter); } void ReplicaGroupEntry::removeReplica(const string& replicaId) { Lock sync(*this); - for(ReplicaGroupEntry::ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) + for(vector<ServerAdapterEntryPtr>::iterator p = _replicas.begin(); p != _replicas.end(); ++p) { - if(replicaId == p->first) + if(replicaId == (*p)->getId()) { _replicas.erase(p); // Make sure _lastReplica is still within the bounds. @@ -309,36 +336,32 @@ void ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy) { Lock sync(*this); + assert(policy); + _loadBalancing = policy; - if(!_loadBalancing) - { - _loadBalancingNReplicas = 0; - } - else + + istringstream is(_loadBalancing->nReplicas); + int nReplicas = 0; + is >> nReplicas; + _loadBalancingNReplicas = nReplicas < 0 ? 1 : nReplicas; + AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); + if(alb) { - istringstream is(_loadBalancing->nReplicas); - int nReplicas = 0; - is >> nReplicas; - _loadBalancingNReplicas = nReplicas < 1 ? 1 : nReplicas; - AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); - if(alb) + if(alb->loadSample == "1") { - if(alb->loadSample == "1") - { - _loadSample = LoadSample1; - } - else if(alb->loadSample == "5") - { - _loadSample = LoadSample5; - } - else if(alb->loadSample == "15") - { - _loadSample = LoadSample15; - } - else - { - _loadSample = LoadSample1; - } + _loadSample = LoadSample1; + } + else if(alb->loadSample == "5") + { + _loadSample = LoadSample5; + } + else if(alb->loadSample == "15") + { + _loadSample = LoadSample15; + } + else + { + _loadSample = LoadSample1; } } } @@ -346,7 +369,7 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy) vector<pair<string, AdapterPrx> > ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) { - ReplicaSeq replicas; + vector<ServerAdapterEntryPtr> replicas; bool adaptive = false; LoadSample loadSample = LoadSample1; { @@ -360,10 +383,6 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size()); replicas.reserve(_replicas.size()); - if(!_loadBalancing) - { - replicas = _replicas; - } if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { for(unsigned int i = 0; i < _replicas.size(); ++i) @@ -377,10 +396,15 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) replicas = _replicas; RandomNumberGenerator rng; random_shuffle(replicas.begin(), replicas.end(), rng); - adaptive = true; loadSample = _loadSample; + adaptive = true; } - else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + else if(OrderedLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + { + replicas = _replicas; + sort(replicas.begin(), replicas.end(), ReplicaPriorityComp()); + } + else if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { replicas = _replicas; RandomNumberGenerator rng; @@ -392,14 +416,16 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) { // // This must be done outside the synchronization block since - // the sort() will call and lock each server entry. + // the trasnform() might call and lock each server adapter + // entry. We also can't sort directly as the load of each + // server adapter is not stable so we first take a snapshot of + // each adapter and sort the snapshot. // - - vector<ReplicaLoadComp::ReplicaLoad> rl; - transform(replicas.begin(), replicas.end(), back_inserter(rl), ToReplicaLoad(loadSample)); + vector<pair<float, ServerAdapterEntryPtr> > rl; + transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample)); sort(rl.begin(), rl.end(), ReplicaLoadComp()); replicas.clear(); - transform(rl.begin(), rl.end(), back_inserter(replicas), ToReplica()); + transform(rl.begin(), rl.end(), back_inserter(replicas), TransformToReplica()); } // @@ -408,7 +434,7 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) // reachable. // vector<pair<string, AdapterPrx> > adapters; - for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { try { @@ -417,8 +443,8 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) // compiler bug with xlC on AIX which causes a segfault if // getProxy raises an exception. // - AdapterPrx adpt = p->second->getProxy(_id, true); - adapters.push_back(make_pair(p->first, adpt)); + AdapterPrx adpt = (*p)->getProxy(_id, true); + adapters.push_back(make_pair((*p)->getId(), adpt)); } catch(const AdapterNotExistException&) { @@ -437,7 +463,7 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup) float ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { - ReplicaSeq replicas; + vector<ServerAdapterEntryPtr> replicas; { Lock sync(*this); replicas = _replicas; @@ -449,9 +475,9 @@ ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const // RandomNumberGenerator rng; random_shuffle(replicas.begin(), replicas.end(), rng); - vector<ReplicaLoadComp::ReplicaLoad> rl; - transform(replicas.begin(), replicas.end(), back_inserter(rl), ToReplicaLoad(loadSample)); - AdapterEntryPtr adpt = min_element(rl.begin(), rl.end(), ReplicaLoadComp())->second.second; + vector<pair<float, ServerAdapterEntryPtr> > rl; + transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample)); + AdapterEntryPtr adpt = min_element(rl.begin(), rl.end(), ReplicaLoadComp())->second; return adpt->getLeastLoadedNodeLoad(loadSample); } @@ -465,16 +491,16 @@ ReplicaGroupEntry::getApplication() const AdapterInfoSeq ReplicaGroupEntry::getAdapterInfo() const { - ReplicaSeq replicas; + vector<ServerAdapterEntryPtr> replicas; { Lock sync(*this); replicas = _replicas; } AdapterInfoSeq infos; - for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - AdapterInfoSeq infs = p->second->getAdapterInfo(); + AdapterInfoSeq infs = (*p)->getAdapterInfo(); assert(infs.size() == 1); infos.push_back(infs[0]); } |