diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-06 15:39:41 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-06 15:39:41 +0000 |
commit | 6f46bb760b30ef883b386dfa8e695c8d5004f05f (patch) | |
tree | c3dabd2d404b72a8e4ad16996a913ceee963815e /cpp/src/IceGrid/Topics.cpp | |
parent | Fixed bug 1209 (diff) | |
download | ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.bz2 ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.xz ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.zip |
The master now waits for the replicas to be updated before to return.
Added support for dynamic registration of adapters in the replicas.
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 53 |
1 files changed, 44 insertions, 9 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 3fcf4ac924f..6fd3f381a4e 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -103,6 +103,23 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) } void +ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv) +{ + { + Lock sync(*this); + _waitForSubscribe.insert(obsv->ice_getIdentity()); + } + subscribe(obsv); + { + Lock sync(*this); + while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end()) + { + wait(); + } + } +} + +void ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer) { Lock sync(*this); @@ -117,6 +134,7 @@ ObserverTopic::destroy() { Lock sync(*this); _topic = 0; + notifyAll(); } void @@ -131,6 +149,9 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) IceStorm::QoS qos; qos["reliability"] = "twoway ordered"; _topic->subscribe(qos, observer); + + _waitForSubscribe.erase(observer->ice_getIdentity()); + notifyAll(); } void @@ -149,6 +170,17 @@ ObserverTopic::updateSerial(int serial) notifyAll(); } +Ice::Context +ObserverTopic::getContext(const string& name, int serial) const +{ + ostringstream os; + os << serial; + + Ice::Context context; + context[name] = os.str(); + return context; +} + RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : ObserverTopic(topicManager, "RegistryObserver") { @@ -390,6 +422,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& { _applications.insert(make_pair(p->descriptor.name, *p)); } + _publisher->applicationInit(serial, apps, getContext("application", serial)); } void @@ -402,7 +435,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in } updateSerial(serial); _applications.insert(make_pair(info.descriptor.name, info)); - _publisher->applicationAdded(serial, info); + _publisher->applicationAdded(serial, info, getContext("application", serial)); } void @@ -415,7 +448,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) } updateSerial(serial); _applications.erase(name); - _publisher->applicationRemoved(serial, name); + _publisher->applicationRemoved(serial, name, getContext("application", serial)); } void @@ -459,7 +492,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate { assert(false); } - _publisher->applicationUpdated(serial, info); + _publisher->applicationUpdated(serial, info, getContext("application", serial)); } void @@ -504,6 +537,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) { _adapters.insert(make_pair(q->id, *q)); } + _publisher->adapterInit(adpts, getContext("adapter", serial)); } void @@ -516,7 +550,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) } updateSerial(serial); _adapters.insert(make_pair(info.id, info)); - _publisher->adapterAdded(info); + _publisher->adapterAdded(info, getContext("adapter", serial)); } void @@ -529,7 +563,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) } updateSerial(serial); _adapters[info.id] = info; - _publisher->adapterUpdated(info); + _publisher->adapterUpdated(info, getContext("adapter", serial)); } void @@ -542,7 +576,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) } updateSerial(serial); _adapters.erase(id); - _publisher->adapterRemoved(id); + _publisher->adapterRemoved(id, getContext("adapter", serial)); } void @@ -586,6 +620,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) { _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); } + _publisher->objectInit(objects, getContext("object", serial)); } void @@ -598,7 +633,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) } updateSerial(serial); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); - _publisher->objectAdded(info); + _publisher->objectAdded(info, getContext("object", serial)); } void @@ -611,7 +646,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) } updateSerial(serial); _objects[info.proxy->ice_getIdentity()] = info; - _publisher->objectUpdated(info); + _publisher->objectUpdated(info, getContext("object", serial)); } void @@ -624,7 +659,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) } updateSerial(serial); _objects.erase(id); - _publisher->objectRemoved(id); + _publisher->objectRemoved(id, getContext("object", serial)); } void |