summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-13 16:18:06 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-13 16:18:06 +0000
commite817166c5e13369f6cc8042f6530b1f3fac7c359 (patch)
tree19691a2ca1e4c01faa3ae75fba840e50c6f70d06 /cpp/src/IceGrid/Topics.cpp
parent- Updating Ice and library verison to 3.2. (diff)
downloadice-e817166c5e13369f6cc8042f6530b1f3fac7c359.tar.bz2
ice-e817166c5e13369f6cc8042f6530b1f3fac7c359.tar.xz
ice-e817166c5e13369f6cc8042f6530b1f3fac7c359.zip
Simplified subscription code to use new per-subscriber publisher
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp151
1 files changed, 36 insertions, 115 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 5c896132f1b..1b9969a4543 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -85,43 +85,22 @@ ObserverTopic::~ObserverTopic()
void
ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial)
{
- while(true)
+ Lock sync(*this);
+ if(!_topic)
{
- if(serial == -1)
- {
- initObserver(obsv, name);
- return;
- }
-
- Lock sync(*this);
- if(serial != _serial)
- {
- serial = -1;
- continue;
- }
-
- subscribeImpl(obsv, name);
- break;
+ return;
}
-}
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ initObserver(_topic->subscribe(qos, obsv));
-void
-ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name)
-{
- {
- Lock sync(*this);
- _waitForSubscribe.insert(obsv->ice_getIdentity());
- }
- subscribe(obsv, name);
+ if(!name.empty())
{
- Lock sync(*this);
- while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end())
- {
- wait();
- }
+ _syncSubscribers.insert(name);
}
}
-
+
void
ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
{
@@ -201,7 +180,7 @@ ObserverTopic::waitForSyncedSubscribers(int serial)
{
return;
}
-
+
_waitForUpdates.insert(make_pair(serial, _syncSubscribers));
//
@@ -216,8 +195,7 @@ ObserverTopic::waitForSyncedSubscribers(int serial)
if(q != _updateFailures.end())
{
map<string, string> failures = q->second;
- _updateFailures.erase(q);
-
+ _updateFailures.erase(q);
ostringstream os;
for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
{
@@ -236,29 +214,6 @@ ObserverTopic::waitForSyncedSubscribers(int serial)
}
}
-void
-ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name)
-{
- // This must be called with the mutex locked.
- if(!_topic)
- {
- return;
- }
-
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- _topic->subscribe(qos, observer);
-
- _waitForSubscribe.erase(observer->ice_getIdentity());
-
- if(!name.empty())
- {
- _syncSubscribers.insert(name);
- }
-
- notifyAll();
-}
-
void
ObserverTopic::updateSerial(int serial)
{
@@ -338,22 +293,16 @@ RegistryObserverTopic::registryDown(const string& name)
}
void
-RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
+RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
RegistryInfoSeq registries;
- int serial;
+ registries.reserve(_registries.size());
+ for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
{
- Lock sync(*this);
- registries.reserve(_registries.size());
- for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
- {
- registries.push_back(p->second);
- }
- serial = _serial;
+ registries.push_back(p->second);
}
- observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry",
- serial), registries);
+ observer->registryInit(registries, getContext(-1));
}
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -536,21 +485,16 @@ NodeObserverTopic::nodeDown(const string& name)
}
void
-NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
+NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
- int serial;
NodeDynamicInfoSeq nodes;
+ nodes.reserve(_nodes.size());
+ for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
{
- Lock sync(*this);
- nodes.reserve(_nodes.size());
- for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
- {
- nodes.push_back(p->second);
- }
- serial = _serial;
+ nodes.push_back(p->second);
}
- observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes);
+ observer->nodeInit(nodes, getContext(-1));
}
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -684,23 +628,15 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
}
void
-ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
+ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
- int serial;
ApplicationInfoSeq applications;
+ for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
{
- Lock sync(*this);
- serial = _serial;
- assert(serial != -1);
- for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
- {
- applications.push_back(p->second);
- }
+ applications.push_back(p->second);
}
- observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name,
- "application", serial),
- serial, applications);
+ observer->applicationInit(_serial, applications, getContext(-1));
}
AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -803,22 +739,15 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id)
}
void
-AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
+AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
- int serial;
AdapterInfoSeq adapters;
+ for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
- Lock sync(*this);
- serial = _serial;
- assert(serial != -1);
- for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
- {
- adapters.push_back(p->second);
- }
- }
- observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial),
- adapters);
+ adapters.push_back(p->second);
+ }
+ observer->adapterInit(adapters, getContext(-1));
}
ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -921,21 +850,13 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
}
void
-ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
+ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
- int serial;
ObjectInfoSeq objects;
+ for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
- Lock sync(*this);
- serial = _serial;
- for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
- {
- objects.push_back(p->second);
- }
- }
- observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial),
- objects);
+ objects.push_back(p->second);
+ }
+ observer->objectInit(objects, getContext(-1));
}
-
-