From 7254df3066ca33d7326f17bf8f63995dcfc216f3 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Mon, 23 Oct 2006 15:59:28 +0000 Subject: Fixed race condition --- cpp/src/IceGrid/Topics.cpp | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) (limited to 'cpp/src/IceGrid/Topics.cpp') diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 582a5f5da5b..b342b2fc27c 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -159,27 +159,21 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) } void -ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) +ObserverTopic::addExpectedUpdate(int serial) { - if(_syncSubscribers.empty() && name.empty()) + if(_syncSubscribers.empty()) { return; } - if(name.empty()) - { - assert(_waitForUpdates[serial].empty()); - _waitForUpdates[serial] = _syncSubscribers; - } - else - { - if(_syncSubscribers.find(name) == _syncSubscribers.end()) - { - return; // Not subscribed anymore. - } - _waitForUpdates[serial].insert(name); - } + // Must be called with the lock held. + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; +} +void +ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) +{ // // Wait until all the updates are received. // @@ -206,6 +200,10 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) } else { + if(!name.empty() && p->second.find(name) == p->second.end()) + { + return; + } wait(); } } @@ -546,6 +544,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -568,6 +567,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -621,6 +621,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -688,6 +689,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -710,6 +712,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -732,6 +735,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -799,6 +803,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -821,6 +826,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -843,6 +849,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } + addExpectedUpdate(serial); waitForSyncedSubscribersNoSync(serial); } @@ -891,6 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info // We don't wait for the update to be received by the replicas // here. This operation is called by ReplicaSessionI. // + addExpectedUpdate(serial); //waitForSyncedSubscribersNoSync(serial); } @@ -923,6 +931,7 @@ ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) // replicas here. This operation is only called internaly by // IceGrid. // + addExpectedUpdate(serial); //waitForSyncedSubscribersNoSync(serial); } -- cgit v1.2.3