summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-29 09:25:19 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-29 09:25:19 +0000
commit55d4301510cd67fc11638255df6b94574f9d49b5 (patch)
treef4a631a5e8de10c511c6bc3ee9d8aad3c6b577d2 /cpp/src/IceGrid/Topics.cpp
parentvarious file chooser enhancements (diff)
downloadice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.bz2
ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.xz
ice-55d4301510cd67fc11638255df6b94574f9d49b5.zip
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp101
1 files changed, 61 insertions, 40 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 34d80ab18a9..968a733ed45 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -72,6 +72,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
assert(_syncSubscribers.find(name) == _syncSubscribers.end());
_syncSubscribers.insert(name);
+ addExpectedUpdate(_serial, name);
waitForSyncedSubscribersNoSync(_serial, name);
}
}
@@ -146,8 +147,9 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
if(p->second.empty())
{
_waitForUpdates.erase(p);
- notifyAll();
}
+
+ notifyAll();
}
}
@@ -159,21 +161,33 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
}
void
-ObserverTopic::addExpectedUpdate(int serial)
+ObserverTopic::addExpectedUpdate(int serial, const string& name)
{
- if(_syncSubscribers.empty())
+ if(_syncSubscribers.empty() && name.empty())
{
return;
}
// Must be called with the lock held.
- assert(_waitForUpdates[serial].empty());
- _waitForUpdates[serial] = _syncSubscribers;
+ if(name.empty())
+ {
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+ }
+ else
+ {
+ _waitForUpdates[serial].insert(name);
+ }
}
void
ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
{
+ if(serial < 0)
+ {
+ return;
+ }
+
//
// Wait until all the updates are received.
//
@@ -500,13 +514,13 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP
const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
_applications.clear();
@@ -523,16 +537,19 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
}
+ addExpectedUpdate(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
+
updateSerial(serial);
_applications.insert(make_pair(info.descriptor.name, info));
try
@@ -545,16 +562,16 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
_applications.erase(name);
@@ -568,16 +585,16 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
@@ -622,7 +639,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
void
@@ -654,13 +671,13 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.clear();
@@ -677,15 +694,17 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
}
+ addExpectedUpdate(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.insert(make_pair(info.id, info));
@@ -699,16 +718,16 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters[info.id] = info;
@@ -722,16 +741,16 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterRemoved(const string& id)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.erase(id);
@@ -745,7 +764,7 @@ AdapterObserverTopic::adapterRemoved(const string& id)
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
void
@@ -777,13 +796,13 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.clear();
@@ -800,15 +819,17 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectInit' update:\n" << ex;
}
+ addExpectedUpdate(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectAdded(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
@@ -822,16 +843,16 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info)
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects[info.proxy->ice_getIdentity()] = info;
@@ -845,16 +866,16 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.erase(id);
@@ -868,7 +889,7 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
int
@@ -877,7 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
Lock sync(*this);
if(!_topic)
{
- return _serial;
+ return -1;
}
updateSerial(_serial + 1);
@@ -927,7 +948,7 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
Lock sync(*this);
if(!_topic)
{
- return _serial;
+ return -1;
}
updateSerial(_serial + 1);