diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
commit | aac841a43441f7911056ddbc6fc8c21aa6126431 (patch) | |
tree | 8dcad281655b53155e9c10e72b07d436208787a8 /cpp/src/IceGrid/AdapterCache.cpp | |
parent | changing getLogger to return a custom Python impl (diff) | |
download | ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.bz2 ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.xz ice-aac841a43441f7911056ddbc6fc8c21aa6126431.zip |
Added support for replica groups and removed replicated adapters.
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 383 |
1 files changed, 203 insertions, 180 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index be78dc91349..d0c55b7879e 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -8,7 +8,7 @@ // ********************************************************************** #include <Ice/LoggerUtil.h> - +#include <Ice/Locator.h> #include <IceGrid/AdapterCache.h> #include <IceGrid/NodeSessionI.h> #include <IceGrid/ServerCache.h> @@ -20,16 +20,16 @@ using namespace IceGrid; namespace IceGrid { -struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> +struct ServerLoadCI : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool> { ServerLoadCI(LoadSample loadSample) : _loadSample(loadSample) { } - bool operator()(const pair<string, ServerEntryPtr>& lhs, const pair<string, ServerEntryPtr>& rhs) + bool operator()(const pair<string, ServerAdapterEntryPtr>& lhs, const pair<string, ServerAdapterEntryPtr>& rhs) { float lhsl = 1.0f; try { - lhsl = lhs.second->getLoad(_loadSample); + lhsl = lhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { @@ -41,7 +41,7 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> float rhsl = 1.0f; try { - rhsl = rhs.second->getLoad(_loadSample); + rhsl = rhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { @@ -58,27 +58,65 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> } AdapterEntryPtr -AdapterCache::get(const string& id, bool create) const +AdapterCache::get(const string& id) const { Lock sync(*this); AdapterCache& self = const_cast<AdapterCache&>(*this); - AdapterEntryPtr entry = self.getImpl(id, create); + AdapterEntryPtr entry = self.getImpl(id); if(!entry) { - throw AdapterNotExistException(id, ""); + throw AdapterNotExistException(id); } return entry; } +ReplicaGroupEntryPtr +AdapterCache::getReplicaGroup(const string& id, bool create) const +{ + Lock sync(*this); + AdapterCache& self = const_cast<AdapterCache&>(*this); + + AdapterEntryPtr entry = self.getImpl(id); + if(!entry && create) + { + return ReplicaGroupEntryPtr::dynamicCast(self.addImpl(id, new ReplicaGroupEntry(self, id))); + } + ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(entry); + if(!repEntry) + { + throw AdapterNotExistException(id); + } + return repEntry; +} + +ServerAdapterEntryPtr +AdapterCache::getServerAdapter(const string& id, bool create) const +{ + Lock sync(*this); + AdapterCache& self = const_cast<AdapterCache&>(*this); + + AdapterEntryPtr entry = self.getImpl(id); + if(!entry && create) + { + return ServerAdapterEntryPtr::dynamicCast(self.addImpl(id, new ServerAdapterEntry(self, id))); + } + ServerAdapterEntryPtr svrEntry = ServerAdapterEntryPtr::dynamicCast(entry); + if(!svrEntry) + { + throw AdapterNotExistException(id); + } + return svrEntry; +} + AdapterEntryPtr -AdapterCache::addImpl(const string& id) +AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry) { if(_traceLevels && _traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "added adapter `" << id << "'"; } - return Cache<string, AdapterEntry>::addImpl(id); + return Cache<string, AdapterEntry>::addImpl(id, entry); } AdapterEntryPtr @@ -93,148 +131,206 @@ AdapterCache::removeImpl(const string& id) } AdapterEntry::AdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : - _cache(cache), - _id(id), - _replicated(false), - _lastReplica(0) + _cache(*dynamic_cast<AdapterCache*>(&cache)), + _id(id) { } -void -AdapterEntry::enableReplication(const LoadBalancingPolicyPtr& policy) +bool +AdapterEntry::canRemove() { - Lock sync(*this); - _replicated = true; - _loadBalancing = policy; - istringstream is(policy->nReplicas); - is >> _loadBalancingNReplicas; - if(_loadBalancingNReplicas < 1) - { - _loadBalancingNReplicas = 1; - } + return true; +} + +ServerAdapterEntry::ServerAdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : + AdapterEntry(cache, id) +{ +} + +vector<pair<string, AdapterPrx> > +ServerAdapterEntry::getProxies(int& nReplicas) +{ + vector<pair<string, AdapterPrx> > adapters; + adapters.push_back(make_pair(_id, getProxy())); + return adapters; +} + +float +ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const +{ + return getServer()->getLoad(loadSample); +} + +string +ServerAdapterEntry::getApplication() const +{ + return getServer()->getApplication(); +} - AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); - if(alb) +void +ServerAdapterEntry::set(const ServerEntryPtr& server, const string& replicaGroupId) +{ { - if(alb->loadSample == "1") - { - _loadSample = LoadSample1; - } - else if(alb->loadSample == "5") - { - _loadSample = LoadSample5; - } - else if(alb->loadSample == "15") - { - _loadSample = LoadSample15; - } - else - { - _loadSample = LoadSample1; - } + Lock sync(*this); + _server = server; + _replicaGroupId = replicaGroupId; } - - if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0) + if(!replicaGroupId.empty()) { - Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat); - out << "enabled replication on adapter `" << _id << "'"; + _cache.getReplicaGroup(replicaGroupId)->addReplica(_id, this); } } void -AdapterEntry::disableReplication() +ServerAdapterEntry::destroy() { - bool remove; + string replicaGroupId; { Lock sync(*this); - _replicated = false; - remove = _replicas.empty(); + replicaGroupId = _replicaGroupId; } - if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0) + if(!replicaGroupId.empty()) { - Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat); - out << "disabled replication on adapter `" << _id << "'"; + _cache.getReplicaGroup(replicaGroupId)->removeReplica(_id); } - if(remove) + _cache.remove(_id); +} + +AdapterPrx +ServerAdapterEntry::getProxy(const string& replicaGroupId) const +{ + if(replicaGroupId.empty()) + { + return getServer()->getAdapter(_id); + } + else { - _cache.remove(_id); + Lock sync(*this); + if(_replicaGroupId != replicaGroupId) + { + throw Ice::InvalidReplicaGroupIdException(); + } + return _server->getAdapter(_id); } } -void -AdapterEntry::addReplica(const string& replicaId, const ServerEntryPtr& entry) +ServerEntryPtr +ServerAdapterEntry::getServer() const { Lock sync(*this); - assert(_replicated || _replicas.empty()); - _replicas.push_back(make_pair(replicaId, entry)); + assert(_server); + return _server; +} + +ReplicaGroupEntry::ReplicaGroupEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : + AdapterEntry(cache, id), + _lastReplica(0) +{ } void -AdapterEntry::removeReplica(const string& replicaId) +ReplicaGroupEntry::set(const string& application, const LoadBalancingPolicyPtr& policy) { - bool remove = false; + Lock sync(*this); + _application = application; + if(policy) { - Lock sync(*this); - for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) + _loadBalancing = policy; + istringstream is(policy->nReplicas); + is >> _loadBalancingNReplicas; + if(_loadBalancingNReplicas < 1) { - if(replicaId == p->first) + _loadBalancingNReplicas = 1; + } + + AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); + if(alb) + { + if(alb->loadSample == "1") { - _replicas.erase(p); - // Make sure _lastReplica is still within the bounds. - _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size(); - break; + _loadSample = LoadSample1; + } + else if(alb->loadSample == "5") + { + _loadSample = LoadSample5; + } + else if(alb->loadSample == "15") + { + _loadSample = LoadSample15; + } + else + { + _loadSample = LoadSample1; } } - remove = _replicas.empty() && !_replicated; } - if(remove) + else + { + _loadBalancing = 0; + _loadBalancingNReplicas = 0; + } +} + +void +ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter) +{ + Lock sync(*this); + _replicas.push_back(make_pair(replicaId, adapter)); +} + +void +ReplicaGroupEntry::removeReplica(const string& replicaId) +{ + Lock sync(*this); + for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) { - _cache.remove(_id); + if(replicaId == p->first) + { + _replicas.erase(p); + // Make sure _lastReplica is still within the bounds. + _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size(); + break; + } } } vector<pair<string, AdapterPrx> > -AdapterEntry::getProxies(int& nReplicas) +ReplicaGroupEntry::getProxies(int& nReplicas) { ReplicaSeq replicas; bool adaptive = false; LoadSample loadSample; { - Lock sync(*this); + Lock sync(*this); if(_replicas.empty()) { - throw AdapterNotExistException(_id, ""); + return vector<pair<string, AdapterPrx> >(); } - if(!_replicated) + nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : _replicas.size(); + replicas.reserve(_replicas.size()); + if(!_loadBalancing) { - nReplicas = 1; - replicas.push_back(_replicas[0]); + replicas = _replicas; } - else + if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { - nReplicas = _loadBalancingNReplicas; - - replicas.reserve(_replicas.size()); - if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) - { - for(unsigned int i = 0; i < _replicas.size(); ++i) - { - replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); - } - _lastReplica = (_lastReplica + 1) % _replicas.size(); - } - else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) - { - replicas = _replicas; - adaptive = true; - loadSample = _loadSample; - } - else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + for(unsigned int i = 0; i < _replicas.size(); ++i) { - replicas = _replicas; - random_shuffle(replicas.begin(), replicas.end()); + replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); } + _lastReplica = (_lastReplica + 1) % _replicas.size(); + } + else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + { + replicas = _replicas; + adaptive = true; + loadSample = _loadSample; + } + else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + { + replicas = _replicas; + random_shuffle(replicas.begin(), replicas.end()); } } @@ -254,47 +350,28 @@ AdapterEntry::getProxies(int& nReplicas) // reachable. // vector<pair<string, AdapterPrx> > adapters; - auto_ptr<Ice::UserException> exception; for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { try { - adapters.push_back(make_pair(p->second->getId(), p->second->getAdapter(_id, p->first))); + adapters.push_back(make_pair(p->first, p->second->getProxy())); } catch(AdapterNotExistException&) { } - catch(const NodeUnreachableException& ex) - { - exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone())); - } - } - - if(adapters.empty()) - { - if(!exception.get()) - { - throw AdapterNotExistException(_id, ""); - } - else + catch(const NodeUnreachableException&) { - exception->ice_throw(); } } - return adapters; } float -AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const +ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { ReplicaSeq replicas; { - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, ""); - } + Lock sync(*this); replicas = _replicas; } @@ -303,67 +380,13 @@ AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const // min_element() will call and lock each server entry. // random_shuffle(replicas.begin(), replicas.end()); - return min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second->getLoad(loadSample); + AdapterEntryPtr adpt = min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second; + return adpt->getLeastLoadedNodeLoad(loadSample); } string -AdapterEntry::getApplication() const -{ - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, ""); - } - return _replicas[0].second->getApplication(); -} - -AdapterPrx -AdapterEntry::getProxy(const string& replicaId) const -{ - pair<string, ServerEntryPtr> replica; - bool replicated; - { - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, (_replicated ? replicaId : "")); - } - - replicated = _replicated; - if(!_replicated) - { - replica = _replicas[0]; - } - else - { - for(ReplicaSeq::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p) - { - if(p->first == replicaId) - { - replica = *p; - break; - } - } - } - } - - if(replica.second) - { - try - { - return replica.second->getAdapter(_id, replica.first); - } - catch(AdapterNotExistException&) - { - } - } - - throw AdapterNotExistException(_id, (replicated ? replicaId : "")); -} - -bool -AdapterEntry::canRemove() +ReplicaGroupEntry::getApplication() const { Lock sync(*this); - return _replicas.empty() && !_replicated; + return _application; } |