summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-04 19:39:59 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-04 19:39:59 +0000
commit1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268 (patch)
treecd87f8121980159fdafd6b01e6802ea594478b93 /cpp/src/IceGrid/Topics.cpp
parenticegridadmin always uses routed config if possible. added error messages if (diff)
downloadice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.tar.bz2
ice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.tar.xz
ice-1ce69fc8c36f2b7fa1d71ebe18e7ac4de01e3268.zip
Improved observers, refactored topic manager and initialization Added
icegridadmin commands to examine the registries
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp618
1 files changed, 356 insertions, 262 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 508a07920a4..3fcf4ac924f 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -14,13 +14,18 @@
using namespace std;
using namespace IceGrid;
-class RegistryInitCB : public AMI_RegistryObserver_init
+namespace IceGrid
+{
+
+template<class T>
+class InitCB : public T
{
public:
- RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) :
+ InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) :
_topic(topic),
_observer(observer),
+ _name(name),
_serial(serial)
{
}
@@ -35,93 +40,185 @@ public:
ice_exception(const Ice::Exception& ex)
{
Ice::Warning out(_observer->ice_getCommunicator()->getLogger());
- out << "couldn't initialize registry observer:\n" << ex;
+ out << "couldn't initialize " << _name << " observer:\n" << ex;
}
private:
- const RegistryObserverTopicPtr _topic;
- const RegistryObserverPrx _observer;
+ const ObserverTopicPtr _topic;
+ const Ice::ObjectPrx _observer;
+ const string _name;
const int _serial;
};
-class NodeInitCB : public AMI_NodeObserver_init
+};
+
+ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) :
+ _serial(0)
{
-public:
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create(name);
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve(name);
+ }
- NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) :
- _topic(topic),
- _observer(observer),
- _serial(serial)
- {
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
+ _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false);
+}
+
+ObserverTopic::~ObserverTopic()
+{
+}
+
+void
+ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
+{
+ while(true)
+ {
+ if(serial == -1)
+ {
+ initObserver(obsv);
+ return;
+ }
+
+ Lock sync(*this);
+ if(serial != _serial)
+ {
+ serial = -1;
+ continue;
+ }
+
+ subscribeImpl(obsv);
+ break;
}
+}
- void
- ice_response()
+void
+ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer)
+{
+ Lock sync(*this);
+ if(_topic)
{
- _topic->subscribe(_observer, _serial);
+ _topic->unsubscribe(observer);
}
-
- void
- ice_exception(const Ice::Exception& ex)
+}
+
+void
+ObserverTopic::destroy()
+{
+ Lock sync(*this);
+ _topic = 0;
+}
+
+void
+ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
+{
+ // This must be called with the mutex locked.
+ if(!_topic)
{
- Ice::Warning out(_observer->ice_getCommunicator()->getLogger());
- out << "couldn't initialize node observer:\n" << ex;
+ return;
}
-
-private:
- const NodeObserverTopicPtr _topic;
- const NodeObserverPrx _observer;
- const int _serial;
-};
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ _topic->subscribe(qos, observer);
+}
+
+void
+ObserverTopic::updateSerial(int serial)
+{
+ //
+ // This loop ensures that updates from the database are processed
+ // sequentially.
+ //
+ assert(_serial < serial);
+ while(_serial + 1 != serial)
+ {
+ wait();
+ }
+ _serial = serial;
+ notifyAll();
+}
-NodeObserverTopic::NodeObserverTopic(const Ice::ObjectAdapterPtr& adapter,
- const IceStorm::TopicManagerPrx& topicManager) :
-/*
-#ifdef __BCPLUSPLUS__ // COMPILERFIX
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
+ ObserverTopic(topicManager, "RegistryObserver")
{
+ const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher);
}
void
-NodeObserverTopic::initialize(const IceStorm::TopicManagerPrx& topicManager)
-#endif
-*/
- _serial(0)
+RegistryObserverTopic::registryUp(const RegistryInfo& info)
{
- IceStorm::TopicPrx t;
- try
+ Lock sync(*this);
+ if(!_topic)
{
- t = topicManager->create("NodeObserver");
+ return;
}
- catch(const IceStorm::TopicExists&)
+ updateSerial(_serial + 1);
+ _registries.insert(make_pair(info.name, info));
+ _publisher->registryUp(info);
+}
+
+void
+RegistryObserverTopic::registryDown(const string& name)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(_serial + 1);
+ if(_registries.find(name) != _registries.end())
{
- t = topicManager->retrieve("NodeObserver");
+ _registries.erase(name);
+ _publisher->registryDown(name);
}
+}
- //
- // NOTE: collocation optimization needs to be turned on for the
- // topic because the subscribe() method is given a fixed proxy
- // which can't be marshalled.
- //
- const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
- const_cast<NodeObserverPrx&>(_internalPublisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher());
+void
+RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
+ RegistryInfoSeq registries;
+ int serial;
+ {
+ Lock sync(*this);
+ registries.reserve(_registries.size());
+ for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
+ {
+ registries.push_back(p->second);
+ }
+ serial = _serial;
+ }
+ observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial),
+ registries);
+}
- __setNoDelete(true);
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const Ice::ObjectAdapterPtr& adapter) :
+ ObserverTopic(topicManager, "NodeObserver")
+{
+ const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher);
try
{
- const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
+ const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
}
- catch(...)
+ catch(const Ice::LocalException&)
{
- __setNoDelete(false);
- throw;
}
- __setNoDelete(false);
}
void
-NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&)
+NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&)
{
assert(false);
}
@@ -130,25 +227,30 @@ void
NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current)
{
Lock sync(*this);
- _nodes.insert(make_pair(info.name, info));
- _internalPublisher->nodeUp(info);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(_serial + 1);
+ _nodes.insert(make_pair(info.info.name, info));
+ _publisher->nodeUp(info);
}
void
NodeObserverTopic::nodeDown(const string& name, const Ice::Current&)
{
- Lock sync(*this);
- if(_nodes.find(name) != _nodes.end())
- {
- _nodes.erase(name);
- _internalPublisher->nodeDown(name);
- }
+ assert(false);
}
void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
if(_nodes.find(node) == _nodes.end())
{
//
@@ -157,7 +259,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
return;
}
- ++_serial;
+ updateSerial(_serial + 1);
ServerDynamicInfoSeq& servers = _nodes[node].servers;
ServerDynamicInfoSeq::iterator p = servers.begin();
@@ -182,13 +284,18 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
servers.push_back(server);
}
- _internalPublisher->updateServer(node, server);
+ _publisher->updateServer(node, server);
}
void
NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
{
Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
if(_nodes.find(node) == _nodes.end())
{
//
@@ -197,7 +304,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
return;
}
- ++_serial;
+ updateSerial(_serial + 1);
AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
AdapterDynamicInfoSeq::iterator p = adapters.begin();
@@ -222,149 +329,103 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
adapters.push_back(adapter);
}
- _internalPublisher->updateAdapter(node, adapter);
+ _publisher->updateAdapter(node, adapter);
}
-void
-NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial)
+void
+NodeObserverTopic::nodeDown(const string& name)
{
- while(true)
+ Lock sync(*this);
+ if(!_topic)
{
- if(serial == -1)
- {
- NodeDynamicInfoSeq nodes;
- {
- Lock sync(*this);
- nodes.reserve(_nodes.size());
- for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
- {
- nodes.push_back(p->second);
- }
- serial = _serial;
- }
- observer->init_async(new NodeInitCB(this, observer, serial), nodes);
- return;
- }
-
- Lock sync(*this);
- if(serial != _serial)
- {
- serial = -1;
- continue;
- }
-
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- _topic->subscribe(qos, observer);
- break;
+ return;
}
-}
-void
-NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer)
-{
- _topic->unsubscribe(observer);
-}
+ updateSerial(_serial + 1);
-RegistryObserverTopic::RegistryObserverTopic(const Ice::ObjectAdapterPtr& adapter,
- const IceStorm::TopicManagerPrx& topicManager) :
-/*
-#ifdef __BCPLUSPLUS__ // COMPILERFIX
-{
+ if(_nodes.find(name) != _nodes.end())
+ {
+ _nodes.erase(name);
+ _publisher->nodeDown(name);
+ }
}
void
-RegistryObserverTopic::initialize(const IceStorm::TopicManagerPrx& topicManager)
-#endif
-*/
- _serial(0)
+NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
- IceStorm::TopicPrx t;
- try
- {
- t = topicManager->create("RegistryObserver");
- }
- catch(const IceStorm::TopicExists&)
+ NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
+ int serial;
+ NodeDynamicInfoSeq nodes;
{
- t = topicManager->retrieve("RegistryObserver");
+ Lock sync(*this);
+ nodes.reserve(_nodes.size());
+ for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ nodes.push_back(p->second);
+ }
+ serial = _serial;
}
+ observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes);
+}
- //
- // NOTE: collocation optimization needs to be turned on for the
- // topic because the subscribe() method is given a fixed proxy
- // which can't be marshalled.
- //
- const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
- const_cast<RegistryObserverPrx&>(_internalPublisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher());
-
- __setNoDelete(true);
- try
- {
- const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(adapter->addWithUUID(this));
- }
- catch(...)
- {
- __setNoDelete(false);
- throw;
- }
- __setNoDelete(false);
+ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const StringApplicationInfoDict& applications) :
+ ObserverTopic(topicManager, "ApplicationObserver"),
+ _applications(applications.begin(), applications.end())
+{
+ const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
}
void
-RegistryObserverTopic::init(int serial,
- const ApplicationInfoSeq& apps,
- const AdapterInfoSeq& adpts,
- const ObjectInfoSeq& objects,
- const Ice::Current&)
+ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
-
- _serial = serial;
-
- for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
- {
- _applications.insert(make_pair(p->descriptor.name, *p));
- }
- for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ if(!_topic)
{
- _adapters.insert(make_pair(q->id, *q));
+ return;
}
- for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ updateSerial(serial);
+ _applications.clear();
+ for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
{
- _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ _applications.insert(make_pair(p->descriptor.name, *p));
}
-
- _internalPublisher->init(serial, apps, adpts, objects);
}
void
-RegistryObserverTopic::applicationAdded(int serial, const ApplicationInfo& info, const Ice::Current&)
+ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_applications.insert(make_pair(info.descriptor.name, info));
-
- _internalPublisher->applicationAdded(serial, info);
+ _publisher->applicationAdded(serial, info);
}
void
-RegistryObserverTopic::applicationRemoved(int serial, const string& name, const Ice::Current&)
+ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_applications.erase(name);
-
- _internalPublisher->applicationRemoved(serial, name);
+ _publisher->applicationRemoved(serial, name);
}
void
-RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info, const Ice::Current& c)
+ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
try
@@ -372,7 +433,7 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInf
map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
if(p != _applications.end())
{
- ApplicationHelper helper(c.adapter->getCommunicator(), p->second.descriptor);
+ ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor);
p->second.descriptor = helper.update(info.descriptor);
p->second.updateTime = info.updateTime;
p->second.updateUser = info.updateUser;
@@ -398,156 +459,189 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInf
{
assert(false);
}
+ _publisher->applicationUpdated(serial, info);
+}
+
+void
+ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
+ int serial;
+ ApplicationInfoSeq applications;
+ {
+ Lock sync(*this);
+ serial = _serial;
+ assert(serial != -1);
+ for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
+ {
+ applications.push_back(p->second);
+ }
+ }
+ observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application",
+ serial),
+ serial, applications);
+}
- _internalPublisher->applicationUpdated(serial, info);
+AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const StringAdapterInfoDict& adapters) :
+ ObserverTopic(topicManager, "AdapterObserver"),
+ _adapters(adapters.begin(), adapters.end())
+{
+ const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
}
void
-RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
+AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ {
+ _adapters.insert(make_pair(q->id, *q));
+ }
+}
+void
+AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(serial);
_adapters.insert(make_pair(info.id, info));
-
- _internalPublisher->adapterAdded(serial, info);
+ _publisher->adapterAdded(info);
}
void
-RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
+AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_adapters[info.id] = info;
-
- _internalPublisher->adapterUpdated(serial, info);
+ _publisher->adapterUpdated(info);
}
void
-RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&)
+AdapterObserverTopic::adapterRemoved(int serial, const string& id)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_adapters.erase(id);
+ _publisher->adapterRemoved(id);
+}
- _internalPublisher->adapterRemoved(serial, id);
+void
+AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
+ int serial;
+ AdapterInfoSeq adapters;
+ {
+ Lock sync(*this);
+ serial = _serial;
+ assert(serial != -1);
+ for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ adapters.push_back(p->second);
+ }
+ }
+ observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial),
+ adapters);
+}
+
+ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const IdentityObjectInfoDict& objects) :
+ ObserverTopic(topicManager, "ObjectObserver"),
+ _objects(objects.begin(), objects.end())
+{
+ const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
}
void
-RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
+ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ {
+ _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ }
+}
+void
+ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(serial);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
-
- _internalPublisher->objectAdded(serial, info);
+ _publisher->objectAdded(info);
}
void
-RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
+ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_objects[info.proxy->ice_getIdentity()] = info;
-
- _internalPublisher->objectUpdated(serial, info);
+ _publisher->objectUpdated(info);
}
void
-RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
+ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
{
Lock sync(*this);
-
+ if(!_topic)
+ {
+ return;
+ }
updateSerial(serial);
-
_objects.erase(id);
-
- _internalPublisher->objectRemoved(serial, id);
+ _publisher->objectRemoved(id);
}
void
-RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
+ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
- while(true)
+ ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
+ int serial;
+ ObjectInfoSeq objects;
{
- if(serial == -1)
- {
- ApplicationInfoSeq applications;
- AdapterInfoSeq adapters;
- ObjectInfoSeq objects;
- {
- Lock sync(*this);
- assert(_serial != -1);
- serial = _serial;
-
- map<string, ApplicationInfo>::const_iterator p;
- for(p = _applications.begin(); p != _applications.end(); ++p)
- {
- applications.push_back(p->second);
- }
-
- map<string, AdapterInfo>::const_iterator q;
- for(q = _adapters.begin(); q != _adapters.end(); ++q)
- {
- adapters.push_back(q->second);
- }
-
- map<Ice::Identity, ObjectInfo>::const_iterator r;
- for(r = _objects.begin(); r != _objects.end(); ++r)
- {
- objects.push_back(r->second);
- }
- }
- observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects);
- return;
- }
-
- //
- // If the registry cache changed since we've send the init()
- // call we need to do it again. Otherwise, we can subscribe to
- // the IceStorm topic.
- //
Lock sync(*this);
- if(serial != _serial)
+ serial = _serial;
+ for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
- serial = -1;
- continue;
+ objects.push_back(p->second);
}
-
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- _topic->subscribe(qos, observer);
- break;
- }
+ }
+ observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects);
}
-void
-RegistryObserverTopic::unsubscribe(const RegistryObserverPrx& observer)
-{
- Lock sync(*this);
- _topic->unsubscribe(observer);
-}
-void
-RegistryObserverTopic::updateSerial(int serial)
-{
- //
- // This loop ensures that updates from the database are processed
- // sequentially.
- //
- while(_serial + 1 != serial)
- {
- wait();
- }
- _serial = serial;
- notifyAll();
-}