summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp111
1 files changed, 101 insertions, 10 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 48dbaea12dd..cbb3f7766dc 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -43,7 +43,7 @@ ObserverTopic::~ObserverTopic()
}
void
-ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial)
+ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
Lock sync(*this);
if(!_topic)
@@ -57,7 +57,9 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int ser
if(!name.empty())
{
+ assert(_syncSubscribers.find(name) == _syncSubscribers.end());
_syncSubscribers.insert(name);
+ waitForSyncedSubscribers(_serial, name);
}
}
@@ -72,6 +74,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
if(!name.empty())
{
+ assert(_syncSubscribers.find(name) != _syncSubscribers.end());
_syncSubscribers.erase(name);
map<int, set<string> >::iterator p = _waitForUpdates.begin();
@@ -109,7 +112,6 @@ void
ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
{
Lock sync(*this);
-
map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
if(p != _waitForUpdates.end())
{
@@ -134,14 +136,22 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
}
void
-ObserverTopic::waitForSyncedSubscribers(int serial)
+ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
{
- if(_syncSubscribers.empty())
+ if(_syncSubscribers.empty() && name.empty())
{
return;
}
- _waitForUpdates.insert(make_pair(serial, _syncSubscribers));
+ if(name.empty())
+ {
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+ }
+ else
+ {
+ _waitForUpdates[serial].insert(name);
+ }
//
// Wait until all the updates are received.
@@ -262,7 +272,7 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
registries.push_back(p->second);
}
- observer->registryInit(registries, getContext(-1));
+ observer->registryInit(registries, getContext(_serial));
}
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -454,7 +464,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
nodes.push_back(p->second);
}
- observer->nodeInit(nodes, getContext(-1));
+ observer->nodeInit(nodes, getContext(_serial));
}
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -596,7 +606,7 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
applications.push_back(p->second);
}
- observer->applicationInit(_serial, applications, getContext(-1));
+ observer->applicationInit(_serial, applications, getContext(_serial));
}
AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -707,7 +717,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
adapters.push_back(p->second);
}
- observer->adapterInit(adapters, getContext(-1));
+ observer->adapterInit(adapters, getContext(_serial));
}
ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -810,6 +820,87 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
}
void
+ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& infos)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(serial);
+
+ for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity());
+ if(q != _objects.end())
+ {
+ q->second = *p;
+ try
+ {
+ _publisher->objectUpdated(*p, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ }
+ }
+ else
+ {
+ _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
+ try
+ {
+ _publisher->objectAdded(*p, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ }
+ }
+ }
+
+ //
+ // We don't need to wait for the update to be received by the
+ // replicas here. This operation is only called internaly by
+ // IceGrid.
+ //
+ //waitForSyncedSubscribers(serial);
+}
+
+void
+ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(serial);
+
+ for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ _objects.erase(p->proxy->ice_getIdentity());
+ try
+ {
+ _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ }
+ }
+
+ //
+ // We don't need to wait for the update to be received by the
+ // replicas here. This operation is only called internaly by
+ // IceGrid.
+ //
+ //waitForSyncedSubscribers(serial);
+}
+
+void
ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
@@ -818,5 +909,5 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
objects.push_back(p->second);
}
- observer->objectInit(objects, getContext(-1));
+ observer->objectInit(objects, getContext(_serial));
}