diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 16:18:06 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 16:18:06 +0000 |
commit | e817166c5e13369f6cc8042f6530b1f3fac7c359 (patch) | |
tree | 19691a2ca1e4c01faa3ae75fba840e50c6f70d06 /cpp/src/IceGrid/Topics.cpp | |
parent | - Updating Ice and library verison to 3.2. (diff) | |
download | ice-e817166c5e13369f6cc8042f6530b1f3fac7c359.tar.bz2 ice-e817166c5e13369f6cc8042f6530b1f3fac7c359.tar.xz ice-e817166c5e13369f6cc8042f6530b1f3fac7c359.zip |
Simplified subscription code to use new per-subscriber publisher
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 151 |
1 files changed, 36 insertions, 115 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 5c896132f1b..1b9969a4543 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -85,43 +85,22 @@ ObserverTopic::~ObserverTopic() void ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial) { - while(true) + Lock sync(*this); + if(!_topic) { - if(serial == -1) - { - initObserver(obsv, name); - return; - } - - Lock sync(*this); - if(serial != _serial) - { - serial = -1; - continue; - } - - subscribeImpl(obsv, name); - break; + return; } -} + + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + initObserver(_topic->subscribe(qos, obsv)); -void -ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name) -{ - { - Lock sync(*this); - _waitForSubscribe.insert(obsv->ice_getIdentity()); - } - subscribe(obsv, name); + if(!name.empty()) { - Lock sync(*this); - while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end()) - { - wait(); - } + _syncSubscribers.insert(name); } } - + void ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) { @@ -201,7 +180,7 @@ ObserverTopic::waitForSyncedSubscribers(int serial) { return; } - + _waitForUpdates.insert(make_pair(serial, _syncSubscribers)); // @@ -216,8 +195,7 @@ ObserverTopic::waitForSyncedSubscribers(int serial) if(q != _updateFailures.end()) { map<string, string> failures = q->second; - _updateFailures.erase(q); - + _updateFailures.erase(q); ostringstream os; for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) { @@ -236,29 +214,6 @@ ObserverTopic::waitForSyncedSubscribers(int serial) } } -void -ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name) -{ - // This must be called with the mutex locked. - if(!_topic) - { - return; - } - - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - _topic->subscribe(qos, observer); - - _waitForSubscribe.erase(observer->ice_getIdentity()); - - if(!name.empty()) - { - _syncSubscribers.insert(name); - } - - notifyAll(); -} - void ObserverTopic::updateSerial(int serial) { @@ -338,22 +293,16 @@ RegistryObserverTopic::registryDown(const string& name) } void -RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) +RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv); RegistryInfoSeq registries; - int serial; + registries.reserve(_registries.size()); + for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p) { - Lock sync(*this); - registries.reserve(_registries.size()); - for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p) - { - registries.push_back(p->second); - } - serial = _serial; + registries.push_back(p->second); } - observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry", - serial), registries); + observer->registryInit(registries, getContext(-1)); } NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -536,21 +485,16 @@ NodeObserverTopic::nodeDown(const string& name) } void -NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) +NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv); - int serial; NodeDynamicInfoSeq nodes; + nodes.reserve(_nodes.size()); + for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) { - Lock sync(*this); - nodes.reserve(_nodes.size()); - for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) - { - nodes.push_back(p->second); - } - serial = _serial; + nodes.push_back(p->second); } - observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes); + observer->nodeInit(nodes, getContext(-1)); } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -684,23 +628,15 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate } void -ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) +ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); - int serial; ApplicationInfoSeq applications; + for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p) { - Lock sync(*this); - serial = _serial; - assert(serial != -1); - for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p) - { - applications.push_back(p->second); - } + applications.push_back(p->second); } - observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name, - "application", serial), - serial, applications); + observer->applicationInit(_serial, applications, getContext(-1)); } AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -803,22 +739,15 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) } void -AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) +AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); - int serial; AdapterInfoSeq adapters; + for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { - Lock sync(*this); - serial = _serial; - assert(serial != -1); - for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) - { - adapters.push_back(p->second); - } - } - observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial), - adapters); + adapters.push_back(p->second); + } + observer->adapterInit(adapters, getContext(-1)); } ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -921,21 +850,13 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) } void -ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) +ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); - int serial; ObjectInfoSeq objects; + for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { - Lock sync(*this); - serial = _serial; - for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p) - { - objects.push_back(p->second); - } - } - observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial), - objects); + objects.push_back(p->second); + } + observer->objectInit(objects, getContext(-1)); } - - |