diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/slice/IceGrid/Observer.ice | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 34 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridObserver.cpp | 32 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 23 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 58 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 9 | ||||
-rw-r--r-- | cpp/test/IceGrid/session/AllTests.cpp | 44 |
11 files changed, 78 insertions, 143 deletions
diff --git a/cpp/slice/IceGrid/Observer.ice b/cpp/slice/IceGrid/Observer.ice index a7f509fcb9a..4f008aad007 100644 --- a/cpp/slice/IceGrid/Observer.ice +++ b/cpp/slice/IceGrid/Observer.ice @@ -44,7 +44,9 @@ interface NodeObserver { ["ami"] void init(NodeDynamicInfoSeq nodes); - void initNode(NodeDynamicInfo node); + void nodeUp(NodeDynamicInfo node); + + void nodeDown(string name); void updateServer(string node, ServerDynamicInfo updatedInfo); @@ -53,15 +55,12 @@ interface NodeObserver interface RegistryObserver { - ["ami"] void init(int serial, ApplicationDescriptorSeq applications, Ice::StringSeq nodesUp); + ["ami"] void init(int serial, ApplicationDescriptorSeq applications); void applicationAdded(int serial, ApplicationDescriptor desc); void applicationRemoved(int serial, string name); void applicationSynced(int serial, ApplicationDescriptor desc); void applicationUpdated(int serial, ApplicationUpdateDescriptor desc); - - void nodeUp(string name); - void nodeDown(string name); }; /** diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 39f4a200bde..75b9a7eab14 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -221,31 +221,26 @@ Database::~Database() } void -Database::setRegistryObserver(const RegistryObserverPrx& observer) +Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver) { int serial; ApplicationDescriptorSeq applications; - Ice::StringSeq nodes; { Lock sync(*this); - _registryObserver = observer; + _registryObserver = registryObserver; + _nodeObserver = nodeObserver; serial = _serial; for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { applications.push_back(p->second); } - - for(map<string, NodeSessionIPtr>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) - { - nodes.push_back(p->first); - } } // // Notify the observers. // - _registryObserver->init(serial, applications, nodes); + _registryObserver->init(serial, applications); } void @@ -715,15 +710,26 @@ Database::getNode(const string& name) const void Database::removeNode(const string& name) { - Lock sync(*this); - if(_nodes.erase(name) > 0) { - if(_traceLevels->node > 0) + Lock sync(*this); + if(_nodes.erase(name) > 0) { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "removed node `" << name << "'"; + if(_traceLevels->node > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); + out << "removed node `" << name << "'"; + } } } + + try + { + _nodeObserver->nodeDown(name); + } + catch(const Ice::LocalException&) + { + // TODO: Log a warning? + } } Ice::StringSeq diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index c7c5cde3959..3e42d34b0e7 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -73,7 +73,7 @@ public: Database(const Ice::ObjectAdapterPtr&, const std::string&, int, const TraceLevelsPtr&); virtual ~Database(); - void setRegistryObserver(const RegistryObserverPrx&); + void setObservers(const RegistryObserverPrx&, const NodeObserverPrx&); void lock(int serial, ObserverSessionI*, const std::string&); void unlock(ObserverSessionI*); @@ -144,6 +144,7 @@ private: const int _nodeSessionTimeout; const TraceLevelsPtr _traceLevels; RegistryObserverPrx _registryObserver; + NodeObserverPrx _nodeObserver; std::map<std::string, ServerEntryPtr> _servers; std::map<std::string, ServerEntryPtr> _serversByAdapterId; diff --git a/cpp/src/IceGrid/IceGridObserver.cpp b/cpp/src/IceGrid/IceGridObserver.cpp index d0679587a2e..2cbb659c054 100644 --- a/cpp/src/IceGrid/IceGridObserver.cpp +++ b/cpp/src/IceGrid/IceGridObserver.cpp @@ -76,14 +76,8 @@ class RegistryObserverI : public RegistryObserver public: virtual void - init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq& nodes, const Ice::Current&) + init(int, const ApplicationDescriptorSeq&, const Ice::Current&) { - cout << "active nodes: "; - for(Ice::StringSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p) - { - cout << *p << " "; - } - cout << endl; } virtual void @@ -109,18 +103,6 @@ public: { cout << "application `" << app->name << "' synced (serial = " << serial << ")" << endl; } - - virtual void - nodeUp(const string& name, const Ice::Current& current) - { - cout << "node `" << name << "' is up" << endl; - } - - virtual void - nodeDown(const string& name, const Ice::Current& current) - { - cout << "node `" << name << "' is down" << endl; - } }; class NodeObserverI : public NodeObserver @@ -132,12 +114,12 @@ public: { for(NodeDynamicInfoSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p) { - initNode(*p, c); + nodeUp(*p, c); } } - virtual void - initNode(const NodeDynamicInfo& node, const Ice::Current&) + virtual void + nodeUp(const NodeDynamicInfo& node, const Ice::Current& current) { cout << "node `" << node.name << "' servers: "; for(ServerDynamicInfoSeq::const_iterator p = node.servers.begin(); p != node.servers.end(); ++p) @@ -153,6 +135,12 @@ public: } virtual void + nodeDown(const string& name, const Ice::Current& current) + { + cout << "node `" << name << "' is down" << endl; + } + + virtual void updateServer(const string& node, const ServerDynamicInfo& info, const Ice::Current&) { cout << "node `" << node << "' server `" << info.name << "' updated: pid = " << info.pid << " state = `"; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index b5de9dcbbce..575c974202a 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -468,7 +468,7 @@ NodeI::initObserver(const Ice::StringSeq& servers) info.name = _name; info.servers = serverInfos; info.adapters = adapterInfos; - _observer->initNode(info); + _observer->nodeUp(info); } catch(const Ice::LocalException&) { diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index 838158e8719..98eaa052aee 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -14,10 +14,8 @@ using namespace std; using namespace IceGrid; -NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPrx& observer, const string& name, - const NodePrx& node) : +NodeSessionI::NodeSessionI(const DatabasePtr& database, const string& name, const NodePrx& node) : _database(database), - _observer(observer), _name(name), _node(node), _startTime(IceUtil::Time::now()), @@ -28,16 +26,6 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPr try { _database->addNode(name, this); - - try - { - _observer->nodeUp(_name); - } - catch(const Ice::LocalException& ex) - { - // TODO: Log a warning? - cerr << ex << endl; - } } catch(...) { @@ -80,15 +68,6 @@ NodeSessionI::destroy(const Ice::Current& current) try { - _observer->nodeDown(_name); - } - catch(const Ice::LocalException&) - { - // TODO: Log a warning? - } - - try - { current.adapter->remove(current.id); } catch(const Ice::ObjectAdapterDeactivatedException&) diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index b2d8e1c7662..84c06954ddf 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -23,7 +23,7 @@ class NodeSessionI : public NodeSession, public SessionI, public IceUtil::Mutex { public: - NodeSessionI(const DatabasePtr&, const RegistryObserverPrx&, const std::string&, const NodePrx&); + NodeSessionI(const DatabasePtr&, const std::string&, const NodePrx&); virtual void keepAlive(const Ice::Current&); virtual Ice::StringSeq getServers(const Ice::Current&); @@ -35,7 +35,6 @@ public: private: const DatabasePtr _database; - const RegistryObserverPrx _observer; const std::string _name; const NodePrx _node; const IceUtil::Time _startTime; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index e4f4f0d1891..eab152a6957 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -355,7 +355,7 @@ RegistryI::start(bool nowarn) obj = registryAdapter->addWithUUID(regTopic)->ice_collocationOptimization(false); obj = obj->ice_locator(_communicator->getDefaultLocator()); _registryObserver = RegistryObserverPrx::uncheckedCast(obj); - _database->setRegistryObserver(_registryObserver); + _database->setObservers(_registryObserver, _nodeObserver); // // Create the session manager. @@ -403,7 +403,7 @@ NodeSessionPrx RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice::Current& c) { NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout)); - NodeSessionIPtr session = new NodeSessionI(_database, _registryObserver, name, n); + NodeSessionIPtr session = new NodeSessionI(_database, name, n); NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session)); _reaper->add(proxy, session); return proxy; diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 2ee3b28361b..1d4372cb370 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -87,16 +87,28 @@ NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&) } void -NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current) +NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); _nodes.insert(make_pair(info.name, info)); - _publisher->initNode(info); + _publisher->nodeUp(info); } void +NodeObserverTopic::nodeDown(const string& name, const Ice::Current&) +{ + Lock sync(*this); + + assert(_nodes.find(name) != _nodes.end()); + _nodes.erase(name); + + _publisher->nodeDown(name); +} + + +void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); @@ -202,13 +214,6 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer) _topic->unsubscribe(observer); } -void -NodeObserverTopic::removeNode(const string& name) -{ - Lock sync(*this); - _nodes.erase(name); -} - RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, const RegistryObserverPrx& publisher, NodeObserverTopic& nodeObserver) : @@ -217,18 +222,14 @@ RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, } void -RegistryObserverTopic::init(int serial, - const ApplicationDescriptorSeq& apps, - const Ice::StringSeq& nodes, - const Ice::Current&) +RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&) { Lock sync(*this); _serial = serial; _applications = apps; - _nodes = nodes; - _publisher->init(serial, apps, nodes); + _publisher->init(serial, apps); } void @@ -308,31 +309,6 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes } void -RegistryObserverTopic::nodeUp(const string& name, const Ice::Current&) -{ - Lock sync(*this); - - assert(find(_nodes.begin(), _nodes.end(), name) == _nodes.end()); - _nodes.push_back(name); - - _publisher->nodeUp(name); -} - -void -RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&) -{ - Lock sync(*this); - - Ice::StringSeq::iterator p = find(_nodes.begin(), _nodes.end(), name); - assert(p != _nodes.end()); - _nodes.erase(p); - - _nodeObserver.removeNode(name); - - _publisher->nodeDown(name); -} - -void RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { while(true) @@ -341,7 +317,7 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial { Lock sync(*this); assert(_serial != -1); - observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes); + observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications); return; } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index f1a57d08292..fb18519e579 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -26,7 +26,8 @@ public: NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&); virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&); - virtual void initNode(const NodeDynamicInfo&, const Ice::Current&); + virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&); + virtual void nodeDown(const std::string&, const Ice::Current&); virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&); virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&); @@ -50,16 +51,13 @@ class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<I public: RegistryObserverTopic(const IceStorm::TopicPrx&, const RegistryObserverPrx&, NodeObserverTopic&); - virtual void init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq&, const Ice::Current&); + virtual void init(int, const ApplicationDescriptorSeq&, const Ice::Current&); virtual void applicationAdded(int, const ApplicationDescriptorPtr&, const Ice::Current&); virtual void applicationRemoved(int, const std::string&, const Ice::Current&); virtual void applicationSynced(int, const ApplicationDescriptorPtr&, const Ice::Current&); virtual void applicationUpdated(int, const ApplicationUpdateDescriptor&, const Ice::Current&); - virtual void nodeUp(const std::string&, const Ice::Current&); - virtual void nodeDown(const std::string&, const Ice::Current&); - void subscribe(const RegistryObserverPrx&, int = -1); void unsubscribe(const RegistryObserverPrx&); @@ -73,7 +71,6 @@ private: int _serial; ApplicationDescriptorSeq _applications; - Ice::StringSeq _nodes; }; typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; diff --git a/cpp/test/IceGrid/session/AllTests.cpp b/cpp/test/IceGrid/session/AllTests.cpp index c4e781c3842..2db2bb1816d 100644 --- a/cpp/test/IceGrid/session/AllTests.cpp +++ b/cpp/test/IceGrid/session/AllTests.cpp @@ -86,14 +86,13 @@ public: } virtual void - init(int serial, const ApplicationDescriptorSeq& apps, const Ice::StringSeq& nodes, const Ice::Current&) + init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&) { Lock sync(*this); for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) { this->applications.insert(make_pair((*p)->name, *p)); } - this->nodes = set<string>(nodes.begin(), nodes.end()); updated(serial); } @@ -132,22 +131,6 @@ public: updated(serial); } - virtual void - nodeUp(const string& name, const Ice::Current& current) - { - Lock sync(*this); - this->nodes.insert(name); - updated(); - } - - virtual void - nodeDown(const string& name, const Ice::Current& current) - { - Lock sync(*this); - this->nodes.erase(name); - updated(); - } - void waitForUpdate(const char* file, int line) { @@ -165,7 +148,6 @@ public: int serial; map<string, ApplicationDescriptorPtr> applications; - set<string> nodes; private: @@ -202,8 +184,8 @@ public: updated(current); } - virtual void - initNode(const NodeDynamicInfo& info, const Ice::Current& current) + virtual void + nodeUp(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); this->nodes[info.name] = info; @@ -211,6 +193,14 @@ public: } virtual void + nodeDown(const string& name, const Ice::Current& current) + { + Lock sync(*this); + this->nodes.erase(name); + updated(current); + } + + virtual void updateServer(const string& node, const ServerDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); @@ -534,7 +524,7 @@ allTests(const Ice::CommunicatorPtr& communicator) regObs1->waitForUpdate(__FILE__, __LINE__); int serial = regObs1->serial; - test(find(regObs1->nodes.begin(), regObs1->nodes.end(), "localnode") != regObs1->nodes.end()); + test(nodeObs1->nodes.find("localnode") != nodeObs1->nodes.end()); test(regObs1->applications.empty()); try @@ -649,10 +639,10 @@ allTests(const Ice::CommunicatorPtr& communicator) admin->startServer("node-1"); regObs1->waitForUpdate(__FILE__, __LINE__); - test(regObs1->nodes.find("node-1") != regObs1->nodes.end()); + test(nodeObs1->nodes.find("node-1") != nodeObs1->nodes.end()); admin->stopServer("node-1"); regObs1->waitForUpdate(__FILE__, __LINE__); - test(regObs1->nodes.find("node-1") == regObs1->nodes.end()); + test(nodeObs1->nodes.find("node-1") == nodeObs1->nodes.end()); try { @@ -697,7 +687,7 @@ allTests(const Ice::CommunicatorPtr& communicator) regObs1->waitForUpdate(__FILE__, __LINE__); nodeObs1->waitForUpdate(__FILE__, __LINE__); - test(find(regObs1->nodes.begin(), regObs1->nodes.end(), "localnode") != regObs1->nodes.end()); + test(nodeObs1->nodes.find("localnode") != nodeObs1->nodes.end()); test(regObs1->applications.empty()); test(nodeObs1->nodes.find("localnode") != nodeObs1->nodes.end()); @@ -729,7 +719,7 @@ allTests(const Ice::CommunicatorPtr& communicator) admin->startServer("node-1"); regObs1->waitForUpdate(__FILE__, __LINE__); - test(regObs1->nodes.find("node-1") != regObs1->nodes.end()); + test(nodeObs1->nodes.find("node-1") != nodeObs1->nodes.end()); nodeObs1->waitForUpdate(__FILE__, __LINE__); // serverUpdate nodeObs1->waitForUpdate(__FILE__, __LINE__); // serverUpdate @@ -747,7 +737,7 @@ allTests(const Ice::CommunicatorPtr& communicator) nodeObs1->waitForUpdate(__FILE__, __LINE__); // serverUpdate(Destroyed) regObs1->waitForUpdate(__FILE__, __LINE__); - test(regObs1->nodes.find("node-1") == regObs1->nodes.end()); + test(nodeObs1->nodes.find("node-1") == nodeObs1->nodes.end()); ApplicationDescriptorPtr testApp = new ApplicationDescriptor(); testApp->name = "TestApp"; |