diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
commit | aac841a43441f7911056ddbc6fc8c21aa6126431 (patch) | |
tree | 8dcad281655b53155e9c10e72b07d436208787a8 /cpp/src | |
parent | changing getLogger to return a custom Python impl (diff) | |
download | ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.bz2 ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.xz ice-aac841a43441f7911056ddbc6fc8c21aa6126431.zip |
Added support for replica groups and removed replicated adapters.
Diffstat (limited to 'cpp/src')
29 files changed, 690 insertions, 538 deletions
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 36bae61cca3..6b823d7ad5d 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -64,7 +64,6 @@ Ice::ObjectAdapterI::activate() LocatorRegistryPrx locatorRegistry; bool registerProcess = false; string serverId; - string replicaId; CommunicatorPtr communicator; bool printAdapterReady = false; @@ -82,7 +81,6 @@ Ice::ObjectAdapterI::activate() registerProcess = _instance->properties()->getPropertyAsInt(_name + ".RegisterProcess") > 0; serverId = _instance->properties()->getProperty("Ice.ServerId"); - replicaId = _instance->properties()->getPropertyWithDefault(_name + ".ReplicaId", serverId); printAdapterReady = _instance->properties()->getPropertyAsInt("Ice.PrintAdapterReady") > 0; if(registerProcess && !locatorRegistry) @@ -122,17 +120,24 @@ Ice::ObjectAdapterI::activate() { Identity ident; ident.name = "dummy"; - locatorRegistry->setAdapterDirectProxy(_id, replicaId, createDirectProxy(ident)); + locatorRegistry->setAdapterDirectProxy(_id, _replicaGroupId, createDirectProxy(ident)); } catch(const ObjectAdapterDeactivatedException&) { // IGNORE: The object adapter is already inactive. } - catch(const AdapterNotFoundException& e) + catch(const AdapterNotFoundException&) { NotRegisteredException ex(__FILE__, __LINE__); ex.kindOfObject = "object adapter"; - ex.id = e.replica ? _id + " (replica = " + replicaId + ")" : _id; + ex.id = _id; + throw ex; + } + catch(const InvalidReplicaGroupIdException&) + { + NotRegisteredException ex(__FILE__, __LINE__); + ex.kindOfObject = "replica group"; + ex.id = _replicaGroupId; throw ex; } catch(const AdapterAlreadyActiveException&) @@ -453,6 +458,17 @@ Ice::ObjectAdapterI::createDirectProxy(const Identity& ident) const } ObjectPrx +Ice::ObjectAdapterI::createIndirectProxy(const Identity& ident) const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + checkForDeactivation(); + checkIdentity(ident); + + return newIndirectProxy(ident, "", _id); +} + +ObjectPrx Ice::ObjectAdapterI::createReverseProxy(const Identity& ident) const { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -748,6 +764,7 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica _printAdapterReadyDone(false), _name(name), _id(instance->properties()->getProperty(name + ".AdapterId")), + _replicaGroupId(instance->properties()->getProperty(name + ".ReplicaGroupId")), _directCount(0), _waitForDeactivate(false) { @@ -849,19 +866,13 @@ Ice::ObjectAdapterI::newProxy(const Identity& ident, const string& facet) const { return newDirectProxy(ident, facet); } + else if(_replicaGroupId.empty()) + { + return newIndirectProxy(ident, facet, _id); + } else { - // - // Create a reference with the adapter id. - // - ReferencePtr ref = _instance->referenceFactory()->create( - ident, _instance->getDefaultContext(), facet, Reference::ModeTwoway, false, _id, - 0, _locatorInfo, _instance->defaultsAndOverrides()->defaultCollocationOptimization); - - // - // Return a proxy for the reference. - // - return _instance->proxyFactory()->referenceToProxy(ref); + return newIndirectProxy(ident, facet, _replicaGroupId); } } @@ -887,6 +898,22 @@ Ice::ObjectAdapterI::newDirectProxy(const Identity& ident, const string& facet) } +ObjectPrx +Ice::ObjectAdapterI::newIndirectProxy(const Identity& ident, const string& facet, const string& id) const +{ + // + // Create an indirect reference with the given adapter id. + // + ReferencePtr ref = _instance->referenceFactory()->create( + ident, _instance->getDefaultContext(), facet, Reference::ModeTwoway, false, id, 0, _locatorInfo, + _instance->defaultsAndOverrides()->defaultCollocationOptimization); + + // + // Return a proxy for the reference. + // + return _instance->proxyFactory()->referenceToProxy(ref); +} + void Ice::ObjectAdapterI::checkForDeactivation() const { diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index c595b6e0877..22abe6d24af 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -21,11 +21,11 @@ #include <Ice/ServantManagerF.h> #include <Ice/ProxyF.h> #include <Ice/ObjectF.h> -#include <Ice/RouterInfoF.h>
+#include <Ice/RouterInfoF.h> #include <Ice/EndpointIF.h> #include <Ice/LocatorInfoF.h> #include <Ice/ThreadPoolF.h> -#include <Ice/Exception.h>
+#include <Ice/Exception.h> #include <Ice/Process.h> #include <list> @@ -66,6 +66,7 @@ public: virtual ObjectPrx createProxy(const Identity&) const; virtual ObjectPrx createDirectProxy(const Identity&) const; + virtual ObjectPrx createIndirectProxy(const Identity&) const; virtual ObjectPrx createReverseProxy(const Identity&) const; virtual void addRouter(const RouterPrx&); @@ -92,6 +93,7 @@ private: ObjectPrx newProxy(const Identity&, const std::string&) const; ObjectPrx newDirectProxy(const Identity&, const std::string&) const; + ObjectPrx newIndirectProxy(const Identity&, const std::string&, const std::string&) const; void checkForDeactivation() const; static void checkIdentity(const Identity&); std::vector<IceInternal::EndpointIPtr> parseEndpoints(const std::string&) const; @@ -104,6 +106,7 @@ private: bool _printAdapterReadyDone; const std::string _name; const std::string _id; + const std::string _replicaGroupId; std::vector<IceInternal::IncomingConnectionFactoryPtr> _incomingConnectionFactories; std::vector<IceInternal::EndpointIPtr> _routerEndpoints; std::vector<IceInternal::RouterInfoPtr> _routerInfos; diff --git a/cpp/src/Ice/PropertyNames.cpp b/cpp/src/Ice/PropertyNames.cpp index 9893f00c3b0..cc88fb0c323 100644 --- a/cpp/src/Ice/PropertyNames.cpp +++ b/cpp/src/Ice/PropertyNames.cpp @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../config/PropertyNames.def', Wed Oct 12 11:03:32 2005 +// Generated by makeprops.py from file `../config/PropertyNames.def', Wed Oct 12 16:57:50 2005 // IMPORTANT: Do not edit this file -- any edits made here will be lost! @@ -81,7 +81,7 @@ const char* IceInternal::PropertyNames::IceBoxProps[] = "IceBox.PrintServicesReady", "IceBox.Service.*", "IceBox.ServiceManager.AdapterId", - "IceBox.ServiceManager.ReplicaId", + "IceBox.ServiceManager.ReplicaGroupId", "IceBox.ServiceManager.Endpoints", "IceBox.ServiceManager.Identity", "IceBox.ServiceManager.PublishedEndpoints", @@ -98,7 +98,7 @@ const char* IceInternal::PropertyNames::IceGridProps[] = { "IceGrid.InstanceName", "IceGrid.Node.AdapterId", - "IceGrid.Node.ReplicaId", + "IceGrid.Node.ReplicaGroupId", "IceGrid.Node.RegisterProcess", "IceGrid.Node.CollocateRegistry", "IceGrid.Node.Data", @@ -120,7 +120,7 @@ const char* IceInternal::PropertyNames::IceGridProps[] = "IceGrid.Node.Trace.Patch", "IceGrid.Node.WaitTime", "IceGrid.Registry.Admin.AdapterId", - "IceGrid.Registry.Admin.ReplicaId", + "IceGrid.Registry.Admin.ReplicaGroupId", "IceGrid.Registry.Admin.Endpoints", "IceGrid.Registry.Admin.PublishedEndpoints", "IceGrid.Registry.Admin.ThreadPool.Size", @@ -129,7 +129,7 @@ const char* IceInternal::PropertyNames::IceGridProps[] = "IceGrid.Registry.Admin.ThreadPool.StackSize", "IceGrid.Registry.AdminIdentity", "IceGrid.Registry.Client.AdapterId", - "IceGrid.Registry.Client.ReplicaId", + "IceGrid.Registry.Client.ReplicaGroupId", "IceGrid.Registry.Client.Endpoints", "IceGrid.Registry.Client.PublishedEndpoints", "IceGrid.Registry.Client.ThreadPool.Size", @@ -140,7 +140,7 @@ const char* IceInternal::PropertyNames::IceGridProps[] = "IceGrid.Registry.DefaultTemplates", "IceGrid.Registry.DynamicRegistration", "IceGrid.Registry.Internal.AdapterId", - "IceGrid.Registry.Internal.ReplicaId", + "IceGrid.Registry.Internal.ReplicaGroupId", "IceGrid.Registry.Internal.Endpoints", "IceGrid.Registry.Internal.PublishedEndpoints", "IceGrid.Registry.Internal.ThreadPool.Size", @@ -151,7 +151,7 @@ const char* IceInternal::PropertyNames::IceGridProps[] = "IceGrid.Registry.NodeSessionTimeout", "IceGrid.Registry.QueryIdentity", "IceGrid.Registry.Server.AdapterId", - "IceGrid.Registry.Server.ReplicaId", + "IceGrid.Registry.Server.ReplicaGroupId", "IceGrid.Registry.Server.Endpoints", "IceGrid.Registry.Server.PublishedEndpoints", "IceGrid.Registry.Server.ThreadPool.Size", @@ -185,9 +185,9 @@ const char* IceInternal::PropertyNames::IcePatchProps[] = const char* IceInternal::PropertyNames::IcePatch2Props[] = { "IcePatch2.AdapterId", - "IcePatch2.ReplicaId", + "IcePatch2.ReplicaGroupId", "IcePatch2.Admin.AdapterId", - "IcePatch2.Admin.ReplicaId", + "IcePatch2.Admin.ReplicaGroupId", "IcePatch2.Admin.Endpoints", "IcePatch2.Admin.PublishedEndpoints", "IcePatch2.Admin.ThreadPool.Size", @@ -252,7 +252,7 @@ const char* IceInternal::PropertyNames::IceStormProps[] = "IceStorm.Flush.Timeout", "IceStorm.InstanceName", "IceStorm.Publish.AdapterId", - "IceStorm.Publish.ReplicaId", + "IceStorm.Publish.ReplicaGroupId", "IceStorm.Publish.Endpoints", "IceStorm.Publish.PublishedEndpoints", "IceStorm.Publish.RegisterProcess", @@ -261,7 +261,7 @@ const char* IceInternal::PropertyNames::IceStormProps[] = "IceStorm.Publish.ThreadPool.SizeWarn", "IceStorm.Publish.ThreadPool.StackSize", "IceStorm.TopicManagerIdentity", - "IceStorm.TopicManager.ReplicaId", + "IceStorm.TopicManager.ReplicaGroupId", "IceStorm.TopicManager.Endpoints", "IceStorm.TopicManager.Proxy", "IceStorm.TopicManager.PublishedEndpoints", @@ -281,10 +281,10 @@ const char* IceInternal::PropertyNames::GlacierProps[] = { "Glacier.Router.AcceptCert", "Glacier.Router.AdapterId", - "Glacier.Router.ReplicaId", + "Glacier.Router.ReplicaGroupId", "Glacier.Router.AllowCategories", "Glacier.Router.Client.AdapterId", - "Glacier.Router.Client.ReplicaId", + "Glacier.Router.Client.ReplicaGroupId", "Glacier.Router.Client.Endpoints", "Glacier.Router.Client.ForwardContext", "Glacier.Router.Client.PublishedEndpoints", @@ -299,7 +299,7 @@ const char* IceInternal::PropertyNames::GlacierProps[] = "Glacier.Router.PrintProxyOnFd", "Glacier.Router.PublishedEndpoints", "Glacier.Router.Server.AdapterId", - "Glacier.Router.Server.ReplicaId", + "Glacier.Router.Server.ReplicaGroupId", "Glacier.Router.Server.Endpoints", "Glacier.Router.Server.ForwardContext", "Glacier.Router.Server.PublishedEndpoints", @@ -320,7 +320,7 @@ const char* IceInternal::PropertyNames::GlacierProps[] = "Glacier.Router.Trace.Throttle", "Glacier.Router.UserId", "Glacier.Starter.AdapterId", - "Glacier.Starter.ReplicaId", + "Glacier.Starter.ReplicaGroupId", "Glacier.Starter.AddUserToAllowCategories", "Glacier.Starter.Certificate.BitStrength", "Glacier.Starter.Certificate.CommonName", diff --git a/cpp/src/Ice/PropertyNames.h b/cpp/src/Ice/PropertyNames.h index dcb5c164116..65c09015407 100644 --- a/cpp/src/Ice/PropertyNames.h +++ b/cpp/src/Ice/PropertyNames.h @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../config/PropertyNames.def', Wed Oct 12 11:03:32 2005 +// Generated by makeprops.py from file `../config/PropertyNames.def', Wed Oct 12 16:57:50 2005 // IMPORTANT: Do not edit this file -- any edits made here will be lost! diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index be78dc91349..d0c55b7879e 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -8,7 +8,7 @@ // ********************************************************************** #include <Ice/LoggerUtil.h> - +#include <Ice/Locator.h> #include <IceGrid/AdapterCache.h> #include <IceGrid/NodeSessionI.h> #include <IceGrid/ServerCache.h> @@ -20,16 +20,16 @@ using namespace IceGrid; namespace IceGrid { -struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> +struct ServerLoadCI : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool> { ServerLoadCI(LoadSample loadSample) : _loadSample(loadSample) { } - bool operator()(const pair<string, ServerEntryPtr>& lhs, const pair<string, ServerEntryPtr>& rhs) + bool operator()(const pair<string, ServerAdapterEntryPtr>& lhs, const pair<string, ServerAdapterEntryPtr>& rhs) { float lhsl = 1.0f; try { - lhsl = lhs.second->getLoad(_loadSample); + lhsl = lhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { @@ -41,7 +41,7 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> float rhsl = 1.0f; try { - rhsl = rhs.second->getLoad(_loadSample); + rhsl = rhs.second->getLeastLoadedNodeLoad(_loadSample); } catch(const ServerNotExistException&) { @@ -58,27 +58,65 @@ struct ServerLoadCI : binary_function<ServerEntryPtr&, ServerEntryPtr&, bool> } AdapterEntryPtr -AdapterCache::get(const string& id, bool create) const +AdapterCache::get(const string& id) const { Lock sync(*this); AdapterCache& self = const_cast<AdapterCache&>(*this); - AdapterEntryPtr entry = self.getImpl(id, create); + AdapterEntryPtr entry = self.getImpl(id); if(!entry) { - throw AdapterNotExistException(id, ""); + throw AdapterNotExistException(id); } return entry; } +ReplicaGroupEntryPtr +AdapterCache::getReplicaGroup(const string& id, bool create) const +{ + Lock sync(*this); + AdapterCache& self = const_cast<AdapterCache&>(*this); + + AdapterEntryPtr entry = self.getImpl(id); + if(!entry && create) + { + return ReplicaGroupEntryPtr::dynamicCast(self.addImpl(id, new ReplicaGroupEntry(self, id))); + } + ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(entry); + if(!repEntry) + { + throw AdapterNotExistException(id); + } + return repEntry; +} + +ServerAdapterEntryPtr +AdapterCache::getServerAdapter(const string& id, bool create) const +{ + Lock sync(*this); + AdapterCache& self = const_cast<AdapterCache&>(*this); + + AdapterEntryPtr entry = self.getImpl(id); + if(!entry && create) + { + return ServerAdapterEntryPtr::dynamicCast(self.addImpl(id, new ServerAdapterEntry(self, id))); + } + ServerAdapterEntryPtr svrEntry = ServerAdapterEntryPtr::dynamicCast(entry); + if(!svrEntry) + { + throw AdapterNotExistException(id); + } + return svrEntry; +} + AdapterEntryPtr -AdapterCache::addImpl(const string& id) +AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry) { if(_traceLevels && _traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "added adapter `" << id << "'"; } - return Cache<string, AdapterEntry>::addImpl(id); + return Cache<string, AdapterEntry>::addImpl(id, entry); } AdapterEntryPtr @@ -93,148 +131,206 @@ AdapterCache::removeImpl(const string& id) } AdapterEntry::AdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : - _cache(cache), - _id(id), - _replicated(false), - _lastReplica(0) + _cache(*dynamic_cast<AdapterCache*>(&cache)), + _id(id) { } -void -AdapterEntry::enableReplication(const LoadBalancingPolicyPtr& policy) +bool +AdapterEntry::canRemove() { - Lock sync(*this); - _replicated = true; - _loadBalancing = policy; - istringstream is(policy->nReplicas); - is >> _loadBalancingNReplicas; - if(_loadBalancingNReplicas < 1) - { - _loadBalancingNReplicas = 1; - } + return true; +} + +ServerAdapterEntry::ServerAdapterEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : + AdapterEntry(cache, id) +{ +} + +vector<pair<string, AdapterPrx> > +ServerAdapterEntry::getProxies(int& nReplicas) +{ + vector<pair<string, AdapterPrx> > adapters; + adapters.push_back(make_pair(_id, getProxy())); + return adapters; +} + +float +ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const +{ + return getServer()->getLoad(loadSample); +} + +string +ServerAdapterEntry::getApplication() const +{ + return getServer()->getApplication(); +} - AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); - if(alb) +void +ServerAdapterEntry::set(const ServerEntryPtr& server, const string& replicaGroupId) +{ { - if(alb->loadSample == "1") - { - _loadSample = LoadSample1; - } - else if(alb->loadSample == "5") - { - _loadSample = LoadSample5; - } - else if(alb->loadSample == "15") - { - _loadSample = LoadSample15; - } - else - { - _loadSample = LoadSample1; - } + Lock sync(*this); + _server = server; + _replicaGroupId = replicaGroupId; } - - if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0) + if(!replicaGroupId.empty()) { - Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat); - out << "enabled replication on adapter `" << _id << "'"; + _cache.getReplicaGroup(replicaGroupId)->addReplica(_id, this); } } void -AdapterEntry::disableReplication() +ServerAdapterEntry::destroy() { - bool remove; + string replicaGroupId; { Lock sync(*this); - _replicated = false; - remove = _replicas.empty(); + replicaGroupId = _replicaGroupId; } - if(_cache.getTraceLevels() && _cache.getTraceLevels()->adapter > 0) + if(!replicaGroupId.empty()) { - Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->adapterCat); - out << "disabled replication on adapter `" << _id << "'"; + _cache.getReplicaGroup(replicaGroupId)->removeReplica(_id); } - if(remove) + _cache.remove(_id); +} + +AdapterPrx +ServerAdapterEntry::getProxy(const string& replicaGroupId) const +{ + if(replicaGroupId.empty()) + { + return getServer()->getAdapter(_id); + } + else { - _cache.remove(_id); + Lock sync(*this); + if(_replicaGroupId != replicaGroupId) + { + throw Ice::InvalidReplicaGroupIdException(); + } + return _server->getAdapter(_id); } } -void -AdapterEntry::addReplica(const string& replicaId, const ServerEntryPtr& entry) +ServerEntryPtr +ServerAdapterEntry::getServer() const { Lock sync(*this); - assert(_replicated || _replicas.empty()); - _replicas.push_back(make_pair(replicaId, entry)); + assert(_server); + return _server; +} + +ReplicaGroupEntry::ReplicaGroupEntry(Cache<string, AdapterEntry>& cache, const std::string& id) : + AdapterEntry(cache, id), + _lastReplica(0) +{ } void -AdapterEntry::removeReplica(const string& replicaId) +ReplicaGroupEntry::set(const string& application, const LoadBalancingPolicyPtr& policy) { - bool remove = false; + Lock sync(*this); + _application = application; + if(policy) { - Lock sync(*this); - for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) + _loadBalancing = policy; + istringstream is(policy->nReplicas); + is >> _loadBalancingNReplicas; + if(_loadBalancingNReplicas < 1) { - if(replicaId == p->first) + _loadBalancingNReplicas = 1; + } + + AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); + if(alb) + { + if(alb->loadSample == "1") { - _replicas.erase(p); - // Make sure _lastReplica is still within the bounds. - _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size(); - break; + _loadSample = LoadSample1; + } + else if(alb->loadSample == "5") + { + _loadSample = LoadSample5; + } + else if(alb->loadSample == "15") + { + _loadSample = LoadSample15; + } + else + { + _loadSample = LoadSample1; } } - remove = _replicas.empty() && !_replicated; } - if(remove) + else + { + _loadBalancing = 0; + _loadBalancingNReplicas = 0; + } +} + +void +ReplicaGroupEntry::addReplica(const string& replicaId, const ServerAdapterEntryPtr& adapter) +{ + Lock sync(*this); + _replicas.push_back(make_pair(replicaId, adapter)); +} + +void +ReplicaGroupEntry::removeReplica(const string& replicaId) +{ + Lock sync(*this); + for(ReplicaSeq::iterator p = _replicas.begin(); p != _replicas.end(); ++p) { - _cache.remove(_id); + if(replicaId == p->first) + { + _replicas.erase(p); + // Make sure _lastReplica is still within the bounds. + _lastReplica = _replicas.empty() ? 0 : _lastReplica % _replicas.size(); + break; + } } } vector<pair<string, AdapterPrx> > -AdapterEntry::getProxies(int& nReplicas) +ReplicaGroupEntry::getProxies(int& nReplicas) { ReplicaSeq replicas; bool adaptive = false; LoadSample loadSample; { - Lock sync(*this); + Lock sync(*this); if(_replicas.empty()) { - throw AdapterNotExistException(_id, ""); + return vector<pair<string, AdapterPrx> >(); } - if(!_replicated) + nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : _replicas.size(); + replicas.reserve(_replicas.size()); + if(!_loadBalancing) { - nReplicas = 1; - replicas.push_back(_replicas[0]); + replicas = _replicas; } - else + if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) { - nReplicas = _loadBalancingNReplicas; - - replicas.reserve(_replicas.size()); - if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) - { - for(unsigned int i = 0; i < _replicas.size(); ++i) - { - replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); - } - _lastReplica = (_lastReplica + 1) % _replicas.size(); - } - else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) - { - replicas = _replicas; - adaptive = true; - loadSample = _loadSample; - } - else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + for(unsigned int i = 0; i < _replicas.size(); ++i) { - replicas = _replicas; - random_shuffle(replicas.begin(), replicas.end()); + replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]); } + _lastReplica = (_lastReplica + 1) % _replicas.size(); + } + else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + { + replicas = _replicas; + adaptive = true; + loadSample = _loadSample; + } + else// if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing)) + { + replicas = _replicas; + random_shuffle(replicas.begin(), replicas.end()); } } @@ -254,47 +350,28 @@ AdapterEntry::getProxies(int& nReplicas) // reachable. // vector<pair<string, AdapterPrx> > adapters; - auto_ptr<Ice::UserException> exception; for(ReplicaSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { try { - adapters.push_back(make_pair(p->second->getId(), p->second->getAdapter(_id, p->first))); + adapters.push_back(make_pair(p->first, p->second->getProxy())); } catch(AdapterNotExistException&) { } - catch(const NodeUnreachableException& ex) - { - exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone())); - } - } - - if(adapters.empty()) - { - if(!exception.get()) - { - throw AdapterNotExistException(_id, ""); - } - else + catch(const NodeUnreachableException&) { - exception->ice_throw(); } } - return adapters; } float -AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const +ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const { ReplicaSeq replicas; { - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, ""); - } + Lock sync(*this); replicas = _replicas; } @@ -303,67 +380,13 @@ AdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const // min_element() will call and lock each server entry. // random_shuffle(replicas.begin(), replicas.end()); - return min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second->getLoad(loadSample); + AdapterEntryPtr adpt = min_element(replicas.begin(), replicas.end(), ServerLoadCI(loadSample))->second; + return adpt->getLeastLoadedNodeLoad(loadSample); } string -AdapterEntry::getApplication() const -{ - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, ""); - } - return _replicas[0].second->getApplication(); -} - -AdapterPrx -AdapterEntry::getProxy(const string& replicaId) const -{ - pair<string, ServerEntryPtr> replica; - bool replicated; - { - Lock sync(*this); - if(_replicas.empty()) - { - throw AdapterNotExistException(_id, (_replicated ? replicaId : "")); - } - - replicated = _replicated; - if(!_replicated) - { - replica = _replicas[0]; - } - else - { - for(ReplicaSeq::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p) - { - if(p->first == replicaId) - { - replica = *p; - break; - } - } - } - } - - if(replica.second) - { - try - { - return replica.second->getAdapter(_id, replica.first); - } - catch(AdapterNotExistException&) - { - } - } - - throw AdapterNotExistException(_id, (replicated ? replicaId : "")); -} - -bool -AdapterEntry::canRemove() +ReplicaGroupEntry::getApplication() const { Lock sync(*this); - return _replicas.empty() && !_replicated; + return _application; } diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index f3b757d51b4..a05a4df0189 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -34,42 +34,80 @@ public: AdapterEntry(Cache<std::string, AdapterEntry>&, const std::string&); - AdapterPrx getProxy(const std::string&) const; - std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&); - float getLeastLoadedNodeLoad(LoadSample) const; - std::string getApplication() const; + virtual std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&) { + return std::vector<std::pair<std::string, AdapterPrx> >(); } + virtual float getLeastLoadedNodeLoad(LoadSample) const { return 0.0f; } + virtual std::string getApplication() const { return ""; } + virtual bool canRemove(); + +protected: + + AdapterCache& _cache; + const std::string _id; +}; +typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr; + +class ServerAdapterEntry : public AdapterEntry +{ +public: - void enableReplication(const LoadBalancingPolicyPtr&); - void disableReplication(); + ServerAdapterEntry(Cache<std::string, AdapterEntry>&, const std::string&); - void addReplica(const std::string&, const ServerEntryPtr&); - void removeReplica(const std::string&); + virtual std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&); + virtual float getLeastLoadedNodeLoad(LoadSample) const; + virtual std::string getApplication() const; + + void set(const ServerEntryPtr&, const std::string&); + void destroy(); + + AdapterPrx getProxy(const std::string& = std::string()) const; - bool canRemove(); - private: - Cache<std::string, AdapterEntry>& _cache; - const std::string _id; - bool _replicated; + ServerEntryPtr getServer() const; + + ServerEntryPtr _server; + std::string _replicaGroupId; +}; +typedef IceUtil::Handle<ServerAdapterEntry> ServerAdapterEntryPtr; + +class ReplicaGroupEntry : public AdapterEntry +{ +public: + + ReplicaGroupEntry(Cache<std::string, AdapterEntry>&, const std::string&); + + virtual std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&); + virtual float getLeastLoadedNodeLoad(LoadSample) const; + virtual std::string getApplication() const; + + void set(const std::string&, const LoadBalancingPolicyPtr&); + void addReplica(const std::string&, const ServerAdapterEntryPtr&); + void removeReplica(const std::string&); + +private: + LoadBalancingPolicyPtr _loadBalancing; int _loadBalancingNReplicas; LoadSample _loadSample; - typedef std::vector<std::pair<std::string, ServerEntryPtr> > ReplicaSeq; + std::string _application; + typedef std::vector<std::pair<std::string, ServerAdapterEntryPtr> > ReplicaSeq; ReplicaSeq _replicas; int _lastReplica; }; -typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr; +typedef IceUtil::Handle<ReplicaGroupEntry> ReplicaGroupEntryPtr; class AdapterCache : public CacheByString<AdapterEntry> { public: - AdapterEntryPtr get(const std::string&, bool = false) const; + AdapterEntryPtr get(const std::string&) const; + ServerAdapterEntryPtr getServerAdapter(const std::string&, bool = false) const; + ReplicaGroupEntryPtr getReplicaGroup(const std::string&, bool = false) const; protected: - AdapterEntryPtr addImpl(const std::string&); + AdapterEntryPtr addImpl(const std::string&, const AdapterEntryPtr&); AdapterEntryPtr removeImpl(const std::string&); }; diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp index e13a6a5288c..447df7103e8 100644 --- a/cpp/src/IceGrid/AdminI.cpp +++ b/cpp/src/IceGrid/AdminI.cpp @@ -175,6 +175,24 @@ AdminI::getDefaultApplicationDescriptor(const Current& current) const throw IceXML::ParserException(__FILE__, __LINE__, "invalid default application descriptor:\ndistribution is not allowed"); } + if(!desc.replicaGroups.empty()) + { + throw IceXML::ParserException(__FILE__, __LINE__, + "invalid default application descriptor:\n" + "replica group definitions are not allowed"); + } + if(!desc.description.empty()) + { + throw IceXML::ParserException(__FILE__, __LINE__, + "invalid default application descriptor:\ndescription is not allowed"); + } + if(!desc.variables.empty()) + { + throw IceXML::ParserException(__FILE__, __LINE__, + "invalid default application descriptor:\n" + "variable definitions are not allowed"); + } + return desc; } catch(const IceXML::ParserException& ex) @@ -388,12 +406,6 @@ AdminI::getAdapterEndpoints(const string& adapterId, const Current&) const } void -AdminI::removeAdapterWithReplicaId(const string& adapterId, const string& replicaId, const Ice::Current&) -{ - _database->setAdapterDirectProxy(adapterId, replicaId, 0); -} - -void AdminI::removeAdapter(const string& adapterId, const Ice::Current&) { _database->removeAdapter(adapterId); diff --git a/cpp/src/IceGrid/AdminI.h b/cpp/src/IceGrid/AdminI.h index af4c530e658..7dfb28235d4 100644 --- a/cpp/src/IceGrid/AdminI.h +++ b/cpp/src/IceGrid/AdminI.h @@ -50,7 +50,6 @@ public: virtual bool isServerEnabled(const ::std::string&, const Ice::Current&) const; virtual StringObjectProxyDict getAdapterEndpoints(const ::std::string&, const ::Ice::Current&) const; - virtual void removeAdapterWithReplicaId(const std::string&, const std::string&, const Ice::Current&); virtual void removeAdapter(const std::string&, const Ice::Current&); virtual Ice::StringSeq getAllAdapterIds(const ::Ice::Current&) const; diff --git a/cpp/src/IceGrid/Cache.h b/cpp/src/IceGrid/Cache.h index eac00d9794c..1dee9c8feb7 100644 --- a/cpp/src/IceGrid/Cache.h +++ b/cpp/src/IceGrid/Cache.h @@ -97,7 +97,12 @@ protected: virtual ValuePtr addImpl(const Key& key) { - ValuePtr entry = createEntry(key); + return addImpl(key, createEntry(key)); + } + + virtual ValuePtr + addImpl(const Key& key, const ValuePtr& entry) + { typename std::map<Key, ValuePtr>::value_type v(key, entry); _entriesHint = _entries.insert(_entriesHint, v); return entry; diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 3523a17614c..7df15736697 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -24,6 +24,7 @@ using namespace IceGrid; const string Database::_descriptorDbName = "applications"; const string Database::_adapterDbName = "adapters"; +const string Database::_replicaGroupDbName = "replica-groups"; const string Database::_objectDbName = "objects"; namespace IceGrid @@ -50,17 +51,7 @@ public: virtual Ice::ObjectPrx getDirectProxy(const Ice::Current& current) const { - istringstream is(current.id.name); - unsigned int size; - is >> size; - char c; - is >> c; - assert(c == '-'); - string id; - is >> id; - string adapterId = id.substr(0, size); - string replicaId = (id.size() > size) ? id.substr(size + 1) : string(); - return _database->getAdapterDirectProxy(adapterId, replicaId); + return _database->getAdapterDirectProxy(current.id.name); } virtual void @@ -137,6 +128,7 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter, _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName), + _replicaGroups(_connection, _replicaGroupDbName), _lock(0), _serial(0) { @@ -509,10 +501,11 @@ Database::getAllNodeServers(const string& node) } bool -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId, const Ice::ObjectPrx& proxy) +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); + StringProxyDict adapters(connection, _adapterDbName); + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); if(proxy) { Lock sync(*this); @@ -521,39 +514,46 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId return false; } - StringObjectProxiesDict::iterator p = adapters.find(adapterId); + StringProxyDict::iterator p = adapters.find(adapterId); if(p != adapters.end()) { - StringObjectProxyDict proxies = p->second; - proxies[replicaId] = proxy; - p.set(proxies); - + p.set(proxy); if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "updated adapter `" << adapterId << "'"; - if(!replicaId.empty()) - { - out << " from replica `" << replicaId << "'"; - } } } else { - StringObjectProxyDict proxies; - proxies[replicaId] = proxy; - adapters.put(StringObjectProxiesDict::value_type(adapterId, proxies)); - + adapters.put(StringProxyDict::value_type(adapterId, proxy)); if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "added adapter `" << adapterId << "'"; - if(!replicaId.empty()) + } + } + + if(!replicaGroupId.empty()) + { + StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId); + if(q != replicaGroups.end()) + { + if(find(q->second.begin(), q->second.end(), adapterId) == q->second.end()) { - out << " from replica `" << replicaId << "'"; + Ice::StringSeq adapters = q->second; + adapters.push_back(adapterId); + q.set(adapters); } } + else + { + Ice::StringSeq adapters; + adapters.push_back(adapterId); + replicaGroups.put(StringStringSeqDict::value_type(replicaGroupId, adapters)); + } } + return true; } else @@ -564,59 +564,43 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId return false; } - StringObjectProxiesDict::iterator p = adapters.find(adapterId); - if(p == adapters.end()) + if(adapters.erase(adapterId) == 0) { return true; } - StringObjectProxyDict proxies = p->second; - if(proxies.erase(replicaId) == 0) + if(_traceLevels->adapter > 0) { - throw AdapterNotExistException(adapterId, replicaId); + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << "removed adapter `" << adapterId << "'"; } - if(proxies.empty()) + if(!replicaGroupId.empty()) { - adapters.erase(p); - - if(_traceLevels->adapter > 0) + StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId); + if(q == replicaGroups.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + return true; } + + Ice::StringSeq adapters = q->second; + adapters.erase(remove(adapters.begin(), adapters.end(), adapterId), adapters.end()); + q.set(adapters); } - else - { - p.set(proxies); - if(_traceLevels->adapter > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; - if(!replicaId.empty()) - { - out << " from replica `" << replicaId << "'"; - } - } - } return true; } } Ice::ObjectPrx -Database::getAdapterDirectProxy(const string& adapterId, const string& replicaId) +Database::getAdapterDirectProxy(const string& adapterId) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::const_iterator p = adapters.find(adapterId); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::const_iterator p = adapters.find(adapterId); if(p != adapters.end()) { - StringObjectProxyDict::const_iterator q = p->second.find(replicaId); - if(q != p->second.end()) - { - return q->second; - } + return p->second; } return 0; } @@ -637,28 +621,39 @@ Database::removeAdapter(const string& adapterId) } Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::iterator p = adapters.find(adapterId); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::iterator p = adapters.find(adapterId); if(p != adapters.end()) { adapters.erase(p); - if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed adapter `" << adapterId << "'"; } + return; } - else + + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); + StringStringSeqDict::iterator q = replicaGroups.find(adapterId); + if(q != replicaGroups.end()) { - throw AdapterNotExistException(adapterId, ""); + replicaGroups.erase(q); + if(_traceLevels->adapter > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << "removed adapter `" << adapterId << "'"; + } + return; } + + throw AdapterNotExistException(adapterId); } AdapterPrx -Database::getAdapter(const string& id, const string& replicaId) +Database::getAdapter(const string& id, const string& replicaGroupId) { - return _adapterCache.get(id)->getProxy(replicaId); + return _adapterCache.getServerAdapter(id, false)->getProxy(replicaGroupId); } vector<pair<string, AdapterPrx> > @@ -669,18 +664,12 @@ Database::getAdapters(const string& id, int& endpointCount) // server, if that's the case we get the adapter proxy from the // server. // - auto_ptr<Ice::UserException> exception; try { return _adapterCache.get(id)->getProxies(endpointCount); } - catch(AdapterNotExistException& ex) - { - exception.reset(dynamic_cast<AdapterNotExistException*>(ex.ice_clone())); - } - catch(const NodeUnreachableException& ex) + catch(AdapterNotExistException&) { - exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone())); } // @@ -688,29 +677,42 @@ Database::getAdapters(const string& id, int& endpointCount) // entry the adapter is managed by the registry itself. // Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::const_iterator p = adapters.find(id); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::const_iterator p = adapters.find(id); if(p != adapters.end()) { vector<pair<string, AdapterPrx> > adapters; - for(StringObjectProxyDict::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + Ice::Identity identity; + identity.category = "IceGridAdapter"; + identity.name = id; + AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)); + adapters.push_back(make_pair(id, adpt)); + return adapters; + } + + // + // If it's not a regular object adapter, perhaps it's a replica + // group... + // + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); + StringStringSeqDict::const_iterator q = replicaGroups.find(id); + if(q != replicaGroups.end()) + { + vector<pair<string, AdapterPrx> > adapters; + for(Ice::StringSeq::const_iterator r = q->second.begin(); r != q->second.end(); ++r) { Ice::Identity identity; identity.category = "IceGridAdapter"; - ostringstream os; - os << id.size() << "-" << id << "-" << q->first; - identity.name = os.str(); - adapters.push_back( - make_pair(q->first, AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)))); + identity.name = *r; + AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)); + adapters.push_back(make_pair(*r, adpt)); } random_shuffle(adapters.begin(), adapters.end()); endpointCount = adapters.size(); return adapters; } - assert(exception.get()); - exception->ice_throw(); - return vector<pair<string, AdapterPrx> >(); // Keeps the compiler happy. + throw AdapterNotExistException(id); } Ice::StringSeq @@ -720,7 +722,9 @@ Database::getAllAdapters(const string& expression) vector<string> result; vector<string> ids = _adapterCache.getAll(expression); result.swap(ids); - ids = getMatchingKeys<StringObjectProxiesDict>(_adapters, expression); + ids = getMatchingKeys<StringProxyDict>(_adapters, expression); + result.insert(result.end(), ids.begin(), ids.end()); + ids = getMatchingKeys<StringStringSeqDict>(_replicaGroups, expression); result.insert(result.end(), ids.begin(), ids.end()); return result; } @@ -991,7 +995,9 @@ Database::checkServerForAddition(const string& id) void Database::checkAdapterForAddition(const string& id) { - if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end()) + if(_adapterCache.has(id) || + _adapters.find(id) != _adapters.end() || + _replicaGroups.find(id) != _replicaGroups.end()) { DeploymentException ex; ex.reason = "adapter `" + id + "' is already registered"; @@ -1020,11 +1026,11 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries) _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } - const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters; - for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) + const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; + for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { assert(!r->id.empty()); - _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing); + _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.add(application, r->id, "", *o); @@ -1041,35 +1047,37 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries) void Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries) { - const NodeDescriptorDict& nodes = app.getInstance().nodes; - const string application = app.getInstance().name; - for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) + map<string, ServerInfo> servers = app.getServerInfos(); + for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) { - _nodeCache.get(n->first)->removeDescriptor(application); + entries.push_back(_serverCache.remove(p->first)); } - const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters; - for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) + const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; + for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.remove(o->id); } - _adapterCache.get(r->id, false)->disableReplication(); + _adapterCache.remove(r->id); } - map<string, ServerInfo> servers = app.getServerInfos(); - for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) + const NodeDescriptorDict& nodes = app.getInstance().nodes; + const string application = app.getInstance().name; + for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) { - entries.push_back(_serverCache.remove(p->first)); + _nodeCache.get(n->first)->removeDescriptor(application); } } void Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newApp, ServerEntrySeq& entries) { + const string application = oldApp.getInstance().name; + // - // Figure out which servers need to removed/updated and added. + // Remove destroyed servers. // map<string, ServerInfo> oldServers = oldApp.getServerInfos(); map<string, ServerInfo> newServers = newApp.getServerInfos(); @@ -1088,10 +1096,6 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp load.push_back(p->second); } } - - // - // Remove destroyed servers. - // for(p = oldServers.begin(); p != oldServers.end(); ++p) { map<string, ServerInfo>::const_iterator q = newServers.find(p->first); @@ -1102,10 +1106,35 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // + // Remove destroyed replica groups. + // + const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups; + const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups; + ReplicaGroupDescriptorSeq::const_iterator r; + for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r) + { + ReplicaGroupDescriptorSeq::const_iterator t; + for(t = newAdpts.begin(); t != newAdpts.end(); ++t) + { + if(t->id == r->id) + { + break; + } + } + for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) + { + _objectCache.remove(o->id); + } + if(t == newAdpts.end()) + { + _adapterCache.remove(r->id); + } + } + + // // Remove all the node descriptors. // const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes; - const string application = oldApp.getInstance().name; NodeDescriptorDict::const_iterator n; for(n = oldNodes.begin(); n != oldNodes.end(); ++n) { @@ -1113,26 +1142,20 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // - // Remove all the replicated adapters. + // Add back node descriptors. // - const ReplicatedAdapterDescriptorSeq& oldAdpts = oldApp.getInstance().replicatedAdapters; - ReplicatedAdapterDescriptorSeq::const_iterator r; - for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r) + const NodeDescriptorDict& newNodes = newApp.getInstance().nodes; + for(n = newNodes.begin(); n != newNodes.end(); ++n) { - for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) - { - _objectCache.remove(o->id); - } - _adapterCache.get(r->id, false)->disableReplication(); + _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } // - // Add back replicated adapters. + // Add back replica groups. // - const ReplicatedAdapterDescriptorSeq& newAdpts = newApp.getInstance().replicatedAdapters; for(r = newAdpts.begin(); r != newAdpts.end(); ++r) { - _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing); + _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.add(application, r->id, "", *o); @@ -1140,14 +1163,8 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // - // Add back node descriptors. + // Add back servers. // - const NodeDescriptorDict& newNodes = newApp.getInstance().nodes; - for(n = newNodes.begin(); n != newNodes.end(); ++n) - { - _nodeCache.get(n->first, true)->addDescriptor(application, n->second); - } - for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q) { entries.push_back(_serverCache.add(*q)); diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index 83bab907043..c19c9c12b7e 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -18,7 +18,8 @@ #include <IceGrid/Internal.h> #include <IceGrid/StringApplicationDescriptorDict.h> #include <IceGrid/IdentityObjectInfoDict.h> -#include <IceGrid/StringObjectProxiesDict.h> +#include <IceGrid/StringProxyDict.h> +#include <IceGrid/StringStringSeqDict.h> #include <IceGrid/ServerCache.h> #include <IceGrid/NodeCache.h> #include <IceGrid/ObjectCache.h> @@ -73,7 +74,7 @@ public: Ice::StringSeq getAllNodeServers(const std::string&); bool setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&); - Ice::ObjectPrx getAdapterDirectProxy(const std::string&, const std::string&); + Ice::ObjectPrx getAdapterDirectProxy(const std::string&); void removeAdapter(const std::string&); AdapterPrx getAdapter(const std::string&, const std::string&); std::vector<std::pair<std::string, AdapterPrx> > getAdapters(const std::string&, int&); @@ -111,6 +112,7 @@ private: static const std::string _descriptorDbName; static const std::string _objectDbName; static const std::string _adapterDbName; + static const std::string _replicaGroupDbName; const Ice::CommunicatorPtr _communicator; const Ice::ObjectAdapterPtr _internalAdapter; @@ -130,7 +132,8 @@ private: Freeze::ConnectionPtr _connection; StringApplicationDescriptorDict _descriptors; IdentityObjectInfoDict _objects; - StringObjectProxiesDict _adapters; + StringProxyDict _adapters; + StringStringSeqDict _replicaGroups; ObserverSessionI* _lock; std::string _lockUserId; diff --git a/cpp/src/IceGrid/DescriptorBuilder.cpp b/cpp/src/IceGrid/DescriptorBuilder.cpp index 2f21765d3a6..fb0345df84e 100644 --- a/cpp/src/IceGrid/DescriptorBuilder.cpp +++ b/cpp/src/IceGrid/DescriptorBuilder.cpp @@ -132,12 +132,11 @@ ApplicationDescriptorBuilder::setDescription(const string& desc) } void -ApplicationDescriptorBuilder::addReplicatedAdapter(const XmlAttributesHelper& attrs) +ApplicationDescriptorBuilder::addReplicaGroup(const XmlAttributesHelper& attrs) { - ReplicatedAdapterDescriptor adapter; + ReplicaGroupDescriptor adapter; adapter.id = attrs("id"); - adapter.loadBalancing = new RandomLoadBalancingPolicy(); // Default load balancing - _descriptor.replicatedAdapters.push_back(adapter); + _descriptor.replicaGroups.push_back(adapter); } void @@ -164,7 +163,7 @@ ApplicationDescriptorBuilder::setLoadBalancing(const XmlAttributesHelper& attrs) throw "invalid load balancing policy `" + type + "'"; } policy->nReplicas = attrs("n-replicas", "0"); - _descriptor.replicatedAdapters.back().loadBalancing = policy; + _descriptor.replicaGroups.back().loadBalancing = policy; } void @@ -173,7 +172,7 @@ ApplicationDescriptorBuilder::addObject(const XmlAttributesHelper& attrs) ObjectDescriptor object; object.type = attrs("type", ""); object.id = Ice::stringToIdentity(attrs("identity")); - _descriptor.replicatedAdapters.back().objects.push_back(object); + _descriptor.replicaGroups.back().objects.push_back(object); } void @@ -411,7 +410,7 @@ CommunicatorDescriptorBuilder::addAdapter(const XmlAttributesHelper& attrs) } desc.id = fqn + "." + desc.name; } - desc.replicaId = attrs("replica-id", ""); + desc.replicaGroupId = attrs("replica-group", ""); desc.registerProcess = attrs("register-process", "false") == "true"; if(desc.id == "" && attrs.contains("wait-for-activation")) { @@ -433,6 +432,12 @@ CommunicatorDescriptorBuilder::addAdapter(const XmlAttributesHelper& attrs) } void +CommunicatorDescriptorBuilder::setAdapterDescription(const string& value) +{ + _descriptor->adapters.back().description = value; +} + +void CommunicatorDescriptorBuilder::addObject(const XmlAttributesHelper& attrs) { ObjectDescriptor object; @@ -487,6 +492,12 @@ CommunicatorDescriptorBuilder::addDbEnvProperty(const XmlAttributesHelper& attrs _descriptor->dbEnvs.back().properties.push_back(prop); } +void +CommunicatorDescriptorBuilder::setDbEnvDescription(const string& value) +{ + _descriptor->dbEnvs.back().description = value; +} + ServerDescriptorBuilder::ServerDescriptorBuilder(const XmlAttributesHelper& attrs) { init(new ServerDescriptor(), attrs); diff --git a/cpp/src/IceGrid/DescriptorBuilder.h b/cpp/src/IceGrid/DescriptorBuilder.h index 30107ecd515..501045dc546 100644 --- a/cpp/src/IceGrid/DescriptorBuilder.h +++ b/cpp/src/IceGrid/DescriptorBuilder.h @@ -63,7 +63,7 @@ public: void setVariableOverrides(const std::map<std::string, std::string>&); void setDescription(const std::string&); - void addReplicatedAdapter(const XmlAttributesHelper&); + void addReplicaGroup(const XmlAttributesHelper&); void setLoadBalancing(const XmlAttributesHelper&); void addObject(const XmlAttributesHelper&); virtual void addVariable(const XmlAttributesHelper&); @@ -148,9 +148,11 @@ public: virtual void setDescription(const std::string&); virtual void addProperty(const XmlAttributesHelper&); virtual void addAdapter(const XmlAttributesHelper&); + virtual void setAdapterDescription(const std::string&); virtual void addObject(const XmlAttributesHelper&); virtual void addDbEnv(const XmlAttributesHelper&); virtual void addDbEnvProperty(const XmlAttributesHelper&); + virtual void setDbEnvDescription(const std::string&); private: diff --git a/cpp/src/IceGrid/DescriptorHelper.cpp b/cpp/src/IceGrid/DescriptorHelper.cpp index 75ff3e9aeac..273be2878d9 100644 --- a/cpp/src/IceGrid/DescriptorHelper.cpp +++ b/cpp/src/IceGrid/DescriptorHelper.cpp @@ -20,10 +20,10 @@ using namespace IceGrid; namespace IceGrid { -struct GetReplicatedAdapterId : unary_function<ReplicatedAdapterDescriptor&, const string&> +struct GetReplicaGroupId : unary_function<ReplicaGroupDescriptor&, const string&> { const string& - operator()(const ReplicatedAdapterDescriptor& desc) + operator()(const ReplicaGroupDescriptor& desc) { return desc.id; } @@ -73,10 +73,10 @@ struct TemplateDescriptorEqual : std::binary_function<TemplateDescriptor&, Templ } }; -struct ReplicatedAdapterEq : std::binary_function<ReplicatedAdapterDescriptor&, ReplicatedAdapterDescriptor&, bool> +struct ReplicaGroupEq : std::binary_function<ReplicaGroupDescriptor&, ReplicaGroupDescriptor&, bool> { bool - operator()(const ReplicatedAdapterDescriptor& lhs, const ReplicatedAdapterDescriptor& rhs) + operator()(const ReplicaGroupDescriptor& lhs, const ReplicaGroupDescriptor& rhs) { if(lhs.id != rhs.id) { @@ -87,17 +87,24 @@ struct ReplicatedAdapterEq : std::binary_function<ReplicatedAdapterDescriptor&, { return false; } - if(lhs.loadBalancing->ice_id() != rhs.loadBalancing->ice_id()) + if(lhs.loadBalancing && rhs.loadBalancing) { - return false; - } - if(lhs.loadBalancing->nReplicas != rhs.loadBalancing->nReplicas) - { - return false; + if(lhs.loadBalancing->ice_id() != rhs.loadBalancing->ice_id()) + { + return false; + } + if(lhs.loadBalancing->nReplicas != rhs.loadBalancing->nReplicas) + { + return false; + } + AdaptiveLoadBalancingPolicyPtr alhs = AdaptiveLoadBalancingPolicyPtr::dynamicCast(lhs.loadBalancing); + AdaptiveLoadBalancingPolicyPtr arhs = AdaptiveLoadBalancingPolicyPtr::dynamicCast(rhs.loadBalancing); + if(alhs && arhs && alhs->loadSample != arhs->loadSample) + { + return false; + } } - AdaptiveLoadBalancingPolicyPtr alhs = AdaptiveLoadBalancingPolicyPtr::dynamicCast(lhs.loadBalancing); - AdaptiveLoadBalancingPolicyPtr arhs = AdaptiveLoadBalancingPolicyPtr::dynamicCast(rhs.loadBalancing); - if(alhs && arhs && alhs->loadSample != arhs->loadSample) + else if(lhs.loadBalancing || rhs.loadBalancing) { return false; } @@ -381,6 +388,26 @@ Resolver::getServiceTemplate(const string& tmpl) const return _application->getServiceTemplate(tmpl); } +bool +Resolver::hasReplicaGroup(const string& id) const +{ + if(!_application) + { + return true; // If we don't know the application descrpitor we + // assume that the replica group exists. + } + ReplicaGroupDescriptorSeq::const_iterator p; + const ApplicationDescriptor& app = _application->getDescriptor(); + for(p = app.replicaGroups.begin(); p != app.replicaGroups.end(); ++p) + { + if(p->id == id) + { + return true; + } + } + return false; +} + string Resolver::substitute(const string& v, bool useParams) const { @@ -589,7 +616,11 @@ CommunicatorHelper::instantiateImpl(const CommunicatorDescriptorPtr& instance, c adapter.id = resolve(p->id, "object adapter id"); adapter.registerProcess = p->registerProcess; adapter.waitForActivation = p->waitForActivation; - adapter.replicaId = resolve(p->replicaId, "object adapter replica id"); + adapter.replicaGroupId = resolve(p->replicaGroupId, "object adapter replica group id"); + if(!adapter.replicaGroupId.empty() && !resolve.hasReplicaGroup(adapter.replicaGroupId)) + { + resolve.exception("unknown replica group `" + adapter.replicaGroupId + "'"); + } for(ObjectDescriptorSeq::const_iterator q = p->objects.begin(); q != p->objects.end(); ++q) { ObjectDescriptor obj; @@ -683,7 +714,7 @@ CommunicatorHelper::printObjectAdapter(Output& out, const AdapterDescriptor& ada out << nl << "adapter '" << adapter.name << "'"; out << sb; out << nl << "id = '" << adapter.id << "'"; - out << nl << "replica id = '" << adapter.replicaId << "'"; + out << nl << "replica group id = '" << adapter.replicaGroupId << "'"; out << nl << "endpoints = '" << getProperty(adapter.name + ".Endpoints") << "'"; out << nl << "register process = '" << (adapter.registerProcess ? "true" : "false") << "'"; out << nl << "wait for activation = '" << (adapter.waitForActivation ? "true" : "false") << "'"; @@ -1768,7 +1799,7 @@ NodeHelper::validate(const Resolver& appResolve) const { resolve.exception("empty variable name"); } - } + } } ApplicationHelper::ApplicationHelper(const ApplicationDescriptor& desc) : @@ -1795,18 +1826,21 @@ ApplicationHelper::instantiate(const Resolver& resolve) const { ApplicationDescriptor desc = _definition; - ReplicatedAdapterDescriptorSeq::iterator r; - for(r = desc.replicatedAdapters.begin(); r != desc.replicatedAdapters.end(); ++r) + ReplicaGroupDescriptorSeq::iterator r; + for(r = desc.replicaGroups.begin(); r != desc.replicaGroups.end(); ++r) { - r->loadBalancing = LoadBalancingPolicyPtr::dynamicCast(r->loadBalancing->ice_clone()); - r->loadBalancing->nReplicas = resolve(r->loadBalancing->nReplicas, "number of replicas"); - AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(r->loadBalancing); - if(alb) + if(r->loadBalancing) { - alb->loadSample = resolve(alb->loadSample, "load sample"); - if(alb->loadSample != "" && alb->loadSample != "1" && alb->loadSample != "5" && alb->loadSample != "15") + r->loadBalancing = LoadBalancingPolicyPtr::dynamicCast(r->loadBalancing->ice_clone()); + r->loadBalancing->nReplicas = resolve(r->loadBalancing->nReplicas, "number of replicas"); + AdaptiveLoadBalancingPolicyPtr al = AdaptiveLoadBalancingPolicyPtr::dynamicCast(r->loadBalancing); + if(al) { - resolve.exception("invalid load sample value (allowed values are 1, 5 or 15)"); + al->loadSample = resolve(al->loadSample, "load sample"); + if(al->loadSample != "" && al->loadSample != "1" && al->loadSample != "5" && al->loadSample != "15") + { + resolve.exception("invalid load sample value (allowed values are 1, 5 or 15)"); + } } } @@ -1846,17 +1880,17 @@ ApplicationHelper::diff(const ApplicationHelper& helper) update.distrib = new BoxedDistributionDescriptor(_definition.distrib); } - GetReplicatedAdapterId rk; - ReplicatedAdapterEq req; - update.replicatedAdapters = - getSeqUpdatedEltsWithEq(helper._definition.replicatedAdapters, _definition.replicatedAdapters, rk, req); - update.removeReplicatedAdapters = - getSeqRemovedElts(helper._definition.replicatedAdapters, _definition.replicatedAdapters, rk); + GetReplicaGroupId rk; + ReplicaGroupEq req; + update.replicaGroups = + getSeqUpdatedEltsWithEq(helper._definition.replicaGroups, _definition.replicaGroups, rk, req); + update.removeReplicaGroups = getSeqRemovedElts(helper._definition.replicaGroups, _definition.replicaGroups, rk); TemplateDescriptorEqual tmpleq; update.serverTemplates = getDictUpdatedEltsWithEq(helper._definition.serverTemplates, _definition.serverTemplates, tmpleq); - update.removeServerTemplates = getDictRemovedElts(helper._definition.serverTemplates, _definition.serverTemplates); + update.removeServerTemplates = + getDictRemovedElts(helper._definition.serverTemplates, _definition.serverTemplates); update.serviceTemplates = getDictUpdatedEltsWithEq(helper._definition.serviceTemplates, _definition.serviceTemplates, tmpleq); update.removeServiceTemplates = @@ -1896,8 +1930,8 @@ ApplicationHelper::update(const ApplicationUpdateDescriptor& update) _definition.description = update.description->value; } - _definition.replicatedAdapters = updateSeqElts(_definition.replicatedAdapters, update.replicatedAdapters, - update.removeReplicatedAdapters, GetReplicatedAdapterId()); + _definition.replicaGroups = updateSeqElts(_definition.replicaGroups, update.replicaGroups, + update.removeReplicaGroups, GetReplicaGroupId()); _definition.variables = updateDictElts(_definition.variables, update.variables, update.removeVariables); @@ -1986,8 +2020,8 @@ ApplicationHelper::getIds(set<string>& serverIds, set<string>& adapterIds, set<I { p->second.getIds(sIds, aIds, oIds); } - ReplicatedAdapterDescriptorSeq::const_iterator r; - for(r = _definition.replicatedAdapters.begin(); r != _definition.replicatedAdapters.end(); ++r) + ReplicaGroupDescriptorSeq::const_iterator r; + for(r = _definition.replicaGroups.begin(); r != _definition.replicaGroups.end(); ++r) { aIds.insert(r->id); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) @@ -2098,15 +2132,19 @@ ApplicationHelper::print(Output& out) const out << nl << "directories = " << toString(_instance.distrib.directories); out << eb; } - if(!_instance.replicatedAdapters.empty()) + if(!_instance.replicaGroups.empty()) { - out << nl << "replicated adapters"; + out << nl << "replica groups"; out << sb; - ReplicatedAdapterDescriptorSeq::const_iterator p; - for(p = _instance.replicatedAdapters.begin(); p != _instance.replicatedAdapters.end(); ++p) + ReplicaGroupDescriptorSeq::const_iterator p; + for(p = _instance.replicaGroups.begin(); p != _instance.replicaGroups.end(); ++p) { out << nl << "id = `" << p->id << "' load balancing = `"; - if(RandomLoadBalancingPolicyPtr::dynamicCast(p->loadBalancing)) + if(!p->loadBalancing) + { + out << "default (return all endpoints)"; + } + else if(RandomLoadBalancingPolicyPtr::dynamicCast(p->loadBalancing)) { out << "random"; } @@ -2179,43 +2217,41 @@ ApplicationHelper::printDiff(Output& out, const ApplicationHelper& helper) const } } { - GetReplicatedAdapterId rk; - ReplicatedAdapterEq req; - ReplicatedAdapterDescriptorSeq updated = - getSeqUpdatedEltsWithEq(helper._definition.replicatedAdapters, _definition.replicatedAdapters, rk, req); - Ice::StringSeq removed = - getSeqRemovedElts(helper._definition.replicatedAdapters, _definition.replicatedAdapters, rk); + GetReplicaGroupId rk; + ReplicaGroupEq req; + ReplicaGroupDescriptorSeq updated = + getSeqUpdatedEltsWithEq(helper._definition.replicaGroups, _definition.replicaGroups, rk, req); + Ice::StringSeq removed = getSeqRemovedElts(helper._definition.replicaGroups, _definition.replicaGroups, rk); if(!updated.empty() || !removed.empty()) { - out << nl << "replicated adapters"; + out << nl << "replica groups"; out << sb; - ReplicatedAdapterDescriptorSeq::iterator p = updated.begin(); + ReplicaGroupDescriptorSeq::iterator p = updated.begin(); while(p != updated.end()) { - ReplicatedAdapterDescriptorSeq::const_iterator r; - for(r = helper._instance.replicatedAdapters.begin(); r != helper._instance.replicatedAdapters.end(); + ReplicaGroupDescriptorSeq::const_iterator r; + for(r = helper._instance.replicaGroups.begin(); r != helper._instance.replicaGroups.end(); ++r) { if(p->id == r->id) { - out << nl << "replicated adapter `" << r->id << "' updated"; + out << nl << "replica group `" << r->id << "' updated"; p = updated.erase(p); break; } } - if(r == helper._instance.replicatedAdapters.end()) + if(r == helper._instance.replicaGroups.end()) { ++p; } } - p = updated.begin(); - while(p != updated.end()) + for(p = updated.begin(); p != updated.end(); ++p) { - out << nl << "replicated adapter `" << p->id << "' added"; + out << nl << "replica group `" << p->id << "' added"; } for(Ice::StringSeq::const_iterator q = removed.begin(); q != removed.end(); ++q) { - out << nl << "replicated adapter `" << *q << "' removed"; + out << nl << "replica group `" << *q << "' removed"; } out << eb; } @@ -2347,30 +2383,30 @@ ApplicationHelper::validate(const Resolver& resolve) const multiset<string> serverIds; multiset<string> adapterIds; multiset<Ice::Identity> objectIds; - set<string> replicatedAdapterIds; - ReplicatedAdapterDescriptorSeq::const_iterator r; - for(r = _definition.replicatedAdapters.begin(); r != _definition.replicatedAdapters.end(); ++r) + for(NodeHelperDict::const_iterator n = _nodes.begin(); n != _nodes.end(); ++n) + { + n->second.validate(resolve); + n->second.getIds(serverIds, adapterIds, objectIds); + } + + ReplicaGroupDescriptorSeq::const_iterator r; + for(r = _definition.replicaGroups.begin(); r != _definition.replicaGroups.end(); ++r) { if(r->id.empty()) { - throw DeploymentException("replicated adapter id is empty"); + throw DeploymentException("replica group id is empty"); } - if(!replicatedAdapterIds.insert(r->id).second) + if(adapterIds.find(r->id) != adapterIds.end()) { - throw DeploymentException("duplicate replicated adapter `" + r->id + "'"); + throw DeploymentException("duplicate replica group `" + r->id + "'"); } + adapterIds.insert(r->id); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { objectIds.insert(o->id); } } - for(NodeHelperDict::const_iterator n = _nodes.begin(); n != _nodes.end(); ++n) - { - n->second.validate(resolve); - n->second.getIds(serverIds, adapterIds, objectIds); - } - for(multiset<string>::const_iterator s = serverIds.begin(); s != serverIds.end(); ++s) { if(serverIds.count(*s) > 1) @@ -2380,7 +2416,7 @@ ApplicationHelper::validate(const Resolver& resolve) const } for(multiset<string>::const_iterator a = adapterIds.begin(); a != adapterIds.end(); ++a) { - if(adapterIds.count(*a) > 1 && replicatedAdapterIds.find(*a) == replicatedAdapterIds.end()) + if(adapterIds.count(*a) > 1) { resolve.exception("duplicate adapter `" + *a + "'"); } diff --git a/cpp/src/IceGrid/DescriptorHelper.h b/cpp/src/IceGrid/DescriptorHelper.h index 4c2e6273f20..68e9a22a111 100644 --- a/cpp/src/IceGrid/DescriptorHelper.h +++ b/cpp/src/IceGrid/DescriptorHelper.h @@ -35,7 +35,8 @@ public: TemplateDescriptor getServerTemplate(const std::string&) const; TemplateDescriptor getServiceTemplate(const std::string&) const; - + bool hasReplicaGroup(const std::string&) const; + private: std::string substitute(const std::string&, bool = false) const; diff --git a/cpp/src/IceGrid/DescriptorParser.cpp b/cpp/src/IceGrid/DescriptorParser.cpp index 0f9382ae15f..2183ed3ee2f 100644 --- a/cpp/src/IceGrid/DescriptorParser.cpp +++ b/cpp/src/IceGrid/DescriptorParser.cpp @@ -272,20 +272,20 @@ DescriptorHandler::startElement(const string& name, const IceXML::Attributes& at _currentTemplate.reset(_currentApplication->createServiceTemplate(attributes)); } - else if(name == "replicated-adapter") + else if(name == "replica-group") { if(!_currentApplication.get()) { - error("the <replicated-adapter> element can only be a child of a <application> element"); + error("the <replica-group> element can only be a child of a <application> element"); } - _currentApplication->addReplicatedAdapter(attributes); + _currentApplication->addReplicaGroup(attributes); _inAdapter = true; } else if(name == "load-balancing") { if(!_inAdapter || _currentServer.get()) { - error("the <load-balancing> element can only be a child of a <replicated-adapter> element"); + error("the <load-balancing> element can only be a child of a <replica-group> element"); } _currentApplication->setLoadBalancing(attributes); } @@ -333,7 +333,7 @@ DescriptorHandler::startElement(const string& name, const IceXML::Attributes& at { if(!_inAdapter) { - error("the <object> element can only be a child of an <adapter> or <replicated-adapter> element"); + error("the <object> element can only be a child of an <adapter> or <replica-group> element"); } if(!_currentCommunicator) { @@ -456,8 +456,18 @@ DescriptorHandler::endElement(const string& name, int line, int column) _currentApplication->addServiceTemplate(_currentTemplate->getId(), _currentTemplate->getDescriptor()); _currentTemplate.reset(0); } - else if(name == "comment") + else if(name == "description") { + if(_inAdapter) + { + assert(_currentCommunicator); + _currentCommunicator->setAdapterDescription(elementValue()); + } + else if(_inDbEnv) + { + assert(_currentCommunicator); + _currentCommunicator->setDbEnvDescription(elementValue()); + } if(_currentCommunicator) { _currentCommunicator->setDescription(elementValue()); @@ -506,7 +516,7 @@ DescriptorHandler::endElement(const string& name, int line, int column) { _inAdapter = false; } - else if(name == "replicated-adapter") + else if(name == "replica-group") { _inAdapter = false; } diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index d9a22a7c5b7..d63a54e2a9b 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -99,12 +99,7 @@ exception AdapterExistsException string id; }; -struct ReplicatedAdapterIdentity -{ - string replicaId; - string id; -}; -dictionary<ReplicatedAdapterIdentity, Adapter*> AdapterPrxDict; +dictionary<string, Adapter*> AdapterPrxDict; interface Server { diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 69e88bc2db4..ba46f59e6da 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -340,15 +340,16 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { int count; vector<pair<string, AdapterPrx> > adapters = _database->getAdapters(id, count); + if(adapters.empty()) + { + cb->ice_response(0); + return; + } (new Request(cb, const_cast<LocatorI*>(this), id, adapters, count))->execute(); } catch(const AdapterNotExistException&) { - throw Ice::AdapterNotFoundException(); - } - catch(const NodeUnreachableException&) - { - cb->ice_response(0); + cb->ice_exception(Ice::AdapterNotFoundException()); return; } } diff --git a/cpp/src/IceGrid/LocatorRegistryI.cpp b/cpp/src/IceGrid/LocatorRegistryI.cpp index 50cc29e5bca..268093f53de 100644 --- a/cpp/src/IceGrid/LocatorRegistryI.cpp +++ b/cpp/src/IceGrid/LocatorRegistryI.cpp @@ -104,7 +104,7 @@ LocatorRegistryI::LocatorRegistryI(const DatabasePtr& database, bool dynamicRegi void LocatorRegistryI::setAdapterDirectProxy_async(const Ice::AMD_LocatorRegistry_setAdapterDirectProxyPtr& cb, const string& adapterId, - const string& replicaId, + const string& replicaGroupId, const Ice::ObjectPrx& proxy, const Ice::Current&) { @@ -116,14 +116,14 @@ LocatorRegistryI::setAdapterDirectProxy_async(const Ice::AMD_LocatorRegistry_set // Get the adapter from the registry and set its direct proxy. // AMI_Adapter_setDirectProxyPtr amiCB = new AMI_Adapter_setDirectProxyI(cb); - _database->getAdapter(adapterId, replicaId)->setDirectProxy_async(amiCB, proxy); + _database->getAdapter(adapterId, replicaGroupId)->setDirectProxy_async(amiCB, proxy); return; } - catch(const AdapterNotExistException& ex) + catch(const AdapterNotExistException&) { if(!_dynamicRegistration) { - throw Ice::AdapterNotFoundException(!ex.replicaId.empty()); + throw Ice::AdapterNotFoundException(); } } catch(const NodeUnreachableException&) @@ -138,7 +138,7 @@ LocatorRegistryI::setAdapterDirectProxy_async(const Ice::AMD_LocatorRegistry_set } assert(_dynamicRegistration); - if(_database->setAdapterDirectProxy(adapterId, replicaId, proxy)) + if(_database->setAdapterDirectProxy(adapterId, replicaGroupId, proxy)) { cb->ice_response(); return; diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile index d2ff5f627c1..4f3224a3149 100644 --- a/cpp/src/IceGrid/Makefile +++ b/cpp/src/IceGrid/Makefile @@ -57,7 +57,8 @@ NODE_OBJS = NodeI.o \ PlatformInfo.o REGISTRY_OBJS = RegistryI.o \ - StringObjectProxiesDict.o \ + StringProxyDict.o \ + StringStringSeqDict.o \ StringApplicationDescriptorDict.o \ IdentityObjectInfoDict.o \ Database.o \ @@ -161,13 +162,21 @@ $(LOCAL_HDIR)/%.h %.cpp: $(SDIR)/%.ice $(SLICE2CPP) rm -f $(HDIR)/$(*F).h $(*F).cpp $(SLICE2CPP) $(SLICE2CPPFLAGS) $(SDIR)/$(*F).ice -$(LOCAL_HDIR)/StringObjectProxiesDict.h StringObjectProxiesDict.cpp: $(SLICE2FREEZE) - rm -f StringObjectProxiesDict.h StringObjectProxiesDict.cpp - $(SLICE2FREEZECMD) --dict IceGrid::StringObjectProxiesDict,string,IceGrid::StringObjectProxyDict \ - StringObjectProxiesDict $(LOCAL_SDIR)/Internal.ice +$(LOCAL_HDIR)/StringProxyDict.h StringProxyDict.cpp: $(SLICE2FREEZE) + rm -f StringProxyDict.h StringProxyDict.cpp + $(SLICE2FREEZECMD) --dict IceGrid::StringProxyDict,string,Object* \ + StringProxyDict $(LOCAL_SDIR)/Internal.ice clean:: - rm -f StringObjectProxiesDict.h StringObjectProxiesDict.cpp + rm -f StringProxyDict.h StringProxyDict.cpp + +$(LOCAL_HDIR)/StringStringSeqDict.h StringStringSeqDict.cpp: $(SLICE2FREEZE) + rm -f StringStringSeqDict.h StringStringSeqDict.cpp + $(SLICE2FREEZECMD) --dict IceGrid::StringStringSeqDict,string,Ice::StringSeq \ + StringStringSeqDict $(SDIR)/../Ice/BuiltinSequences.ice + +clean:: + rm -f StringStringSeqDict.h StringStringSeqDict.cpp $(LOCAL_HDIR)/StringApplicationDescriptorDict.h StringApplicationDescriptorDict.cpp: $(SLICE2FREEZE) rm -f StringApplicationDescriptorDict.h StringApplicationDescriptorDict.cpp diff --git a/cpp/src/IceGrid/Parser.cpp b/cpp/src/IceGrid/Parser.cpp index a6244cd3018..cc5e894a15d 100644 --- a/cpp/src/IceGrid/Parser.cpp +++ b/cpp/src/IceGrid/Parser.cpp @@ -927,25 +927,15 @@ Parser::endpointsAdapter(const list<string>& args) void Parser::removeAdapter(const list<string>& args) { - if(args.size() < 1) + if(args.size() != 1) { - error("`adapter remove' requires at least one argument\n(`help' for more info)"); + error("`adapter remove' requires exactly one argument\n(`help' for more info)"); return; } try { - list<string>::const_iterator p = args.begin(); - string adapterId = *p++; - StringObjectProxyDict proxies = _admin->getAdapterEndpoints(adapterId); - if(args.size() > 1) - { - _admin->removeAdapterWithReplicaId(adapterId, *p++); - } - else - { - _admin->removeAdapter(adapterId); - } + _admin->removeAdapter(*args.begin()); } catch(const Ice::Exception& ex) { diff --git a/cpp/src/IceGrid/ServerAdapterI.cpp b/cpp/src/IceGrid/ServerAdapterI.cpp index 86ae6c7b918..d7fbd7045d5 100644 --- a/cpp/src/IceGrid/ServerAdapterI.cpp +++ b/cpp/src/IceGrid/ServerAdapterI.cpp @@ -21,13 +21,11 @@ ServerAdapterI::ServerAdapterI(const NodeIPtr& node, const string& serverName, const AdapterPrx& proxy, const string& id, - const string& replicaId, Ice::Int waitTime) : _node(node), _this(proxy), _serverId(serverName), _id(id), - _replicaId(id), _server(server), _waitTime(IceUtil::Time::seconds(waitTime)) { diff --git a/cpp/src/IceGrid/ServerAdapterI.h b/cpp/src/IceGrid/ServerAdapterI.h index 20522b2ddd8..8826819eade 100644 --- a/cpp/src/IceGrid/ServerAdapterI.h +++ b/cpp/src/IceGrid/ServerAdapterI.h @@ -25,8 +25,7 @@ class ServerAdapterI : public Adapter, public IceUtil::Mutex { public: - ServerAdapterI(const NodeIPtr&, ServerI*, const std::string&, const AdapterPrx&, const std::string&, - const std::string&, Ice::Int); + ServerAdapterI(const NodeIPtr&, ServerI*, const std::string&, const AdapterPrx&, const std::string&, Ice::Int); virtual ~ServerAdapterI(); virtual void activate_async(const AMD_Adapter_activatePtr& cb, const Ice::Current&); diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index bac1b356779..2b75ea67bd8 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -147,8 +147,9 @@ ServerCache::addCommunicator(const CommunicatorDescriptorPtr& comm, const Server { if(!q->id.empty()) { - _adapterCache.get(q->id, true)->addReplica(getReplicaId(*q, comm, entry->getId()), entry); + _adapterCache.getServerAdapter(q->id, true)->set(entry, q->replicaGroupId); } + for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { const string edpts = IceGrid::getProperty(comm->properties, q->name + ".Endpoints"); @@ -164,7 +165,7 @@ ServerCache::removeCommunicator(const CommunicatorDescriptorPtr& comm, const Ser { if(!q->id.empty()) { - _adapterCache.get(q->id)->removeReplica(getReplicaId(*q, comm, entry->getId())); + _adapterCache.getServerAdapter(q->id)->destroy(); } for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { @@ -315,19 +316,15 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& } AdapterPrx -ServerEntry::getAdapter(const string& id, const string& replicaId) +ServerEntry::getAdapter(const string& id) { AdapterPrx proxy; - ReplicatedAdapterIdentity adptId; - adptId.id = id; - adptId.replicaId = replicaId; - { Lock sync(*this); if(_proxy) // Synced { - proxy = _adapters[adptId]; + proxy = _adapters[id]; } } @@ -347,7 +344,7 @@ ServerEntry::getAdapter(const string& id, const string& replicaId) int activationTimeout, deactivationTimeout; string node; syncImpl(adapters, activationTimeout, deactivationTimeout, node); - AdapterPrx adapter = adapters[adptId]; + AdapterPrx adapter = adapters[id]; if(!adapter) { throw AdapterNotExistException(); diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h index b5106db7c2a..d7a5e1cb606 100644 --- a/cpp/src/IceGrid/ServerCache.h +++ b/cpp/src/IceGrid/ServerCache.h @@ -41,7 +41,7 @@ public: std::string getId() const; ServerPrx getProxy(int&, int&, std::string&); - AdapterPrx getAdapter(const std::string&, const std::string&); + AdapterPrx getAdapter(const std::string&); NodeEntryPtr getNode() const; std::string getApplication() const; float getLoad(LoadSample) const; diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index ff46c6981b3..60b1eef559d 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -702,7 +702,7 @@ ServerI::addDynamicInfo(ServerDynamicInfoSeq& serverInfos, AdapterDynamicInfoSeq try { AdapterDynamicInfo adapter; - adapter.id = p->first.id; + adapter.id = p->first; adapter.proxy = p->second->getDirectProxy(); adapterInfos.push_back(adapter); } @@ -1148,27 +1148,23 @@ ServerI::updateImpl(const string& app, const ServerDescriptorPtr& descriptor, bo } } -ReplicatedAdapterIdentity +string ServerI::addAdapter(const AdapterDescriptor& desc, const CommunicatorDescriptorPtr& comm, const Ice::Current& current) { assert(!desc.id.empty()); - ReplicatedAdapterIdentity adptId; - adptId.id = desc.id; - adptId.replicaId = getReplicaId(desc, comm, _id); - Ice::Identity id; id.category = "IceGridServerAdapter"; - id.name = _desc->id + "-" + adptId.id + "-" + adptId.replicaId + "-" + desc.name; // Use UUID instead? + id.name = _desc->id + "-" + desc.id; AdapterPrx proxy = AdapterPrx::uncheckedCast(current.adapter->createProxy(id)); ServerAdapterIPtr servant = ServerAdapterIPtr::dynamicCast(current.adapter->find(id)); if(!servant) { - servant = new ServerAdapterI(_node, this, _desc->id, proxy, desc.id, adptId.replicaId, _waitTime); + servant = new ServerAdapterI(_node, this, _desc->id, proxy, desc.id, _waitTime); current.adapter->add(servant, id); } - _adapters.insert(make_pair(adptId, servant)); - return adptId; + _adapters.insert(make_pair(desc.id, servant)); + return desc.id; } void @@ -1238,7 +1234,10 @@ ServerI::updateConfigFile(const string& serverDir, const CommunicatorDescriptorP { props.push_back(createProperty(q->name + ".RegisterProcess", "1")); } - props.push_back(createProperty(q->name + ".ReplicaId", getReplicaId(*q, descriptor, _id))); + if(!q->replicaGroupId.empty()) + { + props.push_back(createProperty(q->name + ".ReplicaGroupId", q->replicaGroupId)); + } } // diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h index 1b4eeaa6dc6..c212efbf0da 100644 --- a/cpp/src/IceGrid/ServerI.h +++ b/cpp/src/IceGrid/ServerI.h @@ -90,10 +90,7 @@ private: void setStateNoSync(InternalServerState); void updateImpl(const std::string&, const ServerDescriptorPtr&, bool, const Ice::Current&); - - ReplicatedAdapterIdentity - addAdapter(const AdapterDescriptor&, const CommunicatorDescriptorPtr&, const Ice::Current&); - + std::string addAdapter(const AdapterDescriptor&, const CommunicatorDescriptorPtr&, const Ice::Current&); void updateConfigFile(const std::string&, const CommunicatorDescriptorPtr&, bool); void updateDbEnv(const std::string&, const DbEnvDescriptor&); void getAdaptersAndTimeouts(AdapterPrxDict&, int&, int&) const; @@ -114,7 +111,7 @@ private: ServerActivation _activation; int _activationTimeout; int _deactivationTimeout; - typedef std::map<ReplicatedAdapterIdentity, ServerAdapterIPtr> ServerAdapterDict; + typedef std::map<std::string, ServerAdapterIPtr> ServerAdapterDict; ServerAdapterDict _adapters; bool _processRegistered; Ice::ProcessPrx _process; diff --git a/cpp/src/IceGrid/Util.cpp b/cpp/src/IceGrid/Util.cpp index a7a04ad6259..3506cf29271 100644 --- a/cpp/src/IceGrid/Util.cpp +++ b/cpp/src/IceGrid/Util.cpp @@ -47,21 +47,3 @@ IceGrid::getProperty(const PropertyDescriptorSeq& properties, const string& name return def; } -string -IceGrid::getReplicaId(const AdapterDescriptor& adapter, const CommunicatorDescriptorPtr& comm, const string& serverId) -{ - if(!adapter.replicaId.empty()) - { - return adapter.replicaId; - } - - // - // Compute the default replica id of an object adapter: if the - // adapter belongs to a service the replica id will be "<server - // id>.<service name>", if the adapter belongs to a server its - // replica id will be "<server id>". - // - ServiceDescriptorPtr service = ServiceDescriptorPtr::dynamicCast(comm); - return service ? serverId + "." + service->name : serverId; -} - diff --git a/cpp/src/IceGrid/Util.h b/cpp/src/IceGrid/Util.h index 1bb4a0192e4..ead8410d465 100644 --- a/cpp/src/IceGrid/Util.h +++ b/cpp/src/IceGrid/Util.h @@ -26,8 +26,6 @@ std::string toString(const std::vector<std::string>&, const std::string& = std:: std::string getProperty(const PropertyDescriptorSeq&, const std::string&, const std::string& = std::string()); -std::string getReplicaId(const AdapterDescriptor&, const CommunicatorDescriptorPtr&, const std::string&); - template<class Function> struct ForEachCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void> { |