diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-29 09:25:19 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-29 09:25:19 +0000 |
commit | 55d4301510cd67fc11638255df6b94574f9d49b5 (patch) | |
tree | f4a631a5e8de10c511c6bc3ee9d8aad3c6b577d2 /cpp/src/IceGrid/Topics.cpp | |
parent | various file chooser enhancements (diff) | |
download | ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.bz2 ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.xz ice-55d4301510cd67fc11638255df6b94574f9d49b5.zip |
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 101 |
1 files changed, 61 insertions, 40 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 34d80ab18a9..968a733ed45 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -72,6 +72,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { assert(_syncSubscribers.find(name) == _syncSubscribers.end()); _syncSubscribers.insert(name); + addExpectedUpdate(_serial, name); waitForSyncedSubscribersNoSync(_serial, name); } } @@ -146,8 +147,9 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail if(p->second.empty()) { _waitForUpdates.erase(p); - notifyAll(); } + + notifyAll(); } } @@ -159,21 +161,33 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) } void -ObserverTopic::addExpectedUpdate(int serial) +ObserverTopic::addExpectedUpdate(int serial, const string& name) { - if(_syncSubscribers.empty()) + if(_syncSubscribers.empty() && name.empty()) { return; } // Must be called with the lock held. - assert(_waitForUpdates[serial].empty()); - _waitForUpdates[serial] = _syncSubscribers; + if(name.empty()) + { + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; + } + else + { + _waitForUpdates[serial].insert(name); + } } void ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) { + if(serial < 0) + { + return; + } + // // Wait until all the updates are received. // @@ -500,13 +514,13 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher); } -void +int ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); _applications.clear(); @@ -523,16 +537,19 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } + addExpectedUpdate(serial); + return serial; } -void +int ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } + updateSerial(serial); _applications.insert(make_pair(info.descriptor.name, info)); try @@ -545,16 +562,16 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } -void +int ApplicationObserverTopic::applicationRemoved(int serial, const string& name) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); _applications.erase(name); @@ -568,16 +585,16 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } -void +int ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); @@ -622,7 +639,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } void @@ -654,13 +671,13 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher); } -void +int AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.clear(); @@ -677,15 +694,17 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterInit' update:\n" << ex; } + addExpectedUpdate(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterAdded(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.insert(make_pair(info.id, info)); @@ -699,16 +718,16 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info) out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters[info.id] = info; @@ -722,16 +741,16 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterRemoved(const string& id) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.erase(id); @@ -745,7 +764,7 @@ AdapterObserverTopic::adapterRemoved(const string& id) out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } void @@ -777,13 +796,13 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher); } -void +int ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.clear(); @@ -800,15 +819,17 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectInit' update:\n" << ex; } + addExpectedUpdate(_serial); + return _serial; } -void +int ObjectObserverTopic::objectAdded(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); @@ -822,16 +843,16 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info) out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int ObjectObserverTopic::objectUpdated(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects[info.proxy->ice_getIdentity()] = info; @@ -845,16 +866,16 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info) out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int ObjectObserverTopic::objectRemoved(const Ice::Identity& id) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.erase(id); @@ -868,7 +889,7 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id) out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } int @@ -877,7 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) Lock sync(*this); if(!_topic) { - return _serial; + return -1; } updateSerial(_serial + 1); @@ -927,7 +948,7 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) Lock sync(*this); if(!_topic) { - return _serial; + return -1; } updateSerial(_serial + 1); |