summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
commit872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch)
treec071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src/IceGrid/Topics.cpp
parentadding timeout test (diff)
downloadice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp184
1 files changed, 156 insertions, 28 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index f85e074122a..3fdff40ea35 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -77,9 +77,25 @@ private:
};
-NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) :
- _topic(topic), _publisher(publisher), _serial(0)
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("NodeObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("NodeObserver");
+ }
+
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true));
+ const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher());
}
void
@@ -229,22 +245,52 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer)
_topic->unsubscribe(observer);
}
-RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
- const RegistryObserverPrx& publisher,
- NodeObserverTopic& nodeObserver) :
- _topic(topic), _publisher(publisher), _nodeObserver(nodeObserver), _serial(0)
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("RegistryObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("RegistryObserver");
+ }
+
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true));
+ const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher());
}
void
-RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&)
+RegistryObserverTopic::init(int serial,
+ const ApplicationDescriptorSeq& apps,
+ const AdapterInfoSeq& adpts,
+ const ObjectInfoSeq& objects,
+ const Ice::Current&)
{
Lock sync(*this);
_serial = serial;
- _applications = apps;
- _publisher->init(serial, apps);
+ for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
+ {
+ _applications.insert(make_pair(p->name, *p));
+ }
+ for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ {
+ _adapters.insert(make_pair(q->id, *q));
+ }
+ for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ {
+ _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ }
+
+ _publisher->init(serial, apps, adpts, objects);
}
void
@@ -254,7 +300,7 @@ RegistryObserverTopic::applicationAdded(int serial, const ApplicationDescriptor&
updateSerial(serial);
- _applications.push_back(desc);
+ _applications.insert(make_pair(desc.name, desc));
_publisher->applicationAdded(serial, desc);
}
@@ -265,14 +311,8 @@ RegistryObserverTopic::applicationRemoved(int serial, const string& name, const
Lock sync(*this);
updateSerial(serial);
- for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
- {
- if(p->name == name)
- {
- _applications.erase(p);
- break;
- }
- }
+
+ _applications.erase(name);
_publisher->applicationRemoved(serial, name);
}
@@ -285,15 +325,12 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes
updateSerial(serial);
try
{
- for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
+ map<string, ApplicationDescriptor>::iterator p = _applications.find(desc.name);
+ if(p != _applications.end())
{
- if(p->name == desc.name)
- {
- ApplicationHelper helper(*p);
- helper.update(desc);
- *p = helper.getDescriptor();
- break;
- }
+ ApplicationHelper helper(p->second);
+ helper.update(desc);
+ p->second = helper.getDescriptor();
}
}
catch(const DeploymentException& ex)
@@ -320,6 +357,78 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes
}
void
+RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters.insert(make_pair(info.id, info));
+
+ _publisher->adapterAdded(serial, info);
+}
+
+void
+RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters[info.id] = info;
+
+ _publisher->adapterUpdated(serial, info);
+}
+
+void
+RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters.erase(id);
+
+ _publisher->adapterRemoved(serial, id);
+}
+
+void
+RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
+
+ _publisher->objectAdded(serial, info);
+}
+
+void
+RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects[info.proxy->ice_getIdentity()] = info;
+
+ _publisher->objectUpdated(serial, info);
+}
+
+void
+RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects.erase(id);
+
+ _publisher->objectRemoved(serial, id);
+}
+
+void
RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
{
while(true)
@@ -327,13 +436,32 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial
if(serial == -1)
{
ApplicationDescriptorSeq applications;
+ AdapterInfoSeq adapters;
+ ObjectInfoSeq objects;
{
Lock sync(*this);
assert(_serial != -1);
serial = _serial;
- applications = _applications;
+
+ map<string, ApplicationDescriptor>::const_iterator p;
+ for(p = _applications.begin(); p != _applications.end(); ++p)
+ {
+ applications.push_back(p->second);
+ }
+
+ map<string, AdapterInfo>::const_iterator q;
+ for(q = _adapters.begin(); q != _adapters.end(); ++q)
+ {
+ adapters.push_back(q->second);
+ }
+
+ map<Ice::Identity, ObjectInfo>::const_iterator r;
+ for(r = _objects.begin(); r != _objects.end(); ++r)
+ {
+ objects.push_back(r->second);
+ }
}
- observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications);
+ observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects);
return;
}