diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
commit | e65e272d55bfe9f9d26de43b73721c04b683234a (patch) | |
tree | 0b0e04bbaf26fe357a1620e4725986d5d750c003 /cpp/src/IceGrid/Topics.cpp | |
parent | Fixed for VC6 (STLport bug?) (diff) | |
download | ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.bz2 ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.xz ice-e65e272d55bfe9f9d26de43b73721c04b683234a.zip |
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 185 |
1 files changed, 150 insertions, 35 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 6eebdae97bc..5c896132f1b 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -22,9 +22,11 @@ class InitCB : public T { public: - InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) : + InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& subscriberName, + const string& name, int serial) : _topic(topic), _observer(observer), + _subscriberName(subscriberName), _name(name), _serial(serial) { @@ -33,7 +35,7 @@ public: void ice_response() { - _topic->subscribe(_observer, _serial); + _topic->subscribe(_observer, _subscriberName, _serial); } void @@ -47,6 +49,7 @@ private: const ObserverTopicPtr _topic; const Ice::ObjectPrx _observer; + const string _subscriberName; const string _name; const int _serial; }; @@ -80,13 +83,13 @@ ObserverTopic::~ObserverTopic() } void -ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) +ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial) { while(true) { if(serial == -1) { - initObserver(obsv); + initObserver(obsv, name); return; } @@ -97,19 +100,19 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) continue; } - subscribeImpl(obsv); + subscribeImpl(obsv, name); break; } } void -ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv) +ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name) { { Lock sync(*this); _waitForSubscribe.insert(obsv->ice_getIdentity()); } - subscribe(obsv); + subscribe(obsv, name); { Lock sync(*this); while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end()) @@ -120,13 +123,39 @@ ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv) } void -ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer) +ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) { Lock sync(*this); if(_topic) { _topic->unsubscribe(observer); } + + if(!name.empty()) + { + _syncSubscribers.erase(name); + + map<int, set<string> >::iterator p = _waitForUpdates.begin(); + bool notifyMonitor = false; + while(p != _waitForUpdates.end()) + { + p->second.erase(name); + if(p->second.empty()) + { + _waitForUpdates.erase(p++); + notifyMonitor = true; + } + else + { + ++p; + } + } + + if(notifyMonitor) + { + notifyAll(); + } + } } void @@ -137,8 +166,78 @@ ObserverTopic::destroy() notifyAll(); } +void +ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure) +{ + Lock sync(*this); + + map<int, set<string> >::iterator p = _waitForUpdates.find(serial); + if(p != _waitForUpdates.end()) + { + p->second.erase(name); + + if(!failure.empty()) + { + map<int, map<string, string> >::iterator q = _updateFailures.find(serial); + if(q == _updateFailures.end()) + { + q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first; + } + q->second.insert(make_pair(name, failure)); + } + + if(p->second.empty()) + { + _waitForUpdates.erase(p); + notifyAll(); + } + } +} + +void +ObserverTopic::waitForSyncedSubscribers(int serial) +{ + if(_syncSubscribers.empty()) + { + return; + } + + _waitForUpdates.insert(make_pair(serial, _syncSubscribers)); + + // + // Wait until all the updates are received. + // + while(true) + { + map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial); + if(p == _waitForUpdates.end()) + { + map<int, map<string, string> >::iterator q = _updateFailures.find(serial); + if(q != _updateFailures.end()) + { + map<string, string> failures = q->second; + _updateFailures.erase(q); + + ostringstream os; + for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) + { + os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n"; + } + // TODO: XXX +// Ice::Error err(_traceLevels->logger); +// err << os.str(); + } + return; + } + else + { + wait(); + } + } +} + void -ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) +ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name) { // This must be called with the mutex locked. if(!_topic) @@ -151,6 +250,12 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) _topic->subscribe(qos, observer); _waitForSubscribe.erase(observer->ice_getIdentity()); + + if(!name.empty()) + { + _syncSubscribers.insert(name); + } + notifyAll(); } @@ -171,13 +276,13 @@ ObserverTopic::updateSerial(int serial) } Ice::Context -ObserverTopic::getContext(const string& name, int serial) const +ObserverTopic::getContext(int serial) const { ostringstream os; os << serial; Ice::Context context; - context[name] = os.str(); + context["serial"] = os.str(); return context; } @@ -233,7 +338,7 @@ RegistryObserverTopic::registryDown(const string& name) } void -RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv); RegistryInfoSeq registries; @@ -247,8 +352,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) } serial = _serial; } - observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial), - registries); + observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry", + serial), registries); } NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -431,7 +536,7 @@ NodeObserverTopic::nodeDown(const string& name) } void -NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv); int serial; @@ -445,7 +550,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) } serial = _serial; } - observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes); + observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes); } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -472,7 +577,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& } try { - _publisher->applicationInit(serial, apps, getContext("application", serial)); + _publisher->applicationInit(serial, apps, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -493,13 +598,14 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in _applications.insert(make_pair(info.descriptor.name, info)); try { - _publisher->applicationAdded(serial, info, getContext("application", serial)); + _publisher->applicationAdded(serial, info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -514,13 +620,14 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) _applications.erase(name); try { - _publisher->applicationRemoved(serial, name, getContext("application", serial)); + _publisher->applicationRemoved(serial, name, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -566,17 +673,18 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate } try { - _publisher->applicationUpdated(serial, info, getContext("application", serial)); + _publisher->applicationUpdated(serial, info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); int serial; @@ -590,8 +698,8 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) applications.push_back(p->second); } } - observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application", - serial), + observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name, + "application", serial), serial, applications); } @@ -619,7 +727,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) } try { - _publisher->adapterInit(adpts, getContext("adapter", serial)); + _publisher->adapterInit(adpts, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -640,13 +748,14 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) _adapters.insert(make_pair(info.id, info)); try { - _publisher->adapterAdded(info, getContext("adapter", serial)); + _publisher->adapterAdded(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -661,13 +770,14 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) _adapters[info.id] = info; try { - _publisher->adapterUpdated(info, getContext("adapter", serial)); + _publisher->adapterUpdated(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -682,17 +792,18 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) _adapters.erase(id); try { - _publisher->adapterRemoved(id, getContext("adapter", serial)); + _publisher->adapterRemoved(id, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); int serial; @@ -706,7 +817,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) adapters.push_back(p->second); } } - observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial), + observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial), adapters); } @@ -734,7 +845,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) } try { - _publisher->objectInit(objects, getContext("object", serial)); + _publisher->objectInit(objects, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -755,13 +866,14 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - _publisher->objectAdded(info, getContext("object", serial)); + _publisher->objectAdded(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -776,13 +888,14 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) _objects[info.proxy->ice_getIdentity()] = info; try { - _publisher->objectUpdated(info, getContext("object", serial)); + _publisher->objectUpdated(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -797,17 +910,18 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) _objects.erase(id); try { - _publisher->objectRemoved(id, getContext("object", serial)); + _publisher->objectRemoved(id, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); int serial; @@ -820,7 +934,8 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) objects.push_back(p->second); } } - observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects); + observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial), + objects); } |