summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/AdapterCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-10-12 17:21:02 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-10-12 17:21:02 +0000
commitaac841a43441f7911056ddbc6fc8c21aa6126431 (patch)
tree8dcad281655b53155e9c10e72b07d436208787a8 /cpp/src/IceGrid/AdapterCache.cpp
parentchanging getLogger to return a custom Python impl (diff)
downloadice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.bz2
ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.xz
ice-aac841a43441f7911056ddbc6fc8c21aa6126431.zip
Added support for replica groups and removed replicated adapters.
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp383
1 files changed, 203 insertions, 180 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index be78dc91349..d0c55b7879e 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -8,7 +8,7 @@
// **********************************************************************
#include <Ice/LoggerUtil.h>
-
+#include <Ice/Locator.h>
#include <IceGrid/AdapterCache.h>
#include <IceGrid/NodeSessionI.h>
#include <IceGrid/ServerCache.h>
@@ -20,16 +20,16 @@ using namespace IceGrid;
namespace IceGrid
{
-struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool>
+struct ServerLoadCI : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool>
{
ServerLoadCI(LoadSample loadSample) : _loadSample(loadSample) { }
- bool operator()(const pair<string, ServerEntryPtr>& lhs, const pair<string, ServerEntryPtr>& rhs)
+ bool operator()(const pair<string, ServerAdapterEntryPtr>& lhs, const pair<string, ServerAdapterEntryPtr>& rhs)
{
float lhsl = 1.0f;
try
{
- lhsl = lhs.second->getLoad(_loadSample);
+ lhsl = lhs.second->getLeastLoadedNodeLoad(_loadSample);
}
catch(const ServerNotExistException&)
{
@@ -41,7 +41,7 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool>
float rhsl = 1.0f;
try
{
- rhsl = rhs.second->getLoad(_loadSample);
+ rhsl = rhs.second->getLeastLoadedNodeLoad(_loadSample);
}
catch(const ServerNotExistException&)
{
@@ -58,27 +58,65 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool>
}
AdapterEntryPtr
-AdapterCache::get(const string& id, bool create) const
+AdapterCache::get(const string& id) const
{
Lock sync(*this);
AdapterCache& self = const_cast<AdapterCache&>(*this);
- AdapterEntryPtr entry = self.getImpl(id, create);
+ AdapterEntryPtr entry = self.getImpl(id);
if(!entry)
{
- throw AdapterNotExistException(id, "");
+ throw AdapterNotExistException(id);
}
return entry;
}
+ReplicaGroupEntryPtr
+AdapterCache::getReplicaGroup(const string& id, bool create) const
+{
+ Lock sync(*this);
+ AdapterCache& self = const_cast<AdapterCache&>(*this);
+
+ AdapterEntryPtr entry = self.getImpl(id);
+ if(!entry && create)
+ {
+ return ReplicaGroupEntryPtr::dynamicCast(self.addImpl(id, new ReplicaGroupEntry(self, id)));
+ }
+ ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(entry);
+ if(!repEntry)
+ {
+ throw AdapterNotExistException(id);
+ }
+ return repEntry;
+}
+
+ServerAdapterEntryPtr
+AdapterCache::getServerAdapter(const string& id, bool create) const
+{
+ Lock sync(*this);
+ AdapterCache& self = const_cast<AdapterCache&>(*this);
+
+ AdapterEntryPtr entry = self.getImpl(id);
+ if(!entry && create)
+ {
+ return ServerAdapterEntryPtr::dynamicCast(self.addImpl(id, new ServerAdapterEntry(self, id)));
+ }
+ ServerAdapterEntryPtr svrEntry = ServerAdapterEntryPtr::dynamicCast(entry);
+ if(!svrEntry)
+ {
+ throw AdapterNotExistException(id);
+ }
+ return svrEntry;
+}
+
AdapterEntryPtr
-AdapterCache::addImpl(const string& id)
+AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry)
{
if(_traceLevels && _traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "added adapter `" << id << "'";
}
- return Cache<string, AdapterEntry>::addImpl(id);
+ return Cache<string, AdapterEntry>::addImpl(id, entry);
}
AdapterEntryPtr
@@ -93,148 +131,206 @@ AdapterCache::removeImpl(const string& id)
}
AdapterEntry::AdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) :
- _cache(cache),
- _id(id),
- _replicated(false),
- _lastReplica(0)
+ _cache(*dynamic_cast<AdapterCache*>(&cache)),
+ _id(id)
{
}
-void
-AdapterEntry::enableReplication(const LoadBalancingPolicyPtr& policy)
+bool
+AdapterEntry::canRemove()
{
- Lock sync(*this);
- _replicated = true;
- _loadBalancing = policy;
- istringstream is(policy->nReplicas);
- is >> _loadBalancingNReplicas;
- if(_loadBalancingNReplicas < 1)
- {
- _loadBalancingNReplicas = 1;
- }
+ return true;
+}
+
+ServerAdapterEntry::ServerAdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) :
+ AdapterEntry(cache, id)
+{
+}
+
+vector<pair<string, AdapterPrx> >
+ServerAdapterEntry::getProxies(int& nReplicas)
+{
+ vector<pair<string, AdapterPrx> > adapters;
+ adapters.push_back(make_pair(_id, getProxy()));
+ return adapters;
+}
+
+float
+ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
+{
+ return getServer()->getLoad(loadSample);
+}
+
+string
+ServerAdapterEntry::getApplication() const
+{
+ return getServer()->getApplication();
+}
- AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
- if(alb)
+void
+ServerAdapterEntry::set(const ServerEntryPtr& server, const string& replicaGroupId)
+{
{
- if(alb->loadSample == "1")
- {
- _loadSample = LoadSample1;
- }
- else if(alb->loadSample == "5")
- {
- _loadSample = LoadSample5;
- }
- else if(alb->loadSample == "15")
- {
- _loadSample = LoadSample15;
- }
- else
- {
- _loadSample = LoadSample1;
- }
+ Lock sync(*this);
+ _server = server;
+ _replicaGroupId = replicaGroupId;
}
-
- if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0)
+ if(!replicaGroupId.empty())
{
- Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat);
- out << "enabled replication on adapter `" << _id << "'";
+ _cache.getReplicaGroup(replicaGroupId)->addReplica(_id, this);
}
}
void
-AdapterEntry::disableReplication()
+ServerAdapterEntry::destroy()
{
- bool remove;
+ string replicaGroupId;
{
Lock sync(*this);
- _replicated = false;
- remove = _replicas.empty();
+ replicaGroupId = _replicaGroupId;
}
- if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0)
+ if(!replicaGroupId.empty())
{
- Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat);
- out << "disabled replication on adapter `" << _id << "'";
+ _cache.getReplicaGroup(replicaGroupId)->removeReplica(_id);
}
- if(remove)
+ _cache.remove(_id);
+}
+
+AdapterPrx
+ServerAdapterEntry::getProxy(const string& replicaGroupId) const
+{
+ if(replicaGroupId.empty())
+ {
+ return getServer()->getAdapter(_id);
+ }
+ else
{
- _cache.remove(_id);
+ Lock sync(*this);
+ if(_replicaGroupId != replicaGroupId)
+ {
+ throw Ice::InvalidReplicaGroupIdException();
+ }
+ return _server->getAdapter(_id);
}
}
-void
-AdapterEntry::addReplica(const string& replicaId, const ServerEntryPtr& entry)
+ServerEntryPtr
+ServerAdapterEntry::getServer() const
{
Lock sync(*this);
- assert(_replicated || _replicas.empty());
- _replicas.push_back(make_pair(replicaId, entry));
+ assert(_server);
+ return _server;
+}
+
+ReplicaGroupEntry::ReplicaGroupEntry(Cache<string, AdapterEntry>& cache, const std::string& id) :
+ AdapterEntry(cache, id),
+ _lastReplica(0)
+{
}
void
-AdapterEntry::removeReplica(const string& replicaId)
+ReplicaGroupEntry::set(const string& application, const LoadBalancingPolicyPtr& policy)
{
- bool remove = false;
+ Lock sync(*this);
+ _application = application;
+ if(policy)
{
- Lock sync(*this);
- for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p)
+ _loadBalancing = policy;
+ istringstream is(policy->nReplicas);
+ is >> _loadBalancingNReplicas;
+ if(_loadBalancingNReplicas < 1)
{
- if(replicaId == p->first)
+ _loadBalancingNReplicas = 1;
+ }
+
+ AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
+ if(alb)
+ {
+ if(alb->loadSample == "1")
{
- _replicas.erase(p);
- // Make sure _lastReplica is still within the bounds.
- _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size();
- break;
+ _loadSample = LoadSample1;
+ }
+ else if(alb->loadSample == "5")
+ {
+ _loadSample = LoadSample5;
+ }
+ else if(alb->loadSample == "15")
+ {
+ _loadSample = LoadSample15;
+ }
+ else
+ {
+ _loadSample = LoadSample1;
}
}
- remove = _replicas.empty() && !_replicated;
}
- if(remove)
+ else
+ {
+ _loadBalancing = 0;
+ _loadBalancingNReplicas = 0;
+ }
+}
+
+void
+ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter)
+{
+ Lock sync(*this);
+ _replicas.push_back(make_pair(replicaId, adapter));
+}
+
+void
+ReplicaGroupEntry::removeReplica(const string& replicaId)
+{
+ Lock sync(*this);
+ for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p)
{
- _cache.remove(_id);
+ if(replicaId == p->first)
+ {
+ _replicas.erase(p);
+ // Make sure _lastReplica is still within the bounds.
+ _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size();
+ break;
+ }
}
}
vector<pair<string, AdapterPrx> >
-AdapterEntry::getProxies(int& nReplicas)
+ReplicaGroupEntry::getProxies(int& nReplicas)
{
ReplicaSeq replicas;
bool adaptive = false;
LoadSample loadSample;
{
- Lock sync(*this);
+ Lock sync(*this);
if(_replicas.empty())
{
- throw AdapterNotExistException(_id, "");
+ return vector<pair<string, AdapterPrx> >();
}
- if(!_replicated)
+ nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : _replicas.size();
+ replicas.reserve(_replicas.size());
+ if(!_loadBalancing)
{
- nReplicas = 1;
- replicas.push_back(_replicas[0]);
+ replicas = _replicas;
}
- else
+ if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
{
- nReplicas = _loadBalancingNReplicas;
-
- replicas.reserve(_replicas.size());
- if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
- {
- for(unsigned int i = 0; i < _replicas.size(); ++i)
- {
- replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]);
- }
- _lastReplica = (_lastReplica + 1) % _replicas.size();
- }
- else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
- {
- replicas = _replicas;
- adaptive = true;
- loadSample = _loadSample;
- }
- else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ for(unsigned int i = 0; i < _replicas.size(); ++i)
{
- replicas = _replicas;
- random_shuffle(replicas.begin(), replicas.end());
+ replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]);
}
+ _lastReplica = (_lastReplica + 1) % _replicas.size();
+ }
+ else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ {
+ replicas = _replicas;
+ adaptive = true;
+ loadSample = _loadSample;
+ }
+ else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ {
+ replicas = _replicas;
+ random_shuffle(replicas.begin(), replicas.end());
}
}
@@ -254,47 +350,28 @@ AdapterEntry::getProxies(int& nReplicas)
// reachable.
//
vector<pair<string, AdapterPrx> > adapters;
- auto_ptr<Ice::UserException> exception;
for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
try
{
- adapters.push_back(make_pair(p->second->getId(), p->second->getAdapter(_id, p->first)));
+ adapters.push_back(make_pair(p->first, p->second->getProxy()));
}
catch(AdapterNotExistException&)
{
}
- catch(const NodeUnreachableException& ex)
- {
- exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone()));
- }
- }
-
- if(adapters.empty())
- {
- if(!exception.get())
- {
- throw AdapterNotExistException(_id, "");
- }
- else
+ catch(const NodeUnreachableException&)
{
- exception->ice_throw();
}
}
-
return adapters;
}
float
-AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
+ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
ReplicaSeq replicas;
{
- Lock sync(*this);
- if(_replicas.empty())
- {
- throw AdapterNotExistException(_id, "");
- }
+ Lock sync(*this);
replicas = _replicas;
}
@@ -303,67 +380,13 @@ AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
// min_element() will call and lock each server entry.
//
random_shuffle(replicas.begin(), replicas.end());
- return min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second->getLoad(loadSample);
+ AdapterEntryPtr adpt = min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second;
+ return adpt->getLeastLoadedNodeLoad(loadSample);
}
string
-AdapterEntry::getApplication() const
-{
- Lock sync(*this);
- if(_replicas.empty())
- {
- throw AdapterNotExistException(_id, "");
- }
- return _replicas[0].second->getApplication();
-}
-
-AdapterPrx
-AdapterEntry::getProxy(const string& replicaId) const
-{
- pair<string, ServerEntryPtr> replica;
- bool replicated;
- {
- Lock sync(*this);
- if(_replicas.empty())
- {
- throw AdapterNotExistException(_id, (_replicated ? replicaId : ""));
- }
-
- replicated = _replicated;
- if(!_replicated)
- {
- replica = _replicas[0];
- }
- else
- {
- for(ReplicaSeq::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p)
- {
- if(p->first == replicaId)
- {
- replica = *p;
- break;
- }
- }
- }
- }
-
- if(replica.second)
- {
- try
- {
- return replica.second->getAdapter(_id, replica.first);
- }
- catch(AdapterNotExistException&)
- {
- }
- }
-
- throw AdapterNotExistException(_id, (replicated ? replicaId : ""));
-}
-
-bool
-AdapterEntry::canRemove()
+ReplicaGroupEntry::getApplication() const
{
Lock sync(*this);
- return _replicas.empty() && !_replicated;
+ return _application;
}