diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/DescriptorHelper.cpp | 17 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridRegistry.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObserverSessionI.cpp | 96 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObserverSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 162 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 15 |
8 files changed, 241 insertions, 67 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index b91f4ea8206..39f4a200bde 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -743,9 +743,6 @@ Database::getServerDescriptor(const std::string& name) { ApplicationDescriptorPtr app = getApplicationDescriptor(getServerApplication(name)); - // - // TODO: Is it really safe to read the application descriptor outside the lock!? - // for(ServerInstanceDescriptorSeq::const_iterator p = app->servers.begin(); p != app->servers.end(); ++p) { if(p->descriptor->name == name) diff --git a/cpp/src/IceGrid/DescriptorHelper.cpp b/cpp/src/IceGrid/DescriptorHelper.cpp index 6366a584fa9..14525b4fcd2 100644 --- a/cpp/src/IceGrid/DescriptorHelper.cpp +++ b/cpp/src/IceGrid/DescriptorHelper.cpp @@ -669,9 +669,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update) newApp->variables.erase(*p); } - // - // TODO: Check if the new templates are valid? - // newApp->serverTemplates = newUpdate.serverTemplates; newApp->serverTemplates.insert(oldApp->serverTemplates.begin(), oldApp->serverTemplates.end()); for(p = newUpdate.removeServerTemplates.begin(); p != newUpdate.removeServerTemplates.end(); ++p) @@ -679,9 +676,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update) newApp->serverTemplates.erase(*p); } - // - // TODO: Check if the new templates are valid? - // newApp->serviceTemplates = newUpdate.serviceTemplates; newApp->serviceTemplates.insert(oldApp->serviceTemplates.begin(), oldApp->serviceTemplates.end()); for(p = newUpdate.removeServiceTemplates.begin(); p != newUpdate.removeServiceTemplates.end(); ++p) @@ -689,9 +683,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update) newApp->serviceTemplates.erase(*p); } - // - // Update the node descriptors. - // newApp->nodes = newUpdate.nodes; for(NodeDescriptorSeq::const_iterator q = oldApp->nodes.begin(); q != oldApp->nodes.end(); ++q) { @@ -721,7 +712,7 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update) for_each(newApp->servers.begin(), newApp->servers.end(), AddServerName(updated)); for(ServerInstanceDescriptorSeq::const_iterator q = oldApp->servers.begin(); q != oldApp->servers.end(); ++q) { - ServerInstanceDescriptor inst = instantiate(*q); // Re-instantiate old server. + ServerInstanceDescriptor inst = instantiate(*q); // Re-instantiate old servers. if(updated.find(inst.descriptor->name) == updated.end() && remove.find(inst.descriptor->name) == remove.end()) { if(q->node != inst.node || @@ -1123,10 +1114,8 @@ ServerDescriptorHelper::operator==(const ServerDescriptorHelper& helper) const return false; } - if(set<string>(_descriptor->interpreterOptions.begin(), - _descriptor->interpreterOptions.end()) != - set<string>(helper._descriptor->interpreterOptions.begin(), - helper._descriptor->interpreterOptions.end())) + if(set<string>(_descriptor->interpreterOptions.begin(), _descriptor->interpreterOptions.end()) != + set<string>(helper._descriptor->interpreterOptions.begin(), helper._descriptor->interpreterOptions.end())) { return false; } diff --git a/cpp/src/IceGrid/IceGridRegistry.cpp b/cpp/src/IceGrid/IceGridRegistry.cpp index c3f4d36f1d0..1e0434daf0e 100644 --- a/cpp/src/IceGrid/IceGridRegistry.cpp +++ b/cpp/src/IceGrid/IceGridRegistry.cpp @@ -84,6 +84,12 @@ RegistryService::start(int argc, char* argv[]) return false; } + PropertiesPtr properties = communicator()->getProperties(); + if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 5) <= 5) + { + properties->setProperty("Ice.ThreadPool.Server.Size", "5"); + } + _registry = new RegistryI(communicator()); if(!_registry->start(nowarn)) { diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index fefcc49ce9c..b5de9dcbbce 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -464,7 +464,11 @@ NodeI::initObserver(const Ice::StringSeq& servers) try { - _observer->init(_name, serverInfos, adapterInfos); + NodeDynamicInfo info; + info.name = _name; + info.servers = serverInfos; + info.adapters = adapterInfos; + _observer->initNode(info); } catch(const Ice::LocalException&) { diff --git a/cpp/src/IceGrid/ObserverSessionI.cpp b/cpp/src/IceGrid/ObserverSessionI.cpp index ae4a7c8e977..f2c4e8760a2 100644 --- a/cpp/src/IceGrid/ObserverSessionI.cpp +++ b/cpp/src/IceGrid/ObserverSessionI.cpp @@ -73,17 +73,49 @@ ObserverSessionI::setObserversByIdentity(const Ice::Identity& registryObserver, } void -ObserverSessionI::startUpdate(int serial, const Ice::Current&) +ObserverSessionI::startUpdate(int serial, const Ice::Current& current) { Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + _database->lock(serial, this, _userId); _updating = true; } void -ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, const Ice::Current&) +ObserverSessionI::addApplication(const ApplicationDescriptorPtr& app, const Ice::Current& current) { Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + + if(!_updating) + { + throw AccessDenied(); + } + _database->addApplicationDescriptor(this, app); +} + +void +ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, const Ice::Current& current) +{ + Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + if(!_updating) { throw AccessDenied(); @@ -92,20 +124,76 @@ ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, c } void -ObserverSessionI::finishUpdate(const Ice::Current&) +ObserverSessionI::syncApplication(const ApplicationDescriptorPtr& app, const Ice::Current& current) +{ + Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + + if(!_updating) + { + throw AccessDenied(); + } + _database->syncApplicationDescriptor(this, app); +} + +void +ObserverSessionI::removeApplication(const string& name, const Ice::Current& current) { Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + if(!_updating) { throw AccessDenied(); } + _database->removeApplicationDescriptor(this, name); +} +void +ObserverSessionI::finishUpdate(const Ice::Current& current) +{ + Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + + if(!_updating) + { + throw AccessDenied(); + } _database->unlock(this); + _updating = false; } void -ObserverSessionI::destroy(const Ice::Current&) +ObserverSessionI::destroy(const Ice::Current& current) { + Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + if(_updating) + { + _database->unlock(this); + _updating = false; + } + // // Unsubscribe from the topics. // diff --git a/cpp/src/IceGrid/ObserverSessionI.h b/cpp/src/IceGrid/ObserverSessionI.h index b8ec32c55bc..7eeab821895 100644 --- a/cpp/src/IceGrid/ObserverSessionI.h +++ b/cpp/src/IceGrid/ObserverSessionI.h @@ -32,7 +32,10 @@ public: virtual void setObserversByIdentity(const Ice::Identity&, const Ice::Identity&, const Ice::Current&); virtual void startUpdate(int, const Ice::Current&); + virtual void addApplication(const ApplicationDescriptorPtr&, const Ice::Current&); + virtual void syncApplication(const ApplicationDescriptorPtr&, const Ice::Current&); virtual void updateApplication(const ApplicationUpdateDescriptor&, const Ice::Current&); + virtual void removeApplication(const std::string&, const Ice::Current&); virtual void finishUpdate(const Ice::Current&); virtual void destroy(const Ice::Current&); diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index b92a14de1b4..41427244449 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -14,24 +14,86 @@ using namespace std; using namespace IceGrid; +class RegistryInitCB : public AMI_RegistryObserver_init +{ +public: + + RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) : + _topic(topic), + _observer(observer), + _serial(serial) + { + } + + void + ice_response() + { + _topic->subscribe(_observer, _serial); + } + + void + ice_exception(const Ice::Exception&) + { + // Ignore + } + +private: + + const RegistryObserverTopicPtr _topic; + const RegistryObserverPrx _observer; + const int _serial; +}; + +class NodeInitCB : public AMI_NodeObserver_init +{ +public: + + NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) : + _topic(topic), + _observer(observer), + _serial(serial) + { + } + + void + ice_response() + { + _topic->subscribe(_observer, _serial); + } + + void + ice_exception(const Ice::Exception&) + { + // Ignore + } + +private: + + const NodeObserverTopicPtr _topic; + const NodeObserverPrx _observer; + const int _serial; +}; + + NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) : - _topic(topic), _publisher(publisher) + _topic(topic), _publisher(publisher), _serial(0) { } void -NodeObserverTopic::init(const string& node, - const ServerDynamicInfoSeq& servers, - const AdapterDynamicInfoSeq& adapters, - const Ice::Current& current) +NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&) +{ + assert(false); +} + +void +NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); - _nodes.insert(node); - _servers[node] = servers; - _adapters[node] = adapters; + _nodes.insert(make_pair(info.name, info)); - _publisher->init(node, servers, adapters); + _publisher->initNode(info); } void @@ -40,7 +102,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser Lock sync(*this); assert(_nodes.find(node) != _nodes.end()); - ServerDynamicInfoSeq& servers = _servers[node]; + ++_serial; + + ServerDynamicInfoSeq& servers = _nodes[node].servers; ServerDynamicInfoSeq::iterator p = servers.begin(); while(p != servers.end()) { @@ -65,7 +129,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a Lock sync(*this); assert(_nodes.find(node) != _nodes.end()); - AdapterDynamicInfoSeq& adapters = _adapters[node]; + ++_serial; + + AdapterDynamicInfoSeq& adapters = _nodes[node].adapters; AdapterDynamicInfoSeq::iterator p = adapters.begin(); while(p != adapters.end()) { @@ -85,24 +151,34 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a } void -NodeObserverTopic::subscribe(const NodeObserverPrx& observer) +NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - - Lock sync(*this); - _topic->subscribe(qos, observer); - for(set<string>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + while(true) { - try + if(serial == -1) { - observer->init(*p, _servers[*p], _adapters[*p]); + Lock sync(*this); + NodeDynamicInfoSeq nodes; + nodes.reserve(_nodes.size()); + for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + { + nodes.push_back(p->second); + } + observer->init_async(new NodeInitCB(this, observer, _serial), nodes); + return; } - catch(const Ice::LocalException& ex) + + Lock sync(*this); + if(serial != _serial) { - cerr << ex << endl; - break; + serial = -1; + continue; } + + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + _topic->subscribe(qos, observer); + break; } } @@ -117,11 +193,8 @@ NodeObserverTopic::removeNode(const string& name) { Lock sync(*this); _nodes.erase(name); - _servers.erase(name); - _adapters.erase(name); } - RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, const RegistryObserverPrx& publisher, NodeObserverTopic& nodeObserver) : @@ -246,21 +319,34 @@ RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&) } void -RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer) +RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; + while(true) + { + if(serial == -1) + { + Lock sync(*this); + assert(_serial != -1); + observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes); + return; + } - Lock sync(*this); - _topic->subscribe(qos, observer); + // + // If the registry cache changed since we've send the init() + // call we need to do it again. Otherwise, we can subscribe to + // the IceStorm topic. + // + Lock sync(*this); + if(serial != _serial) + { + serial = -1; + continue; + } - try - { - observer->init(_serial, _applications, _nodes); - } - catch(const Ice::LocalException& ex) - { - cerr << ex << endl; + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + _topic->subscribe(qos, observer); + break; } } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index e62cd9d2a1e..f1a57d08292 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -25,12 +25,12 @@ public: NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&); - virtual void init(const std::string&, const ServerDynamicInfoSeq&, const AdapterDynamicInfoSeq&, - const Ice::Current&); + virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&); + virtual void initNode(const NodeDynamicInfo&, const Ice::Current&); virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&); virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&); - void subscribe(const NodeObserverPrx&); + void subscribe(const NodeObserverPrx&, int serial = -1); void unsubscribe(const NodeObserverPrx&); void removeNode(const std::string&); @@ -40,10 +40,10 @@ private: const IceStorm::TopicPrx _topic; const NodeObserverPrx _publisher; - std::set<std::string> _nodes; - std::map<std::string, ServerDynamicInfoSeq> _servers; - std::map<std::string, AdapterDynamicInfoSeq> _adapters; + int _serial; + std::map<std::string, NodeDynamicInfo> _nodes; }; +typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr; class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<IceUtil::Mutex> { @@ -60,7 +60,7 @@ public: virtual void nodeUp(const std::string&, const Ice::Current&); virtual void nodeDown(const std::string&, const Ice::Current&); - void subscribe(const RegistryObserverPrx&); + void subscribe(const RegistryObserverPrx&, int = -1); void unsubscribe(const RegistryObserverPrx&); private: @@ -75,6 +75,7 @@ private: ApplicationDescriptorSeq _applications; Ice::StringSeq _nodes; }; +typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; }; |