diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
commit | 872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch) | |
tree | c071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src/IceGrid/Topics.cpp | |
parent | adding timeout test (diff) | |
download | ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2 ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip |
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 184 |
1 files changed, 156 insertions, 28 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index f85e074122a..3fdff40ea35 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -77,9 +77,25 @@ private: }; -NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) : - _topic(topic), _publisher(publisher), _serial(0) +NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0) { + IceStorm::TopicPrx t; + try + { + t = topicManager->create("NodeObserver"); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve("NodeObserver"); + } + + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true)); + const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher()); } void @@ -229,22 +245,52 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer) _topic->unsubscribe(observer); } -RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, - const RegistryObserverPrx& publisher, - NodeObserverTopic& nodeObserver) : - _topic(topic), _publisher(publisher), _nodeObserver(nodeObserver), _serial(0) +RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0) { + IceStorm::TopicPrx t; + try + { + t = topicManager->create("RegistryObserver"); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve("RegistryObserver"); + } + + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true)); + const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher()); } void -RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&) +RegistryObserverTopic::init(int serial, + const ApplicationDescriptorSeq& apps, + const AdapterInfoSeq& adpts, + const ObjectInfoSeq& objects, + const Ice::Current&) { Lock sync(*this); _serial = serial; - _applications = apps; - _publisher->init(serial, apps); + for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) + { + _applications.insert(make_pair(p->name, *p)); + } + for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + { + _adapters.insert(make_pair(q->id, *q)); + } + for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) + { + _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); + } + + _publisher->init(serial, apps, adpts, objects); } void @@ -254,7 +300,7 @@ RegistryObserverTopic::applicationAdded(int serial, const ApplicationDescriptor& updateSerial(serial); - _applications.push_back(desc); + _applications.insert(make_pair(desc.name, desc)); _publisher->applicationAdded(serial, desc); } @@ -265,14 +311,8 @@ RegistryObserverTopic::applicationRemoved(int serial, const string& name, const Lock sync(*this); updateSerial(serial); - for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p) - { - if(p->name == name) - { - _applications.erase(p); - break; - } - } + + _applications.erase(name); _publisher->applicationRemoved(serial, name); } @@ -285,15 +325,12 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes updateSerial(serial); try { - for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p) + map<string, ApplicationDescriptor>::iterator p = _applications.find(desc.name); + if(p != _applications.end()) { - if(p->name == desc.name) - { - ApplicationHelper helper(*p); - helper.update(desc); - *p = helper.getDescriptor(); - break; - } + ApplicationHelper helper(p->second); + helper.update(desc); + p->second = helper.getDescriptor(); } } catch(const DeploymentException& ex) @@ -320,6 +357,78 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes } void +RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters.insert(make_pair(info.id, info)); + + _publisher->adapterAdded(serial, info); +} + +void +RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters[info.id] = info; + + _publisher->adapterUpdated(serial, info); +} + +void +RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters.erase(id); + + _publisher->adapterRemoved(serial, id); +} + +void +RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); + + _publisher->objectAdded(serial, info); +} + +void +RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects[info.proxy->ice_getIdentity()] = info; + + _publisher->objectUpdated(serial, info); +} + +void +RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects.erase(id); + + _publisher->objectRemoved(serial, id); +} + +void RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { while(true) @@ -327,13 +436,32 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial if(serial == -1) { ApplicationDescriptorSeq applications; + AdapterInfoSeq adapters; + ObjectInfoSeq objects; { Lock sync(*this); assert(_serial != -1); serial = _serial; - applications = _applications; + + map<string, ApplicationDescriptor>::const_iterator p; + for(p = _applications.begin(); p != _applications.end(); ++p) + { + applications.push_back(p->second); + } + + map<string, AdapterInfo>::const_iterator q; + for(q = _adapters.begin(); q != _adapters.end(); ++q) + { + adapters.push_back(q->second); + } + + map<Ice::Identity, ObjectInfo>::const_iterator r; + for(r = _objects.begin(); r != _objects.end(); ++r) + { + objects.push_back(r->second); + } } - observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications); + observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects); return; } |