summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/AdapterCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-03 09:27:47 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-03 09:27:47 +0000
commite68a0e117ee67b3b0e3f64e14da73802e7877814 (patch)
tree62fc9806929cc447a694a1a5d094271766b998e4 /cpp/src/IceGrid/AdapterCache.cpp
parentAdded some IceVb stuff (diff)
downloadice-e68a0e117ee67b3b0e3f64e14da73802e7877814.tar.bz2
ice-e68a0e117ee67b3b0e3f64e14da73802e7877814.tar.xz
ice-e68a0e117ee67b3b0e3f64e14da73802e7877814.zip
Added ordered load balancing
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp176
1 files changed, 101 insertions, 75 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index 84776e93237..087049e83bc 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -26,25 +26,33 @@ namespace IceGrid
struct ReplicaLoadComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool>
{
- typedef ReplicaGroupEntry::ReplicaSeq::value_type Replica;
- typedef pair<float, Replica> ReplicaLoad;
-
- bool operator()(const ReplicaLoad& lhs, const ReplicaLoad& rhs)
+ bool operator()(const pair<float, ServerAdapterEntryPtr>& lhs, const pair<float, ServerAdapterEntryPtr>& rhs)
{
return lhs.first < rhs.first;
}
};
-struct ToReplicaLoad : public unary_function<const ReplicaLoadComp::Replica&, ReplicaLoadComp::ReplicaLoad>
+struct ReplicaPriorityComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool>
+{
+ bool operator()(const ServerAdapterEntryPtr& lhs, const ServerAdapterEntryPtr& rhs)
+ {
+ return lhs->getPriority() < rhs->getPriority();
+ }
+};
+
+struct TransformToReplicaLoad :
+ public unary_function<const ServerAdapterEntryPtr&, pair<float, ServerAdapterEntryPtr> >
{
- ToReplicaLoad(LoadSample loadSample) : _loadSample(loadSample) { }
+public:
- ReplicaLoadComp::ReplicaLoad
- operator()(const ReplicaLoadComp::Replica& value)
+ TransformToReplicaLoad(LoadSample loadSample) : _loadSample(loadSample) { }
+
+ pair<float, ServerAdapterEntryPtr>
+ operator()(const ServerAdapterEntryPtr& value)
{
try
{
- return make_pair(value.second->getLeastLoadedNodeLoad(_loadSample), value);
+ return make_pair(value->getLeastLoadedNodeLoad(_loadSample), value);
}
catch(const Ice::Exception&)
{
@@ -55,10 +63,10 @@ struct ToReplicaLoad : public unary_function<const ReplicaLoadComp::Replica&, Re
LoadSample _loadSample;
};
-struct ToReplica : public unary_function<const ReplicaLoadComp::ReplicaLoad&, ReplicaLoadComp::Replica>
+struct TransformToReplica : public unary_function<const pair<string, ServerAdapterEntryPtr>&, ServerAdapterEntryPtr>
{
- ReplicaLoadComp::Replica
- operator()(const ReplicaLoadComp::ReplicaLoad& value)
+ ServerAdapterEntryPtr
+ operator()(const pair<float, ServerAdapterEntryPtr>& value)
{
return value.second;
}
@@ -67,30 +75,35 @@ struct ToReplica : public unary_function<const ReplicaLoadComp::ReplicaLoad&, Re
}
ServerAdapterEntryPtr
-AdapterCache::addServerAdapter(const string& id, const string& rgId, const ServerEntryPtr& server)
+AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const ServerEntryPtr& server)
{
Lock sync(*this);
- assert(!getImpl(id));
- ServerAdapterEntryPtr entry = new ServerAdapterEntry(*this, id, rgId, server);
- addImpl(id, entry);
+ assert(!getImpl(desc.id));
+
+ istringstream is(desc.priority);
+ int priority = 0;
+ is >> priority;
+
+ ServerAdapterEntryPtr entry = new ServerAdapterEntry(*this, desc.id, desc.replicaGroupId, priority, server);
+ addImpl(desc.id, entry);
- if(!rgId.empty())
+ if(!desc.replicaGroupId.empty())
{
- ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(rgId));
+ ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(desc.replicaGroupId));
assert(repEntry);
- repEntry->addReplica(id, entry);
+ repEntry->addReplica(desc.id, entry);
}
return entry;
}
ReplicaGroupEntryPtr
-AdapterCache::addReplicaGroup(const string& id, const string& app, const LoadBalancingPolicyPtr& loadBalancing)
+AdapterCache::addReplicaGroup(const ReplicaGroupDescriptor& desc, const string& app)
{
Lock sync(*this);
- assert(!getImpl(id));
- ReplicaGroupEntryPtr entry = new ReplicaGroupEntry(*this, id, app, loadBalancing);
- addImpl(id, entry);
+ assert(!getImpl(desc.id));
+ ReplicaGroupEntryPtr entry = new ReplicaGroupEntry(*this, desc.id, app, desc.loadBalancing);
+ addImpl(desc.id, entry);
return entry;
}
@@ -189,12 +202,20 @@ AdapterEntry::canRemove()
return true;
}
+string
+AdapterEntry::getId() const
+{
+ return _id;
+}
+
ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
const string& id,
const string& replicaGroupId,
+ int priority,
const ServerEntryPtr& server) :
AdapterEntry(cache, id),
_replicaGroupId(replicaGroupId),
+ _priority(priority),
_server(server)
{
}
@@ -263,6 +284,12 @@ ServerAdapterEntry::getProxy(const string& replicaGroupId, bool upToDate) const
}
}
+int
+ServerAdapterEntry::getPriority() const
+{
+ return _priority;
+}
+
ServerEntryPtr
ServerAdapterEntry::getServer() const
{
@@ -286,16 +313,16 @@ void
ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter)
{
Lock sync(*this);
- _replicas.push_back(make_pair(replicaId, adapter));
+ _replicas.push_back(adapter);
}
void
ReplicaGroupEntry::removeReplica(const string& replicaId)
{
Lock sync(*this);
- for(ReplicaGroupEntry::ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p)
+ for(vector<ServerAdapterEntryPtr>::iterator p = _replicas.begin(); p != _replicas.end(); ++p)
{
- if(replicaId == p->first)
+ if(replicaId == (*p)->getId())
{
_replicas.erase(p);
// Make sure _lastReplica is still within the bounds.
@@ -309,36 +336,32 @@ void
ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy)
{
Lock sync(*this);
+ assert(policy);
+
_loadBalancing = policy;
- if(!_loadBalancing)
- {
- _loadBalancingNReplicas = 0;
- }
- else
+
+ istringstream is(_loadBalancing->nReplicas);
+ int nReplicas = 0;
+ is >> nReplicas;
+ _loadBalancingNReplicas = nReplicas < 0 ? 1 : nReplicas;
+ AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
+ if(alb)
{
- istringstream is(_loadBalancing->nReplicas);
- int nReplicas = 0;
- is >> nReplicas;
- _loadBalancingNReplicas = nReplicas < 1 ? 1 : nReplicas;
- AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
- if(alb)
+ if(alb->loadSample == "1")
{
- if(alb->loadSample == "1")
- {
- _loadSample = LoadSample1;
- }
- else if(alb->loadSample == "5")
- {
- _loadSample = LoadSample5;
- }
- else if(alb->loadSample == "15")
- {
- _loadSample = LoadSample15;
- }
- else
- {
- _loadSample = LoadSample1;
- }
+ _loadSample = LoadSample1;
+ }
+ else if(alb->loadSample == "5")
+ {
+ _loadSample = LoadSample5;
+ }
+ else if(alb->loadSample == "15")
+ {
+ _loadSample = LoadSample15;
+ }
+ else
+ {
+ _loadSample = LoadSample1;
}
}
}
@@ -346,7 +369,7 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy)
vector<pair<string, AdapterPrx> >
ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
{
- ReplicaSeq replicas;
+ vector<ServerAdapterEntryPtr> replicas;
bool adaptive = false;
LoadSample loadSample = LoadSample1;
{
@@ -360,10 +383,6 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size());
replicas.reserve(_replicas.size());
- if(!_loadBalancing)
- {
- replicas = _replicas;
- }
if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
{
for(unsigned int i = 0; i < _replicas.size(); ++i)
@@ -377,10 +396,15 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
replicas = _replicas;
RandomNumberGenerator rng;
random_shuffle(replicas.begin(), replicas.end(), rng);
- adaptive = true;
loadSample = _loadSample;
+ adaptive = true;
}
- else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ else if(OrderedLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ {
+ replicas = _replicas;
+ sort(replicas.begin(), replicas.end(), ReplicaPriorityComp());
+ }
+ else if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
{
replicas = _replicas;
RandomNumberGenerator rng;
@@ -392,14 +416,16 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
{
//
// This must be done outside the synchronization block since
- // the sort() will call and lock each server entry.
+ // the trasnform() might call and lock each server adapter
+ // entry. We also can't sort directly as the load of each
+ // server adapter is not stable so we first take a snapshot of
+ // each adapter and sort the snapshot.
//
-
- vector<ReplicaLoadComp::ReplicaLoad> rl;
- transform(replicas.begin(), replicas.end(), back_inserter(rl), ToReplicaLoad(loadSample));
+ vector<pair<float, ServerAdapterEntryPtr> > rl;
+ transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample));
sort(rl.begin(), rl.end(), ReplicaLoadComp());
replicas.clear();
- transform(rl.begin(), rl.end(), back_inserter(replicas), ToReplica());
+ transform(rl.begin(), rl.end(), back_inserter(replicas), TransformToReplica());
}
//
@@ -408,7 +434,7 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
// reachable.
//
vector<pair<string, AdapterPrx> > adapters;
- for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
try
{
@@ -417,8 +443,8 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
// compiler bug with xlC on AIX which causes a segfault if
// getProxy raises an exception.
//
- AdapterPrx adpt = p->second->getProxy(_id, true);
- adapters.push_back(make_pair(p->first, adpt));
+ AdapterPrx adpt = (*p)->getProxy(_id, true);
+ adapters.push_back(make_pair((*p)->getId(), adpt));
}
catch(const AdapterNotExistException&)
{
@@ -437,7 +463,7 @@ ReplicaGroupEntry::getProxies(int& nReplicas, bool& replicaGroup)
float
ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
- ReplicaSeq replicas;
+ vector<ServerAdapterEntryPtr> replicas;
{
Lock sync(*this);
replicas = _replicas;
@@ -449,9 +475,9 @@ ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
//
RandomNumberGenerator rng;
random_shuffle(replicas.begin(), replicas.end(), rng);
- vector<ReplicaLoadComp::ReplicaLoad> rl;
- transform(replicas.begin(), replicas.end(), back_inserter(rl), ToReplicaLoad(loadSample));
- AdapterEntryPtr adpt = min_element(rl.begin(), rl.end(), ReplicaLoadComp())->second.second;
+ vector<pair<float, ServerAdapterEntryPtr> > rl;
+ transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample));
+ AdapterEntryPtr adpt = min_element(rl.begin(), rl.end(), ReplicaLoadComp())->second;
return adpt->getLeastLoadedNodeLoad(loadSample);
}
@@ -465,16 +491,16 @@ ReplicaGroupEntry::getApplication() const
AdapterInfoSeq
ReplicaGroupEntry::getAdapterInfo() const
{
- ReplicaSeq replicas;
+ vector<ServerAdapterEntryPtr> replicas;
{
Lock sync(*this);
replicas = _replicas;
}
AdapterInfoSeq infos;
- for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- AdapterInfoSeq infs = p->second->getAdapterInfo();
+ AdapterInfoSeq infs = (*p)->getAdapterInfo();
assert(infs.size() == 1);
infos.push_back(infs[0]);
}