diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-06-17 15:25:02 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-06-17 15:25:02 +0000 |
commit | 2bc0f2b81423bb51c8649fefdeae23c602a56558 (patch) | |
tree | da5dc69eda441fe23345267d1606da5266413f94 /cpp/src/IceGrid/Topics.cpp | |
parent | build fix (diff) | |
download | ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.bz2 ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.xz ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.zip |
Fixed bugs + added "update" test suite
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 162 |
1 files changed, 124 insertions, 38 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index b92a14de1b4..41427244449 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -14,24 +14,86 @@ using namespace std; using namespace IceGrid; +class RegistryInitCB : public AMI_RegistryObserver_init +{ +public: + + RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) : + _topic(topic), + _observer(observer), + _serial(serial) + { + } + + void + ice_response() + { + _topic->subscribe(_observer, _serial); + } + + void + ice_exception(const Ice::Exception&) + { + // Ignore + } + +private: + + const RegistryObserverTopicPtr _topic; + const RegistryObserverPrx _observer; + const int _serial; +}; + +class NodeInitCB : public AMI_NodeObserver_init +{ +public: + + NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) : + _topic(topic), + _observer(observer), + _serial(serial) + { + } + + void + ice_response() + { + _topic->subscribe(_observer, _serial); + } + + void + ice_exception(const Ice::Exception&) + { + // Ignore + } + +private: + + const NodeObserverTopicPtr _topic; + const NodeObserverPrx _observer; + const int _serial; +}; + + NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) : - _topic(topic), _publisher(publisher) + _topic(topic), _publisher(publisher), _serial(0) { } void -NodeObserverTopic::init(const string& node, - const ServerDynamicInfoSeq& servers, - const AdapterDynamicInfoSeq& adapters, - const Ice::Current& current) +NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&) +{ + assert(false); +} + +void +NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); - _nodes.insert(node); - _servers[node] = servers; - _adapters[node] = adapters; + _nodes.insert(make_pair(info.name, info)); - _publisher->init(node, servers, adapters); + _publisher->initNode(info); } void @@ -40,7 +102,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser Lock sync(*this); assert(_nodes.find(node) != _nodes.end()); - ServerDynamicInfoSeq& servers = _servers[node]; + ++_serial; + + ServerDynamicInfoSeq& servers = _nodes[node].servers; ServerDynamicInfoSeq::iterator p = servers.begin(); while(p != servers.end()) { @@ -65,7 +129,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a Lock sync(*this); assert(_nodes.find(node) != _nodes.end()); - AdapterDynamicInfoSeq& adapters = _adapters[node]; + ++_serial; + + AdapterDynamicInfoSeq& adapters = _nodes[node].adapters; AdapterDynamicInfoSeq::iterator p = adapters.begin(); while(p != adapters.end()) { @@ -85,24 +151,34 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a } void -NodeObserverTopic::subscribe(const NodeObserverPrx& observer) +NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - - Lock sync(*this); - _topic->subscribe(qos, observer); - for(set<string>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + while(true) { - try + if(serial == -1) { - observer->init(*p, _servers[*p], _adapters[*p]); + Lock sync(*this); + NodeDynamicInfoSeq nodes; + nodes.reserve(_nodes.size()); + for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + { + nodes.push_back(p->second); + } + observer->init_async(new NodeInitCB(this, observer, _serial), nodes); + return; } - catch(const Ice::LocalException& ex) + + Lock sync(*this); + if(serial != _serial) { - cerr << ex << endl; - break; + serial = -1; + continue; } + + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + _topic->subscribe(qos, observer); + break; } } @@ -117,11 +193,8 @@ NodeObserverTopic::removeNode(const string& name) { Lock sync(*this); _nodes.erase(name); - _servers.erase(name); - _adapters.erase(name); } - RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, const RegistryObserverPrx& publisher, NodeObserverTopic& nodeObserver) : @@ -246,21 +319,34 @@ RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&) } void -RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer) +RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; + while(true) + { + if(serial == -1) + { + Lock sync(*this); + assert(_serial != -1); + observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes); + return; + } - Lock sync(*this); - _topic->subscribe(qos, observer); + // + // If the registry cache changed since we've send the init() + // call we need to do it again. Otherwise, we can subscribe to + // the IceStorm topic. + // + Lock sync(*this); + if(serial != _serial) + { + serial = -1; + continue; + } - try - { - observer->init(_serial, _applications, _nodes); - } - catch(const Ice::LocalException& ex) - { - cerr << ex << endl; + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + _topic->subscribe(qos, observer); + break; } } |