// ********************************************************************** // // Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include #include #include using namespace std; using namespace IceGrid; namespace IceGrid { struct ServerLoadCI : binary_function { ServerLoadCI(LoadSample loadSample) : _loadSample(loadSample) { } bool operator()(const pair& lhs, const pair& rhs) { float lhsl = 1.0f; try { lhsl = lhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { } catch(const NodeUnreachableException&) { } float rhsl = 1.0f; try { rhsl = rhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { } catch(const NodeUnreachableException&) { } return lhsl < rhsl; } LoadSample _loadSample; }; } AdapterEntryPtr AdapterCache::get(const string& id) const { Lock sync(*this); AdapterCache& self = const_cast(*this); AdapterEntryPtr entry = self.getImpl(id); if(!entry) { throw AdapterNotExistException(id); } return entry; } ReplicaGroupEntryPtr AdapterCache::getReplicaGroup(const string& id, bool create) const { Lock sync(*this); AdapterCache& self = const_cast(*this); AdapterEntryPtr entry = self.getImpl(id); if(!entry && create) { return ReplicaGroupEntryPtr::dynamicCast(self.addImpl(id, new ReplicaGroupEntry(self, id))); } ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(entry); if(!repEntry) { throw AdapterNotExistException(id); } return repEntry; } ServerAdapterEntryPtr AdapterCache::getServerAdapter(const string& id, bool create) const { Lock sync(*this); AdapterCache& self = const_cast(*this); AdapterEntryPtr entry = self.getImpl(id); if(!entry && create) { return ServerAdapterEntryPtr::dynamicCast(self.addImpl(id, new ServerAdapterEntry(self, id))); } ServerAdapterEntryPtr svrEntry = ServerAdapterEntryPtr::dynamicCast(entry); if(!svrEntry) { throw AdapterNotExistException(id); } return svrEntry; } AdapterEntryPtr AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry) { if(_traceLevels && _traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "added adapter `" << id << "'"; } return Cache::addImpl(id, entry); } AdapterEntryPtr AdapterCache::removeImpl(const string& id) { if(_traceLevels && _traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed adapter `" << id << "'"; } return Cache::removeImpl(id); } AdapterEntry::AdapterEntry(Cache& cache, const std::string& id) : _cache(*dynamic_cast(&cache)), _id(id) { } bool AdapterEntry::canRemove() { return true; } ServerAdapterEntry::ServerAdapterEntry(Cache& cache, const std::string& id) : AdapterEntry(cache, id) { } vector > ServerAdapterEntry::getProxies(bool, int& nReplicas) { vector > adapters; try { nReplicas = 1; adapters.push_back(make_pair(_id, getProxy())); } catch(const NodeUnreachableException&) { } return adapters; } float ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { return getServer()->getLoad(loadSample); } string ServerAdapterEntry::getApplication() const { return getServer()->getApplication(); } void ServerAdapterEntry::set(const ServerEntryPtr& server, const string& replicaGroupId) { { Lock sync(*this); _server = server; _replicaGroupId = replicaGroupId; } if(!replicaGroupId.empty()) { _cache.getReplicaGroup(replicaGroupId)->addReplica(_id, this); } } void ServerAdapterEntry::destroy() { string replicaGroupId; { Lock sync(*this); replicaGroupId = _replicaGroupId; } if(!replicaGroupId.empty()) { _cache.getReplicaGroup(replicaGroupId)->removeReplica(_id); } _cache.remove(_id); } AdapterPrx ServerAdapterEntry::getProxy(const string& replicaGroupId) const { if(replicaGroupId.empty()) { return getServer()->getAdapter(_id); } else { Lock sync(*this); if(_replicaGroupId != replicaGroupId) { throw Ice::InvalidReplicaGroupIdException(); } return _server->getAdapter(_id); } } ServerEntryPtr ServerAdapterEntry::getServer() const { Lock sync(*this); assert(_server); return _server; } ReplicaGroupEntry::ReplicaGroupEntry(Cache& cache, const std::string& id) : AdapterEntry(cache, id), _lastReplica(0) { } void ReplicaGroupEntry::set(const string& application, const LoadBalancingPolicyPtr& policy) { Lock sync(*this); _application = application; if(policy) { _loadBalancing = policy; istringstream is(policy->nReplicas); is >> _loadBalancingNReplicas; if(_loadBalancingNReplicas < 1) { _loadBalancingNReplicas = 1; } AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); if(alb) { if(alb->loadSample == "1") { _loadSample = LoadSample1; } else if(alb->loadSample == "5") { _loadSample = LoadSample5; } else if(alb->loadSample == "15") { _loadSample = LoadSample15; } else { _loadSample = LoadSample1; } } } else { _loadBalancing = 0; _loadBalancingNReplicas = 0; } } void ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter) { Lock sync(*this); _replicas.push_back(make_pair(replicaId, adapter)); } void ReplicaGroupEntry::removeReplica(const string& replicaId) { Lock sync(*this); for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) { if(replicaId == p->first) { _replicas.erase(p); // Make sure _lastReplica is still within the bounds. _lastReplica = _replicas.empty() ? 0 : _lastReplica % static_cast(_replicas.size()); break; } } } vector > ReplicaGroupEntry::getProxies(bool allRegistered, int& nReplicas) { ReplicaSeq replicas; bool adaptive = false; LoadSample loadSample = LoadSample1; { Lock sync(*this); if(_replicas.empty()) { return vector >(); } nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast(_replicas.size()); replicas.reserve(_replicas.size()); if(!_loadBalancing) { replicas = _replicas; } if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { for(unsigned int i = 0; i < _replicas.size(); ++i) { replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); } _lastReplica = (_lastReplica + 1) % static_cast(_replicas.size()); } else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { replicas = _replicas; adaptive = true; loadSample = _loadSample; } else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { replicas = _replicas; random_shuffle(replicas.begin(), replicas.end()); } } if(adaptive) { // // This must be done outside the synchronization block since // the sort() will call and lock each server entry. // random_shuffle(replicas.begin(), replicas.end()); sort(replicas.begin(), replicas.end(), ServerLoadCI(loadSample)); } // // Retrieve the proxy of each adapter from the server. The adapter // might not exist anymore at this time or the node might not be // reachable. // vector > adapters; for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { try { adapters.push_back(make_pair(p->first, p->second->getProxy())); } catch(AdapterNotExistException&) { } catch(const NodeUnreachableException&) { if(allRegistered) { adapters.push_back(make_pair(p->first, AdapterPrx())); } } } return adapters; } float ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { ReplicaSeq replicas; { Lock sync(*this); replicas = _replicas; } // // This must be done outside the synchronization block since // min_element() will call and lock each server entry. // random_shuffle(replicas.begin(), replicas.end()); AdapterEntryPtr adpt = min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second; return adpt->getLeastLoadedNodeLoad(loadSample); } string ReplicaGroupEntry::getApplication() const { Lock sync(*this); return _application; }