diff options
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 39 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 1 |
2 files changed, 25 insertions, 15 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 582a5f5da5b..b342b2fc27c 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -159,27 +159,21 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) } void -ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) +ObserverTopic::addExpectedUpdate(int serial) { - if(_syncSubscribers.empty() && name.empty()) + if(_syncSubscribers.empty()) { return; } - if(name.empty()) - { - assert(_waitForUpdates[serial].empty()); - _waitForUpdates[serial] = _syncSubscribers; - } - else - { - if(_syncSubscribers.find(name) == _syncSubscribers.end()) - { - return; // Not subscribed anymore. - } - _waitForUpdates[serial].insert(name); - } + // Must be called with the lock held. + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; +} +void +ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) +{ // // Wait until all the updates are received. // @@ -206,6 +200,10 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) } else { + if(!name.empty() && p->second.find(name) == p->second.end()) + { + return; + } wait(); } } @@ -546,6 +544,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -568,6 +567,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -621,6 +621,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -688,6 +689,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -710,6 +712,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -732,6 +735,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -799,6 +803,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -821,6 +826,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -843,6 +849,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -891,6 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info // We don't wait for the update to be received by the replicas // here. This operation is called by ReplicaSessionI. // + addExpectedUpdate(serial); //waitForSyncedSubscribersNoSync(serial); } @@ -923,6 +931,7 @@ ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) // replicas here. This operation is only called internaly by // IceGrid. // + addExpectedUpdate(serial); //waitForSyncedSubscribersNoSync(serial); } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index dc480fdc5d1..72d496db3fa 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -40,6 +40,7 @@ public: protected: + void addExpectedUpdate(int); void waitForSyncedSubscribersNoSync(int, const std::string& = std::string()); void updateSerial(int); Ice::Context getContext(int) const; |