summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-06 15:39:41 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-06 15:39:41 +0000
commit6f46bb760b30ef883b386dfa8e695c8d5004f05f (patch)
treec3dabd2d404b72a8e4ad16996a913ceee963815e /cpp/src/IceGrid/Topics.cpp
parentFixed bug 1209 (diff)
downloadice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.bz2
ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.xz
ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.zip
The master now waits for the replicas to be updated before to return.
Added support for dynamic registration of adapters in the replicas.
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp53
1 files changed, 44 insertions, 9 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 3fcf4ac924f..6fd3f381a4e 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -103,6 +103,23 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
}
void
+ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv)
+{
+ {
+ Lock sync(*this);
+ _waitForSubscribe.insert(obsv->ice_getIdentity());
+ }
+ subscribe(obsv);
+ {
+ Lock sync(*this);
+ while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end())
+ {
+ wait();
+ }
+ }
+}
+
+void
ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer)
{
Lock sync(*this);
@@ -117,6 +134,7 @@ ObserverTopic::destroy()
{
Lock sync(*this);
_topic = 0;
+ notifyAll();
}
void
@@ -131,6 +149,9 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
IceStorm::QoS qos;
qos["reliability"] = "twoway ordered";
_topic->subscribe(qos, observer);
+
+ _waitForSubscribe.erase(observer->ice_getIdentity());
+ notifyAll();
}
void
@@ -149,6 +170,17 @@ ObserverTopic::updateSerial(int serial)
notifyAll();
}
+Ice::Context
+ObserverTopic::getContext(const string& name, int serial) const
+{
+ ostringstream os;
+ os << serial;
+
+ Ice::Context context;
+ context[name] = os.str();
+ return context;
+}
+
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
@@ -390,6 +422,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
{
_applications.insert(make_pair(p->descriptor.name, *p));
}
+ _publisher->applicationInit(serial, apps, getContext("application", serial));
}
void
@@ -402,7 +435,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
}
updateSerial(serial);
_applications.insert(make_pair(info.descriptor.name, info));
- _publisher->applicationAdded(serial, info);
+ _publisher->applicationAdded(serial, info, getContext("application", serial));
}
void
@@ -415,7 +448,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
}
updateSerial(serial);
_applications.erase(name);
- _publisher->applicationRemoved(serial, name);
+ _publisher->applicationRemoved(serial, name, getContext("application", serial));
}
void
@@ -459,7 +492,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
{
assert(false);
}
- _publisher->applicationUpdated(serial, info);
+ _publisher->applicationUpdated(serial, info, getContext("application", serial));
}
void
@@ -504,6 +537,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
{
_adapters.insert(make_pair(q->id, *q));
}
+ _publisher->adapterInit(adpts, getContext("adapter", serial));
}
void
@@ -516,7 +550,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
}
updateSerial(serial);
_adapters.insert(make_pair(info.id, info));
- _publisher->adapterAdded(info);
+ _publisher->adapterAdded(info, getContext("adapter", serial));
}
void
@@ -529,7 +563,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
}
updateSerial(serial);
_adapters[info.id] = info;
- _publisher->adapterUpdated(info);
+ _publisher->adapterUpdated(info, getContext("adapter", serial));
}
void
@@ -542,7 +576,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id)
}
updateSerial(serial);
_adapters.erase(id);
- _publisher->adapterRemoved(id);
+ _publisher->adapterRemoved(id, getContext("adapter", serial));
}
void
@@ -586,6 +620,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
{
_objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
}
+ _publisher->objectInit(objects, getContext("object", serial));
}
void
@@ -598,7 +633,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
}
updateSerial(serial);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
- _publisher->objectAdded(info);
+ _publisher->objectAdded(info, getContext("object", serial));
}
void
@@ -611,7 +646,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
}
updateSerial(serial);
_objects[info.proxy->ice_getIdentity()] = info;
- _publisher->objectUpdated(info);
+ _publisher->objectUpdated(info, getContext("object", serial));
}
void
@@ -624,7 +659,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
}
updateSerial(serial);
_objects.erase(id);
- _publisher->objectRemoved(id);
+ _publisher->objectRemoved(id, getContext("object", serial));
}
void