summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-06-17 15:25:02 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-06-17 15:25:02 +0000
commit2bc0f2b81423bb51c8649fefdeae23c602a56558 (patch)
treeda5dc69eda441fe23345267d1606da5266413f94 /cpp/src/IceGrid/Topics.cpp
parentbuild fix (diff)
downloadice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.bz2
ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.xz
ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.zip
Fixed bugs + added "update" test suite
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp162
1 files changed, 124 insertions, 38 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index b92a14de1b4..41427244449 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -14,24 +14,86 @@
using namespace std;
using namespace IceGrid;
+class RegistryInitCB : public AMI_RegistryObserver_init
+{
+public:
+
+ RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) :
+ _topic(topic),
+ _observer(observer),
+ _serial(serial)
+ {
+ }
+
+ void
+ ice_response()
+ {
+ _topic->subscribe(_observer, _serial);
+ }
+
+ void
+ ice_exception(const Ice::Exception&)
+ {
+ // Ignore
+ }
+
+private:
+
+ const RegistryObserverTopicPtr _topic;
+ const RegistryObserverPrx _observer;
+ const int _serial;
+};
+
+class NodeInitCB : public AMI_NodeObserver_init
+{
+public:
+
+ NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) :
+ _topic(topic),
+ _observer(observer),
+ _serial(serial)
+ {
+ }
+
+ void
+ ice_response()
+ {
+ _topic->subscribe(_observer, _serial);
+ }
+
+ void
+ ice_exception(const Ice::Exception&)
+ {
+ // Ignore
+ }
+
+private:
+
+ const NodeObserverTopicPtr _topic;
+ const NodeObserverPrx _observer;
+ const int _serial;
+};
+
+
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) :
- _topic(topic), _publisher(publisher)
+ _topic(topic), _publisher(publisher), _serial(0)
{
}
void
-NodeObserverTopic::init(const string& node,
- const ServerDynamicInfoSeq& servers,
- const AdapterDynamicInfoSeq& adapters,
- const Ice::Current& current)
+NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&)
+{
+ assert(false);
+}
+
+void
+NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current)
{
Lock sync(*this);
- _nodes.insert(node);
- _servers[node] = servers;
- _adapters[node] = adapters;
+ _nodes.insert(make_pair(info.name, info));
- _publisher->init(node, servers, adapters);
+ _publisher->initNode(info);
}
void
@@ -40,7 +102,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
Lock sync(*this);
assert(_nodes.find(node) != _nodes.end());
- ServerDynamicInfoSeq& servers = _servers[node];
+ ++_serial;
+
+ ServerDynamicInfoSeq& servers = _nodes[node].servers;
ServerDynamicInfoSeq::iterator p = servers.begin();
while(p != servers.end())
{
@@ -65,7 +129,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
Lock sync(*this);
assert(_nodes.find(node) != _nodes.end());
- AdapterDynamicInfoSeq& adapters = _adapters[node];
+ ++_serial;
+
+ AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
AdapterDynamicInfoSeq::iterator p = adapters.begin();
while(p != adapters.end())
{
@@ -85,24 +151,34 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
}
void
-NodeObserverTopic::subscribe(const NodeObserverPrx& observer)
+NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
-
- Lock sync(*this);
- _topic->subscribe(qos, observer);
- for(set<string>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ while(true)
{
- try
+ if(serial == -1)
{
- observer->init(*p, _servers[*p], _adapters[*p]);
+ Lock sync(*this);
+ NodeDynamicInfoSeq nodes;
+ nodes.reserve(_nodes.size());
+ for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ nodes.push_back(p->second);
+ }
+ observer->init_async(new NodeInitCB(this, observer, _serial), nodes);
+ return;
}
- catch(const Ice::LocalException& ex)
+
+ Lock sync(*this);
+ if(serial != _serial)
{
- cerr << ex << endl;
- break;
+ serial = -1;
+ continue;
}
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ _topic->subscribe(qos, observer);
+ break;
}
}
@@ -117,11 +193,8 @@ NodeObserverTopic::removeNode(const string& name)
{
Lock sync(*this);
_nodes.erase(name);
- _servers.erase(name);
- _adapters.erase(name);
}
-
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
const RegistryObserverPrx& publisher,
NodeObserverTopic& nodeObserver) :
@@ -246,21 +319,34 @@ RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&)
}
void
-RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer)
+RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
+ while(true)
+ {
+ if(serial == -1)
+ {
+ Lock sync(*this);
+ assert(_serial != -1);
+ observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes);
+ return;
+ }
- Lock sync(*this);
- _topic->subscribe(qos, observer);
+ //
+ // If the registry cache changed since we've send the init()
+ // call we need to do it again. Otherwise, we can subscribe to
+ // the IceStorm topic.
+ //
+ Lock sync(*this);
+ if(serial != _serial)
+ {
+ serial = -1;
+ continue;
+ }
- try
- {
- observer->init(_serial, _applications, _nodes);
- }
- catch(const Ice::LocalException& ex)
- {
- cerr << ex << endl;
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ _topic->subscribe(qos, observer);
+ break;
}
}