summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-01-17 15:48:26 +0100
committerBenoit Foucher <benoit@zeroc.com>2013-01-17 15:48:26 +0100
commitcade94b03f44c09a65542d58746e111a997477c1 (patch)
tree47c62bd60c8fa62587c271d5e16de711bf973243 /cpp/src/IceGrid/Topics.cpp
parentAdded OS X Frameworks (diff)
downloadice-cade94b03f44c09a65542d58746e111a997477c1.tar.bz2
ice-cade94b03f44c09a65542d58746e111a997477c1.tar.xz
ice-cade94b03f44c09a65542d58746e111a997477c1.zip
Fixed ICE-4968 - Support for 1.0 observers with IceGrid
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp230
1 files changed, 162 insertions, 68 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 62c582858ac..d2b976c7452 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -14,28 +14,48 @@
using namespace std;
using namespace IceGrid;
+namespace
+{
+
+//
+// Encodings supported by the observers. We create one topic per
+// encoding version and subscribe the observer to the appropriate
+// topic depending on its encoding.
+//
+Ice::EncodingVersion encodings[] = {
+ { 1, 0 },
+ { 1, 1 }
+};
+
+}
ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) :
_logger(topicManager->ice_getCommunicator()->getLogger()),
_serial(0)
{
- IceStorm::TopicPrx t;
- try
- {
- t = topicManager->create(name);
- }
- catch(const IceStorm::TopicExists&)
+ for(int i = 0; i < sizeof(encodings) / sizeof(Ice::EncodingVersion); ++i)
{
- t = topicManager->retrieve(name);
- }
+ ostringstream os;
+ os << name << "-" << Ice::encodingVersionToString(encodings[i]);
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create(os.str());
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve(os.str());
+ }
- //
- // NOTE: collocation optimization needs to be turned on for the
- // topic because the subscribe() method is given a fixed proxy
- // which can't be marshalled.
- //
- _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
- _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false);
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ _topics[encodings[i]] = t->ice_collocationOptimized(true);
+ _basePublishers.push_back(
+ t->getPublisher()->ice_collocationOptimized(false)->ice_encodingVersion(encodings[i]));
+ }
}
ObserverTopic::~ObserverTopic()
@@ -46,7 +66,7 @@ int
ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -56,7 +76,15 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
IceStorm::QoS qos;
qos["reliability"] = "ordered";
- initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
+ Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion());
+ map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
+ if(p == _topics.end())
+ {
+ Ice::Warning out(_logger);
+ out << "unsupported encoding version for observer `" << obsv << "'";
+ return -1;
+ }
+ initObserver(p->second->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
}
catch(const IceStorm::AlreadySubscribed&)
{
@@ -77,10 +105,13 @@ void
ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
{
Lock sync(*this);
- if(_topic)
+ Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion());
+ map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
+ if(p == _topics.end())
{
- _topic->unsubscribe(observer);
+ return;
}
+ p->second->unsubscribe(observer);
assert(observer);
@@ -116,7 +147,7 @@ void
ObserverTopic::destroy()
{
Lock sync(*this);
- _topic = 0;
+ _topics.clear();
notifyAll();
}
@@ -186,7 +217,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
//
// Wait until all the updates are received or the service shutdown.
//
- while(_topic)
+ while(!_topics.empty())
{
map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
if(p == _waitForUpdates.end())
@@ -239,14 +270,14 @@ ObserverTopic::getContext(int serial) const
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
- const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<RegistryObserverPrx>();
}
void
RegistryObserverTopic::registryUp(const RegistryInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -254,7 +285,10 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
_registries.insert(make_pair(info.name, info));
try
{
- _publisher->registryUp(info);
+ for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->registryUp(info);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -267,7 +301,7 @@ void
RegistryObserverTopic::registryDown(const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -281,7 +315,10 @@ RegistryObserverTopic::registryDown(const string& name)
_registries.erase(name);
try
{
- _publisher->registryDown(name);
+ for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->registryDown(name);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -307,7 +344,7 @@ NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManag
const Ice::ObjectAdapterPtr& adapter) :
ObserverTopic(topicManager, "NodeObserver")
{
- const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<NodeObserverPrx>();
try
{
const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
@@ -327,7 +364,7 @@ void
NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -335,7 +372,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
_nodes.insert(make_pair(info.info.name, info));
try
{
- _publisher->nodeUp(info);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->nodeUp(info);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -354,7 +394,7 @@ void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -394,7 +434,10 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
try
{
- _publisher->updateServer(node, server);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->updateServer(node, server);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -407,7 +450,7 @@ void
NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -447,7 +490,10 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
try
{
- _publisher->updateAdapter(node, adapter);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->updateAdapter(node, adapter);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -460,7 +506,7 @@ void
NodeObserverTopic::nodeDown(const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -472,7 +518,10 @@ NodeObserverTopic::nodeDown(const string& name)
_nodes.erase(name);
try
{
- _publisher->nodeDown(name);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->nodeDown(name);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -500,14 +549,14 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP
ObserverTopic(topicManager, "ApplicationObserver"),
_applications(applications)
{
- const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<ApplicationObserverPrx>();
}
int
ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -519,7 +568,10 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
}
try
{
- _publisher->applicationInit(serial, apps, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationInit(serial, apps, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -534,7 +586,7 @@ int
ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -543,7 +595,10 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
_applications.insert(make_pair(info.descriptor.name, info));
try
{
- _publisher->applicationAdded(serial, info, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationAdded(serial, info, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -558,7 +613,7 @@ int
ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -566,7 +621,10 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
_applications.erase(name);
try
{
- _publisher->applicationRemoved(serial, name, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationRemoved(serial, name, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -581,7 +639,7 @@ int
ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -592,7 +650,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
if(p != _applications.end())
{
- ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor);
+ ApplicationHelper helper(_publishers[0]->ice_getCommunicator(), p->second.descriptor);
p->second.descriptor = helper.update(info.descriptor);
p->second.updateTime = info.updateTime;
p->second.updateUser = info.updateUser;
@@ -625,7 +683,10 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
}
try
{
- _publisher->applicationUpdated(serial, info, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationUpdated(serial, info, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -653,14 +714,14 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
ObserverTopic(topicManager, "AdapterObserver"),
_adapters(adapters)
{
- const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<AdapterObserverPrx>();
}
int
AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -672,7 +733,10 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
}
try
{
- _publisher->adapterInit(adpts, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterInit(adpts, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -687,7 +751,7 @@ int
AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -695,7 +759,10 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
_adapters.insert(make_pair(info.id, info));
try
{
- _publisher->adapterAdded(info, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterAdded(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -710,7 +777,7 @@ int
AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -718,7 +785,10 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
_adapters[info.id] = info;
try
{
- _publisher->adapterUpdated(info, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterUpdated(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -733,7 +803,7 @@ int
AdapterObserverTopic::adapterRemoved(const string& id)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -741,7 +811,10 @@ AdapterObserverTopic::adapterRemoved(const string& id)
_adapters.erase(id);
try
{
- _publisher->adapterRemoved(id, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterRemoved(id, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -769,14 +842,14 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
ObserverTopic(topicManager, "ObjectObserver"),
_objects(objects)
{
- const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<ObjectObserverPrx>();
}
int
ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -788,7 +861,10 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
}
try
{
- _publisher->objectInit(objects, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectInit(objects, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -803,7 +879,7 @@ int
ObjectObserverTopic::objectAdded(const ObjectInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -811,7 +887,10 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info)
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- _publisher->objectAdded(info, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectAdded(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -826,7 +905,7 @@ int
ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -834,7 +913,10 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- _publisher->objectUpdated(info, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectUpdated(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -849,7 +931,7 @@ int
ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -857,7 +939,10 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
_objects.erase(id);
try
{
- _publisher->objectRemoved(id, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectRemoved(id, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -872,7 +957,7 @@ int
ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -886,7 +971,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
q->second = *p;
try
{
- _publisher->objectUpdated(*p, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectUpdated(*p, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -899,7 +987,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
_objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
try
{
- _publisher->objectAdded(*p, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectAdded(*p, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -922,7 +1013,7 @@ int
ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -933,7 +1024,10 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
_objects.erase(p->proxy->ice_getIdentity());
try
{
- _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{