summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-13 10:09:50 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-13 10:09:50 +0000
commite65e272d55bfe9f9d26de43b73721c04b683234a (patch)
tree0b0e04bbaf26fe357a1620e4725986d5d750c003 /cpp/src/IceGrid/Topics.cpp
parentFixed for VC6 (STLport bug?) (diff)
downloadice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.bz2
ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.xz
ice-e65e272d55bfe9f9d26de43b73721c04b683234a.zip
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp185
1 files changed, 150 insertions, 35 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 6eebdae97bc..5c896132f1b 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -22,9 +22,11 @@ class InitCB : public T
{
public:
- InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) :
+ InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& subscriberName,
+ const string& name, int serial) :
_topic(topic),
_observer(observer),
+ _subscriberName(subscriberName),
_name(name),
_serial(serial)
{
@@ -33,7 +35,7 @@ public:
void
ice_response()
{
- _topic->subscribe(_observer, _serial);
+ _topic->subscribe(_observer, _subscriberName, _serial);
}
void
@@ -47,6 +49,7 @@ private:
const ObserverTopicPtr _topic;
const Ice::ObjectPrx _observer;
+ const string _subscriberName;
const string _name;
const int _serial;
};
@@ -80,13 +83,13 @@ ObserverTopic::~ObserverTopic()
}
void
-ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
+ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial)
{
while(true)
{
if(serial == -1)
{
- initObserver(obsv);
+ initObserver(obsv, name);
return;
}
@@ -97,19 +100,19 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
continue;
}
- subscribeImpl(obsv);
+ subscribeImpl(obsv, name);
break;
}
}
void
-ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv)
+ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name)
{
{
Lock sync(*this);
_waitForSubscribe.insert(obsv->ice_getIdentity());
}
- subscribe(obsv);
+ subscribe(obsv, name);
{
Lock sync(*this);
while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end())
@@ -120,13 +123,39 @@ ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv)
}
void
-ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer)
+ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
{
Lock sync(*this);
if(_topic)
{
_topic->unsubscribe(observer);
}
+
+ if(!name.empty())
+ {
+ _syncSubscribers.erase(name);
+
+ map<int, set<string> >::iterator p = _waitForUpdates.begin();
+ bool notifyMonitor = false;
+ while(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p++);
+ notifyMonitor = true;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ if(notifyMonitor)
+ {
+ notifyAll();
+ }
+ }
}
void
@@ -137,8 +166,78 @@ ObserverTopic::destroy()
notifyAll();
}
+void
+ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
+{
+ Lock sync(*this);
+
+ map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
+ if(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+
+ if(!failure.empty())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q == _updateFailures.end())
+ {
+ q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first;
+ }
+ q->second.insert(make_pair(name, failure));
+ }
+
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p);
+ notifyAll();
+ }
+ }
+}
+
+void
+ObserverTopic::waitForSyncedSubscribers(int serial)
+{
+ if(_syncSubscribers.empty())
+ {
+ return;
+ }
+
+ _waitForUpdates.insert(make_pair(serial, _syncSubscribers));
+
+ //
+ // Wait until all the updates are received.
+ //
+ while(true)
+ {
+ map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
+ if(p == _waitForUpdates.end())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q != _updateFailures.end())
+ {
+ map<string, string> failures = q->second;
+ _updateFailures.erase(q);
+
+ ostringstream os;
+ for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
+ {
+ os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
+ }
+ // TODO: XXX
+// Ice::Error err(_traceLevels->logger);
+// err << os.str();
+ }
+ return;
+ }
+ else
+ {
+ wait();
+ }
+ }
+}
+
void
-ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
+ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name)
{
// This must be called with the mutex locked.
if(!_topic)
@@ -151,6 +250,12 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
_topic->subscribe(qos, observer);
_waitForSubscribe.erase(observer->ice_getIdentity());
+
+ if(!name.empty())
+ {
+ _syncSubscribers.insert(name);
+ }
+
notifyAll();
}
@@ -171,13 +276,13 @@ ObserverTopic::updateSerial(int serial)
}
Ice::Context
-ObserverTopic::getContext(const string& name, int serial) const
+ObserverTopic::getContext(int serial) const
{
ostringstream os;
os << serial;
Ice::Context context;
- context[name] = os.str();
+ context["serial"] = os.str();
return context;
}
@@ -233,7 +338,7 @@ RegistryObserverTopic::registryDown(const string& name)
}
void
-RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
RegistryInfoSeq registries;
@@ -247,8 +352,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
}
serial = _serial;
}
- observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial),
- registries);
+ observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry",
+ serial), registries);
}
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -431,7 +536,7 @@ NodeObserverTopic::nodeDown(const string& name)
}
void
-NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
int serial;
@@ -445,7 +550,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
}
serial = _serial;
}
- observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes);
+ observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes);
}
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -472,7 +577,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
}
try
{
- _publisher->applicationInit(serial, apps, getContext("application", serial));
+ _publisher->applicationInit(serial, apps, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -493,13 +598,14 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
_applications.insert(make_pair(info.descriptor.name, info));
try
{
- _publisher->applicationAdded(serial, info, getContext("application", serial));
+ _publisher->applicationAdded(serial, info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -514,13 +620,14 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
_applications.erase(name);
try
{
- _publisher->applicationRemoved(serial, name, getContext("application", serial));
+ _publisher->applicationRemoved(serial, name, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -566,17 +673,18 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
}
try
{
- _publisher->applicationUpdated(serial, info, getContext("application", serial));
+ _publisher->applicationUpdated(serial, info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
int serial;
@@ -590,8 +698,8 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
applications.push_back(p->second);
}
}
- observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application",
- serial),
+ observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name,
+ "application", serial),
serial, applications);
}
@@ -619,7 +727,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
}
try
{
- _publisher->adapterInit(adpts, getContext("adapter", serial));
+ _publisher->adapterInit(adpts, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -640,13 +748,14 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
_adapters.insert(make_pair(info.id, info));
try
{
- _publisher->adapterAdded(info, getContext("adapter", serial));
+ _publisher->adapterAdded(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -661,13 +770,14 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
_adapters[info.id] = info;
try
{
- _publisher->adapterUpdated(info, getContext("adapter", serial));
+ _publisher->adapterUpdated(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -682,17 +792,18 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id)
_adapters.erase(id);
try
{
- _publisher->adapterRemoved(id, getContext("adapter", serial));
+ _publisher->adapterRemoved(id, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
int serial;
@@ -706,7 +817,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
adapters.push_back(p->second);
}
}
- observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial),
+ observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial),
adapters);
}
@@ -734,7 +845,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
}
try
{
- _publisher->objectInit(objects, getContext("object", serial));
+ _publisher->objectInit(objects, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -755,13 +866,14 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- _publisher->objectAdded(info, getContext("object", serial));
+ _publisher->objectAdded(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -776,13 +888,14 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- _publisher->objectUpdated(info, getContext("object", serial));
+ _publisher->objectUpdated(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -797,17 +910,18 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
_objects.erase(id);
try
{
- _publisher->objectRemoved(id, getContext("object", serial));
+ _publisher->objectRemoved(id, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
int serial;
@@ -820,7 +934,8 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
objects.push_back(p->second);
}
}
- observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects);
+ observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial),
+ objects);
}