summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp960
1 files changed, 960 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
new file mode 100644
index 00000000000..37686d0b20e
--- /dev/null
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -0,0 +1,960 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <IceGrid/Topics.h>
+#include <IceGrid/DescriptorHelper.h>
+
+using namespace std;
+using namespace IceGrid;
+
+
+ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) :
+ _logger(topicManager->ice_getCommunicator()->getLogger()),
+ _serial(0)
+{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create(name);
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve(name);
+ }
+
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
+ _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false);
+}
+
+ObserverTopic::~ObserverTopic()
+{
+}
+
+int
+ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+
+ assert(obsv);
+ try
+ {
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+ initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
+ }
+ catch(const IceStorm::AlreadySubscribed&)
+ {
+ throw ObserverAlreadyRegisteredException(obsv->ice_getIdentity());
+ }
+
+ if(!name.empty())
+ {
+ assert(_syncSubscribers.find(name) == _syncSubscribers.end());
+ _syncSubscribers.insert(name);
+ addExpectedUpdate(_serial, name);
+ return _serial;
+ }
+ return -1;
+}
+
+void
+ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
+{
+ Lock sync(*this);
+ if(_topic)
+ {
+ _topic->unsubscribe(observer);
+ }
+
+ assert(observer);
+
+ if(!name.empty())
+ {
+ assert(_syncSubscribers.find(name) != _syncSubscribers.end());
+ _syncSubscribers.erase(name);
+
+ map<int, set<string> >::iterator p = _waitForUpdates.begin();
+ bool notifyMonitor = false;
+ while(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p++);
+ notifyMonitor = true;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ if(notifyMonitor)
+ {
+ notifyAll();
+ }
+ }
+}
+
+void
+ObserverTopic::destroy()
+{
+ Lock sync(*this);
+ _topic = 0;
+ notifyAll();
+}
+
+void
+ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
+{
+ Lock sync(*this);
+ map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
+ if(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+
+ if(!failure.empty())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q == _updateFailures.end())
+ {
+ q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first;
+ }
+ q->second.insert(make_pair(name, failure));
+ }
+
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p);
+ }
+
+ notifyAll();
+ }
+}
+
+void
+ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
+{
+ Lock sync(*this);
+ waitForSyncedSubscribersNoSync(serial, name);
+}
+
+void
+ObserverTopic::addExpectedUpdate(int serial, const string& name)
+{
+ if(_syncSubscribers.empty() && name.empty())
+ {
+ return;
+ }
+
+ // Must be called with the lock held.
+ if(name.empty())
+ {
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+ }
+ else
+ {
+ _waitForUpdates[serial].insert(name);
+ }
+}
+
+void
+ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
+{
+ if(serial < 0)
+ {
+ return;
+ }
+
+ //
+ // Wait until all the updates are received or the service shutdown.
+ //
+ while(_topic)
+ {
+ map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
+ if(p == _waitForUpdates.end())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q != _updateFailures.end())
+ {
+ map<string, string> failures = q->second;
+ _updateFailures.erase(q);
+ ostringstream os;
+ for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
+ {
+ os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
+ }
+
+ Ice::Error err(_logger);
+ err << os.str();
+ }
+ return;
+ }
+ else
+ {
+ if(!name.empty() && p->second.find(name) == p->second.end())
+ {
+ return;
+ }
+ wait();
+ }
+ }
+}
+
+void
+ObserverTopic::updateSerial(int serial)
+{
+ assert(_serial + 1 == serial);
+ _serial = serial;
+}
+
+Ice::Context
+ObserverTopic::getContext(int serial) const
+{
+ ostringstream os;
+ os << serial;
+
+ Ice::Context context;
+ context["serial"] = os.str();
+ return context;
+}
+
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
+ ObserverTopic(topicManager, "RegistryObserver")
+{
+ const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher);
+}
+
+void
+RegistryObserverTopic::registryUp(const RegistryInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(_serial + 1);
+ _registries.insert(make_pair(info.name, info));
+ try
+ {
+ _publisher->registryUp(info);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `registryUp' update:\n" << ex;
+ }
+}
+
+void
+RegistryObserverTopic::registryDown(const string& name)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
+ if(_registries.find(name) == _registries.end())
+ {
+ return;
+ }
+
+ updateSerial(_serial + 1);
+ _registries.erase(name);
+ try
+ {
+ _publisher->registryDown(name);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `registryDown' update:\n" << ex;
+ }
+}
+
+void
+RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
+ RegistryInfoSeq registries;
+ registries.reserve(_registries.size());
+ for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
+ {
+ registries.push_back(p->second);
+ }
+ observer->registryInit(registries, getContext(_serial));
+}
+
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const Ice::ObjectAdapterPtr& adapter) :
+ ObserverTopic(topicManager, "NodeObserver")
+{
+ const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher);
+ try
+ {
+ const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+}
+
+void
+NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&)
+{
+ assert(false);
+}
+
+void
+NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+ updateSerial(_serial + 1);
+ _nodes.insert(make_pair(info.info.name, info));
+ try
+ {
+ _publisher->nodeUp(info);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
+ }
+}
+
+void
+NodeObserverTopic::nodeDown(const string& name, const Ice::Current&)
+{
+ assert(false);
+}
+
+void
+NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
+ if(_nodes.find(node) == _nodes.end())
+ {
+ //
+ // If the node isn't known anymore, we ignore the update.
+ //
+ return;
+ }
+
+ updateSerial(_serial + 1);
+
+ ServerDynamicInfoSeq& servers = _nodes[node].servers;
+ ServerDynamicInfoSeq::iterator p = servers.begin();
+ while(p != servers.end())
+ {
+ if(p->id == server.id)
+ {
+ if(server.state == Destroyed || (server.state == Inactive && server.enabled))
+ {
+ servers.erase(p);
+ }
+ else
+ {
+ *p = server;
+ }
+ break;
+ }
+ ++p;
+ }
+ if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end())
+ {
+ servers.push_back(server);
+ }
+
+ try
+ {
+ _publisher->updateServer(node, server);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `updateServer' update:\n" << ex;
+ }
+}
+
+void
+NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
+ if(_nodes.find(node) == _nodes.end())
+ {
+ //
+ // If the node isn't known anymore, we ignore the update.
+ //
+ return;
+ }
+
+ updateSerial(_serial + 1);
+
+ AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
+ AdapterDynamicInfoSeq::iterator p = adapters.begin();
+ while(p != adapters.end())
+ {
+ if(p->id == adapter.id)
+ {
+ if(adapter.proxy)
+ {
+ *p = adapter;
+ }
+ else
+ {
+ adapters.erase(p);
+ }
+ break;
+ }
+ ++p;
+ }
+ if(adapter.proxy && p == adapters.end())
+ {
+ adapters.push_back(adapter);
+ }
+
+ try
+ {
+ _publisher->updateAdapter(node, adapter);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
+ }
+}
+
+void
+NodeObserverTopic::nodeDown(const string& name)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return;
+ }
+
+ updateSerial(_serial + 1);
+
+ if(_nodes.find(name) != _nodes.end())
+ {
+ _nodes.erase(name);
+ try
+ {
+ _publisher->nodeDown(name);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
+ }
+ }
+}
+
+void
+NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
+ NodeDynamicInfoSeq nodes;
+ nodes.reserve(_nodes.size());
+ for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ nodes.push_back(p->second);
+ }
+ observer->nodeInit(nodes, getContext(_serial));
+}
+
+ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const map<string, ApplicationInfo>& applications) :
+ ObserverTopic(topicManager, "ApplicationObserver"),
+ _applications(applications)
+{
+ const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
+}
+
+int
+ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(serial);
+ _applications.clear();
+ for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
+ {
+ _applications.insert(make_pair(p->descriptor.name, *p));
+ }
+ try
+ {
+ _publisher->applicationInit(serial, apps, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
+ }
+ addExpectedUpdate(serial);
+ return serial;
+}
+
+int
+ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+
+ updateSerial(serial);
+ _applications.insert(make_pair(info.descriptor.name, info));
+ try
+ {
+ _publisher->applicationAdded(serial, info, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
+ }
+ addExpectedUpdate(serial);
+ return serial;
+}
+
+int
+ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(serial);
+ _applications.erase(name);
+ try
+ {
+ _publisher->applicationRemoved(serial, name, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
+ }
+ addExpectedUpdate(serial);
+ return serial;
+}
+
+int
+ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+
+ updateSerial(serial);
+ try
+ {
+ map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
+ if(p != _applications.end())
+ {
+ ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor);
+ p->second.descriptor = helper.update(info.descriptor);
+ p->second.updateTime = info.updateTime;
+ p->second.updateUser = info.updateUser;
+ p->second.revision = info.revision;
+ }
+ }
+ catch(const DeploymentException& ex)
+ {
+ cerr << ex.reason << endl;
+ assert(false);
+ }
+ catch(const std::string& msg)
+ {
+ cerr << msg << endl;
+ assert(false);
+ }
+ catch(const char* msg)
+ {
+ cerr << msg << endl;
+ assert(false);
+ }
+ catch(...)
+ {
+ assert(false);
+ }
+ try
+ {
+ _publisher->applicationUpdated(serial, info, getContext(serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
+ }
+ addExpectedUpdate(serial);
+ return serial;
+}
+
+void
+ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
+ ApplicationInfoSeq applications;
+ for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
+ {
+ applications.push_back(p->second);
+ }
+ observer->applicationInit(_serial, applications, getContext(_serial));
+}
+
+AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const map<string, AdapterInfo>& adapters) :
+ ObserverTopic(topicManager, "AdapterObserver"),
+ _adapters(adapters)
+{
+ const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
+}
+
+int
+AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ {
+ _adapters.insert(make_pair(q->id, *q));
+ }
+ try
+ {
+ _publisher->adapterInit(adpts, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _adapters.insert(make_pair(info.id, info));
+ try
+ {
+ _publisher->adapterAdded(info, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _adapters[info.id] = info;
+ try
+ {
+ _publisher->adapterUpdated(info, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+AdapterObserverTopic::adapterRemoved(const string& id)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _adapters.erase(id);
+ try
+ {
+ _publisher->adapterRemoved(id, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+void
+AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
+ AdapterInfoSeq adapters;
+ for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ adapters.push_back(p->second);
+ }
+ observer->adapterInit(adapters, getContext(_serial));
+}
+
+ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const map<Ice::Identity, ObjectInfo>& objects) :
+ ObserverTopic(topicManager, "ObjectObserver"),
+ _objects(objects)
+{
+ const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
+}
+
+int
+ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ {
+ _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ }
+ try
+ {
+ _publisher->objectInit(objects, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectInit' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+ObjectObserverTopic::objectAdded(const ObjectInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
+ try
+ {
+ _publisher->objectAdded(info, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _objects[info.proxy->ice_getIdentity()] = info;
+ try
+ {
+ _publisher->objectUpdated(info, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+ _objects.erase(id);
+ try
+ {
+ _publisher->objectRemoved(id, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
+ }
+ addExpectedUpdate(_serial);
+ return _serial;
+}
+
+int
+ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+
+ for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity());
+ if(q != _objects.end())
+ {
+ q->second = *p;
+ try
+ {
+ _publisher->objectUpdated(*p, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ }
+ }
+ else
+ {
+ _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
+ try
+ {
+ _publisher->objectAdded(*p, getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ }
+ }
+ }
+
+ //
+ // We don't wait for the update to be received by the replicas
+ // here. This operation is called by ReplicaSessionI.
+ //
+ addExpectedUpdate(_serial);
+ //waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
+}
+
+int
+ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
+{
+ Lock sync(*this);
+ if(!_topic)
+ {
+ return -1;
+ }
+ updateSerial(_serial + 1);
+
+ for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ _objects.erase(p->proxy->ice_getIdentity());
+ try
+ {
+ _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ }
+ }
+
+ //
+ // We don't need to wait for the update to be received by the
+ // replicas here. This operation is only called internaly by
+ // IceGrid.
+ //
+ addExpectedUpdate(_serial);
+ //waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
+}
+
+void
+ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+{
+ ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
+ ObjectInfoSeq objects;
+ for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ objects.push_back(p->second);
+ }
+ observer->objectInit(objects, getContext(_serial));
+}