diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 34 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridObserver.cpp | 32 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 23 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 58 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 9 |
9 files changed, 57 insertions, 111 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 39f4a200bde..75b9a7eab14 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -221,31 +221,26 @@ Database::~Database() } void -Database::setRegistryObserver(const RegistryObserverPrx& observer) +Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver) { int serial; ApplicationDescriptorSeq applications; - Ice::StringSeq nodes; { Lock sync(*this); - _registryObserver = observer; + _registryObserver = registryObserver; + _nodeObserver = nodeObserver; serial = _serial; for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { applications.push_back(p->second); } - - for(map<string, NodeSessionIPtr>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) - { - nodes.push_back(p->first); - } } // // Notify the observers. // - _registryObserver->init(serial, applications, nodes); + _registryObserver->init(serial, applications); } void @@ -715,15 +710,26 @@ Database::getNode(const string& name) const void Database::removeNode(const string& name) { - Lock sync(*this); - if(_nodes.erase(name) > 0) { - if(_traceLevels->node > 0) + Lock sync(*this); + if(_nodes.erase(name) > 0) { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "removed node `" << name << "'"; + if(_traceLevels->node > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); + out << "removed node `" << name << "'"; + } } } + + try + { + _nodeObserver->nodeDown(name); + } + catch(const Ice::LocalException&) + { + // TODO: Log a warning? + } } Ice::StringSeq diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index c7c5cde3959..3e42d34b0e7 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -73,7 +73,7 @@ public: Database(const Ice::ObjectAdapterPtr&, const std::string&, int, const TraceLevelsPtr&); virtual ~Database(); - void setRegistryObserver(const RegistryObserverPrx&); + void setObservers(const RegistryObserverPrx&, const NodeObserverPrx&); void lock(int serial, ObserverSessionI*, const std::string&); void unlock(ObserverSessionI*); @@ -144,6 +144,7 @@ private: const int _nodeSessionTimeout; const TraceLevelsPtr _traceLevels; RegistryObserverPrx _registryObserver; + NodeObserverPrx _nodeObserver; std::map<std::string, ServerEntryPtr> _servers; std::map<std::string, ServerEntryPtr> _serversByAdapterId; diff --git a/cpp/src/IceGrid/IceGridObserver.cpp b/cpp/src/IceGrid/IceGridObserver.cpp index d0679587a2e..2cbb659c054 100644 --- a/cpp/src/IceGrid/IceGridObserver.cpp +++ b/cpp/src/IceGrid/IceGridObserver.cpp @@ -76,14 +76,8 @@ class RegistryObserverI : public RegistryObserver public: virtual void - init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq& nodes, const Ice::Current&) + init(int, const ApplicationDescriptorSeq&, const Ice::Current&) { - cout << "active nodes: "; - for(Ice::StringSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p) - { - cout << *p << " "; - } - cout << endl; } virtual void @@ -109,18 +103,6 @@ public: { cout << "application `" << app->name << "' synced (serial = " << serial << ")" << endl; } - - virtual void - nodeUp(const string& name, const Ice::Current& current) - { - cout << "node `" << name << "' is up" << endl; - } - - virtual void - nodeDown(const string& name, const Ice::Current& current) - { - cout << "node `" << name << "' is down" << endl; - } }; class NodeObserverI : public NodeObserver @@ -132,12 +114,12 @@ public: { for(NodeDynamicInfoSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p) { - initNode(*p, c); + nodeUp(*p, c); } } - virtual void - initNode(const NodeDynamicInfo& node, const Ice::Current&) + virtual void + nodeUp(const NodeDynamicInfo& node, const Ice::Current& current) { cout << "node `" << node.name << "' servers: "; for(ServerDynamicInfoSeq::const_iterator p = node.servers.begin(); p != node.servers.end(); ++p) @@ -153,6 +135,12 @@ public: } virtual void + nodeDown(const string& name, const Ice::Current& current) + { + cout << "node `" << name << "' is down" << endl; + } + + virtual void updateServer(const string& node, const ServerDynamicInfo& info, const Ice::Current&) { cout << "node `" << node << "' server `" << info.name << "' updated: pid = " << info.pid << " state = `"; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index b5de9dcbbce..575c974202a 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -468,7 +468,7 @@ NodeI::initObserver(const Ice::StringSeq& servers) info.name = _name; info.servers = serverInfos; info.adapters = adapterInfos; - _observer->initNode(info); + _observer->nodeUp(info); } catch(const Ice::LocalException&) { diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index 838158e8719..98eaa052aee 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -14,10 +14,8 @@ using namespace std; using namespace IceGrid; -NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPrx& observer, const string& name, - const NodePrx& node) : +NodeSessionI::NodeSessionI(const DatabasePtr& database, const string& name, const NodePrx& node) : _database(database), - _observer(observer), _name(name), _node(node), _startTime(IceUtil::Time::now()), @@ -28,16 +26,6 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPr try { _database->addNode(name, this); - - try - { - _observer->nodeUp(_name); - } - catch(const Ice::LocalException& ex) - { - // TODO: Log a warning? - cerr << ex << endl; - } } catch(...) { @@ -80,15 +68,6 @@ NodeSessionI::destroy(const Ice::Current& current) try { - _observer->nodeDown(_name); - } - catch(const Ice::LocalException&) - { - // TODO: Log a warning? - } - - try - { current.adapter->remove(current.id); } catch(const Ice::ObjectAdapterDeactivatedException&) diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index b2d8e1c7662..84c06954ddf 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -23,7 +23,7 @@ class NodeSessionI : public NodeSession, public SessionI, public IceUtil::Mutex { public: - NodeSessionI(const DatabasePtr&, const RegistryObserverPrx&, const std::string&, const NodePrx&); + NodeSessionI(const DatabasePtr&, const std::string&, const NodePrx&); virtual void keepAlive(const Ice::Current&); virtual Ice::StringSeq getServers(const Ice::Current&); @@ -35,7 +35,6 @@ public: private: const DatabasePtr _database; - const RegistryObserverPrx _observer; const std::string _name; const NodePrx _node; const IceUtil::Time _startTime; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index e4f4f0d1891..eab152a6957 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -355,7 +355,7 @@ RegistryI::start(bool nowarn) obj = registryAdapter->addWithUUID(regTopic)->ice_collocationOptimization(false); obj = obj->ice_locator(_communicator->getDefaultLocator()); _registryObserver = RegistryObserverPrx::uncheckedCast(obj); - _database->setRegistryObserver(_registryObserver); + _database->setObservers(_registryObserver, _nodeObserver); // // Create the session manager. @@ -403,7 +403,7 @@ NodeSessionPrx RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice::Current& c) { NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout)); - NodeSessionIPtr session = new NodeSessionI(_database, _registryObserver, name, n); + NodeSessionIPtr session = new NodeSessionI(_database, name, n); NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session)); _reaper->add(proxy, session); return proxy; diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 2ee3b28361b..1d4372cb370 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -87,16 +87,28 @@ NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&) } void -NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current) +NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); _nodes.insert(make_pair(info.name, info)); - _publisher->initNode(info); + _publisher->nodeUp(info); } void +NodeObserverTopic::nodeDown(const string& name, const Ice::Current&) +{ + Lock sync(*this); + + assert(_nodes.find(name) != _nodes.end()); + _nodes.erase(name); + + _publisher->nodeDown(name); +} + + +void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); @@ -202,13 +214,6 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer) _topic->unsubscribe(observer); } -void -NodeObserverTopic::removeNode(const string& name) -{ - Lock sync(*this); - _nodes.erase(name); -} - RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, const RegistryObserverPrx& publisher, NodeObserverTopic& nodeObserver) : @@ -217,18 +222,14 @@ RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, } void -RegistryObserverTopic::init(int serial, - const ApplicationDescriptorSeq& apps, - const Ice::StringSeq& nodes, - const Ice::Current&) +RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&) { Lock sync(*this); _serial = serial; _applications = apps; - _nodes = nodes; - _publisher->init(serial, apps, nodes); + _publisher->init(serial, apps); } void @@ -308,31 +309,6 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes } void -RegistryObserverTopic::nodeUp(const string& name, const Ice::Current&) -{ - Lock sync(*this); - - assert(find(_nodes.begin(), _nodes.end(), name) == _nodes.end()); - _nodes.push_back(name); - - _publisher->nodeUp(name); -} - -void -RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&) -{ - Lock sync(*this); - - Ice::StringSeq::iterator p = find(_nodes.begin(), _nodes.end(), name); - assert(p != _nodes.end()); - _nodes.erase(p); - - _nodeObserver.removeNode(name); - - _publisher->nodeDown(name); -} - -void RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { while(true) @@ -341,7 +317,7 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial { Lock sync(*this); assert(_serial != -1); - observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes); + observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications); return; } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index f1a57d08292..fb18519e579 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -26,7 +26,8 @@ public: NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&); virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&); - virtual void initNode(const NodeDynamicInfo&, const Ice::Current&); + virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&); + virtual void nodeDown(const std::string&, const Ice::Current&); virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&); virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&); @@ -50,16 +51,13 @@ class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<I public: RegistryObserverTopic(const IceStorm::TopicPrx&, const RegistryObserverPrx&, NodeObserverTopic&); - virtual void init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq&, const Ice::Current&); + virtual void init(int, const ApplicationDescriptorSeq&, const Ice::Current&); virtual void applicationAdded(int, const ApplicationDescriptorPtr&, const Ice::Current&); virtual void applicationRemoved(int, const std::string&, const Ice::Current&); virtual void applicationSynced(int, const ApplicationDescriptorPtr&, const Ice::Current&); virtual void applicationUpdated(int, const ApplicationUpdateDescriptor&, const Ice::Current&); - virtual void nodeUp(const std::string&, const Ice::Current&); - virtual void nodeDown(const std::string&, const Ice::Current&); - void subscribe(const RegistryObserverPrx&, int = -1); void unsubscribe(const RegistryObserverPrx&); @@ -73,7 +71,6 @@ private: int _serial; ApplicationDescriptorSeq _applications; - Ice::StringSeq _nodes; }; typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; |