summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp39
1 files changed, 24 insertions, 15 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 582a5f5da5b..b342b2fc27c 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -159,27 +159,21 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
}
void
-ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
+ObserverTopic::addExpectedUpdate(int serial)
{
- if(_syncSubscribers.empty() && name.empty())
+ if(_syncSubscribers.empty())
{
return;
}
- if(name.empty())
- {
- assert(_waitForUpdates[serial].empty());
- _waitForUpdates[serial] = _syncSubscribers;
- }
- else
- {
- if(_syncSubscribers.find(name) == _syncSubscribers.end())
- {
- return; // Not subscribed anymore.
- }
- _waitForUpdates[serial].insert(name);
- }
+ // Must be called with the lock held.
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+}
+void
+ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
+{
//
// Wait until all the updates are received.
//
@@ -206,6 +200,10 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
}
else
{
+ if(!name.empty() && p->second.find(name) == p->second.end())
+ {
+ return;
+ }
wait();
}
}
@@ -546,6 +544,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -568,6 +567,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -621,6 +621,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -688,6 +689,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -710,6 +712,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -732,6 +735,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -799,6 +803,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -821,6 +826,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -843,6 +849,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
+ addExpectedUpdate(serial);
waitForSyncedSubscribersNoSync(serial);
}
@@ -891,6 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info
// We don't wait for the update to be received by the replicas
// here. This operation is called by ReplicaSessionI.
//
+ addExpectedUpdate(serial);
//waitForSyncedSubscribersNoSync(serial);
}
@@ -923,6 +931,7 @@ ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos)
// replicas here. This operation is only called internaly by
// IceGrid.
//
+ addExpectedUpdate(serial);
//waitForSyncedSubscribersNoSync(serial);
}