diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-04 19:39:59 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-04 19:39:59 +0000 |
commit | 1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268 (patch) | |
tree | cd87f8121980159fdafd6b01e6802ea594478b93 /cpp/src/IceGrid/Topics.cpp | |
parent | icegridadmin always uses routed config if possible. added error messages if (diff) | |
download | ice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.tar.bz2 ice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.tar.xz ice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.zip |
Improved observers, refactored topic manager and initialization Added
icegridadmin commands to examine the registries
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 618 |
1 files changed, 356 insertions, 262 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 508a07920a4..3fcf4ac924f 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -14,13 +14,18 @@ using namespace std; using namespace IceGrid; -class RegistryInitCB : public AMI_RegistryObserver_init +namespace IceGrid +{ + +template<class T> +class InitCB : public T { public: - RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) : + InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) : _topic(topic), _observer(observer), + _name(name), _serial(serial) { } @@ -35,93 +40,185 @@ public: ice_exception(const Ice::Exception& ex) { Ice::Warning out(_observer->ice_getCommunicator()->getLogger()); - out << "couldn't initialize registry observer:\n" << ex; + out << "couldn't initialize " << _name << " observer:\n" << ex; } private: - const RegistryObserverTopicPtr _topic; - const RegistryObserverPrx _observer; + const ObserverTopicPtr _topic; + const Ice::ObjectPrx _observer; + const string _name; const int _serial; }; -class NodeInitCB : public AMI_NodeObserver_init +}; + +ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) : + _serial(0) { -public: + IceStorm::TopicPrx t; + try + { + t = topicManager->create(name); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve(name); + } - NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) : - _topic(topic), - _observer(observer), - _serial(serial) - { + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); + _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false); +} + +ObserverTopic::~ObserverTopic() +{ +} + +void +ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) +{ + while(true) + { + if(serial == -1) + { + initObserver(obsv); + return; + } + + Lock sync(*this); + if(serial != _serial) + { + serial = -1; + continue; + } + + subscribeImpl(obsv); + break; } +} - void - ice_response() +void +ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer) +{ + Lock sync(*this); + if(_topic) { - _topic->subscribe(_observer, _serial); + _topic->unsubscribe(observer); } - - void - ice_exception(const Ice::Exception& ex) +} + +void +ObserverTopic::destroy() +{ + Lock sync(*this); + _topic = 0; +} + +void +ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) +{ + // This must be called with the mutex locked. + if(!_topic) { - Ice::Warning out(_observer->ice_getCommunicator()->getLogger()); - out << "couldn't initialize node observer:\n" << ex; + return; } - -private: - const NodeObserverTopicPtr _topic; - const NodeObserverPrx _observer; - const int _serial; -}; + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + _topic->subscribe(qos, observer); +} + +void +ObserverTopic::updateSerial(int serial) +{ + // + // This loop ensures that updates from the database are processed + // sequentially. + // + assert(_serial < serial); + while(_serial + 1 != serial) + { + wait(); + } + _serial = serial; + notifyAll(); +} -NodeObserverTopic::NodeObserverTopic(const Ice::ObjectAdapterPtr& adapter, - const IceStorm::TopicManagerPrx& topicManager) : -/* -#ifdef __BCPLUSPLUS__ // COMPILERFIX +RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : + ObserverTopic(topicManager, "RegistryObserver") { + const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher); } void -NodeObserverTopic::initialize(const IceStorm::TopicManagerPrx& topicManager) -#endif -*/ - _serial(0) +RegistryObserverTopic::registryUp(const RegistryInfo& info) { - IceStorm::TopicPrx t; - try + Lock sync(*this); + if(!_topic) { - t = topicManager->create("NodeObserver"); + return; } - catch(const IceStorm::TopicExists&) + updateSerial(_serial + 1); + _registries.insert(make_pair(info.name, info)); + _publisher->registryUp(info); +} + +void +RegistryObserverTopic::registryDown(const string& name) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(_serial + 1); + if(_registries.find(name) != _registries.end()) { - t = topicManager->retrieve("NodeObserver"); + _registries.erase(name); + _publisher->registryDown(name); } +} - // - // NOTE: collocation optimization needs to be turned on for the - // topic because the subscribe() method is given a fixed proxy - // which can't be marshalled. - // - const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); - const_cast<NodeObserverPrx&>(_internalPublisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher()); +void +RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +{ + RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv); + RegistryInfoSeq registries; + int serial; + { + Lock sync(*this); + registries.reserve(_registries.size()); + for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p) + { + registries.push_back(p->second); + } + serial = _serial; + } + observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial), + registries); +} - __setNoDelete(true); +NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const Ice::ObjectAdapterPtr& adapter) : + ObserverTopic(topicManager, "NodeObserver") +{ + const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher); try { - const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this)); + const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this)); } - catch(...) + catch(const Ice::LocalException&) { - __setNoDelete(false); - throw; } - __setNoDelete(false); } void -NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&) +NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&) { assert(false); } @@ -130,25 +227,30 @@ void NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current) { Lock sync(*this); - _nodes.insert(make_pair(info.name, info)); - _internalPublisher->nodeUp(info); + if(!_topic) + { + return; + } + updateSerial(_serial + 1); + _nodes.insert(make_pair(info.info.name, info)); + _publisher->nodeUp(info); } void NodeObserverTopic::nodeDown(const string& name, const Ice::Current&) { - Lock sync(*this); - if(_nodes.find(name) != _nodes.end()) - { - _nodes.erase(name); - _internalPublisher->nodeDown(name); - } + assert(false); } void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); + if(!_topic) + { + return; + } + if(_nodes.find(node) == _nodes.end()) { // @@ -157,7 +259,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser return; } - ++_serial; + updateSerial(_serial + 1); ServerDynamicInfoSeq& servers = _nodes[node].servers; ServerDynamicInfoSeq::iterator p = servers.begin(); @@ -182,13 +284,18 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser servers.push_back(server); } - _internalPublisher->updateServer(node, server); + _publisher->updateServer(node, server); } void NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&) { Lock sync(*this); + if(!_topic) + { + return; + } + if(_nodes.find(node) == _nodes.end()) { // @@ -197,7 +304,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a return; } - ++_serial; + updateSerial(_serial + 1); AdapterDynamicInfoSeq& adapters = _nodes[node].adapters; AdapterDynamicInfoSeq::iterator p = adapters.begin(); @@ -222,149 +329,103 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a adapters.push_back(adapter); } - _internalPublisher->updateAdapter(node, adapter); + _publisher->updateAdapter(node, adapter); } -void -NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial) +void +NodeObserverTopic::nodeDown(const string& name) { - while(true) + Lock sync(*this); + if(!_topic) { - if(serial == -1) - { - NodeDynamicInfoSeq nodes; - { - Lock sync(*this); - nodes.reserve(_nodes.size()); - for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) - { - nodes.push_back(p->second); - } - serial = _serial; - } - observer->init_async(new NodeInitCB(this, observer, serial), nodes); - return; - } - - Lock sync(*this); - if(serial != _serial) - { - serial = -1; - continue; - } - - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - _topic->subscribe(qos, observer); - break; + return; } -} -void -NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer) -{ - _topic->unsubscribe(observer); -} + updateSerial(_serial + 1); -RegistryObserverTopic::RegistryObserverTopic(const Ice::ObjectAdapterPtr& adapter, - const IceStorm::TopicManagerPrx& topicManager) : -/* -#ifdef __BCPLUSPLUS__ // COMPILERFIX -{ + if(_nodes.find(name) != _nodes.end()) + { + _nodes.erase(name); + _publisher->nodeDown(name); + } } void -RegistryObserverTopic::initialize(const IceStorm::TopicManagerPrx& topicManager) -#endif -*/ - _serial(0) +NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { - IceStorm::TopicPrx t; - try - { - t = topicManager->create("RegistryObserver"); - } - catch(const IceStorm::TopicExists&) + NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv); + int serial; + NodeDynamicInfoSeq nodes; { - t = topicManager->retrieve("RegistryObserver"); + Lock sync(*this); + nodes.reserve(_nodes.size()); + for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + { + nodes.push_back(p->second); + } + serial = _serial; } + observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes); +} - // - // NOTE: collocation optimization needs to be turned on for the - // topic because the subscribe() method is given a fixed proxy - // which can't be marshalled. - // - const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); - const_cast<RegistryObserverPrx&>(_internalPublisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher()); - - __setNoDelete(true); - try - { - const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(adapter->addWithUUID(this)); - } - catch(...) - { - __setNoDelete(false); - throw; - } - __setNoDelete(false); +ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const StringApplicationInfoDict& applications) : + ObserverTopic(topicManager, "ApplicationObserver"), + _applications(applications.begin(), applications.end()) +{ + const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher); } void -RegistryObserverTopic::init(int serial, - const ApplicationInfoSeq& apps, - const AdapterInfoSeq& adpts, - const ObjectInfoSeq& objects, - const Ice::Current&) +ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps) { Lock sync(*this); - - _serial = serial; - - for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) - { - _applications.insert(make_pair(p->descriptor.name, *p)); - } - for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + if(!_topic) { - _adapters.insert(make_pair(q->id, *q)); + return; } - for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) + updateSerial(serial); + _applications.clear(); + for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) { - _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); + _applications.insert(make_pair(p->descriptor.name, *p)); } - - _internalPublisher->init(serial, apps, adpts, objects); } void -RegistryObserverTopic::applicationAdded(int serial, const ApplicationInfo& info, const Ice::Current&) +ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _applications.insert(make_pair(info.descriptor.name, info)); - - _internalPublisher->applicationAdded(serial, info); + _publisher->applicationAdded(serial, info); } void -RegistryObserverTopic::applicationRemoved(int serial, const string& name, const Ice::Current&) +ApplicationObserverTopic::applicationRemoved(int serial, const string& name) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _applications.erase(name); - - _internalPublisher->applicationRemoved(serial, name); + _publisher->applicationRemoved(serial, name); } void -RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info, const Ice::Current& c) +ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info) { Lock sync(*this); + if(!_topic) + { + return; + } updateSerial(serial); try @@ -372,7 +433,7 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInf map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name); if(p != _applications.end()) { - ApplicationHelper helper(c.adapter->getCommunicator(), p->second.descriptor); + ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor); p->second.descriptor = helper.update(info.descriptor); p->second.updateTime = info.updateTime; p->second.updateUser = info.updateUser; @@ -398,156 +459,189 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInf { assert(false); } + _publisher->applicationUpdated(serial, info); +} + +void +ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +{ + ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); + int serial; + ApplicationInfoSeq applications; + { + Lock sync(*this); + serial = _serial; + assert(serial != -1); + for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p) + { + applications.push_back(p->second); + } + } + observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application", + serial), + serial, applications); +} - _internalPublisher->applicationUpdated(serial, info); +AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const StringAdapterInfoDict& adapters) : + ObserverTopic(topicManager, "AdapterObserver"), + _adapters(adapters.begin(), adapters.end()) +{ + const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher); } void -RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&) +AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); + _adapters.clear(); + for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + { + _adapters.insert(make_pair(q->id, *q)); + } +} +void +AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); _adapters.insert(make_pair(info.id, info)); - - _internalPublisher->adapterAdded(serial, info); + _publisher->adapterAdded(info); } void -RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&) +AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _adapters[info.id] = info; - - _internalPublisher->adapterUpdated(serial, info); + _publisher->adapterUpdated(info); } void -RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&) +AdapterObserverTopic::adapterRemoved(int serial, const string& id) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _adapters.erase(id); + _publisher->adapterRemoved(id); +} - _internalPublisher->adapterRemoved(serial, id); +void +AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +{ + AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); + int serial; + AdapterInfoSeq adapters; + { + Lock sync(*this); + serial = _serial; + assert(serial != -1); + for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + adapters.push_back(p->second); + } + } + observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial), + adapters); +} + +ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const IdentityObjectInfoDict& objects) : + ObserverTopic(topicManager, "ObjectObserver"), + _objects(objects.begin(), objects.end()) +{ + const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher); } void -RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&) +ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); + _objects.clear(); + for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) + { + _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); + } +} +void +ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); - - _internalPublisher->objectAdded(serial, info); + _publisher->objectAdded(info); } void -RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&) +ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _objects[info.proxy->ice_getIdentity()] = info; - - _internalPublisher->objectUpdated(serial, info); + _publisher->objectUpdated(info); } void -RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&) +ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) { Lock sync(*this); - + if(!_topic) + { + return; + } updateSerial(serial); - _objects.erase(id); - - _internalPublisher->objectRemoved(serial, id); + _publisher->objectRemoved(id); } void -RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) +ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { - while(true) + ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); + int serial; + ObjectInfoSeq objects; { - if(serial == -1) - { - ApplicationInfoSeq applications; - AdapterInfoSeq adapters; - ObjectInfoSeq objects; - { - Lock sync(*this); - assert(_serial != -1); - serial = _serial; - - map<string, ApplicationInfo>::const_iterator p; - for(p = _applications.begin(); p != _applications.end(); ++p) - { - applications.push_back(p->second); - } - - map<string, AdapterInfo>::const_iterator q; - for(q = _adapters.begin(); q != _adapters.end(); ++q) - { - adapters.push_back(q->second); - } - - map<Ice::Identity, ObjectInfo>::const_iterator r; - for(r = _objects.begin(); r != _objects.end(); ++r) - { - objects.push_back(r->second); - } - } - observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects); - return; - } - - // - // If the registry cache changed since we've send the init() - // call we need to do it again. Otherwise, we can subscribe to - // the IceStorm topic. - // Lock sync(*this); - if(serial != _serial) + serial = _serial; + for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { - serial = -1; - continue; + objects.push_back(p->second); } - - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - _topic->subscribe(qos, observer); - break; - } + } + observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects); } -void -RegistryObserverTopic::unsubscribe(const RegistryObserverPrx& observer) -{ - Lock sync(*this); - _topic->unsubscribe(observer); -} -void -RegistryObserverTopic::updateSerial(int serial) -{ - // - // This loop ensures that updates from the database are processed - // sequentially. - // - while(_serial + 1 != serial) - { - wait(); - } - _serial = serial; - notifyAll(); -} |