diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-01-17 15:48:26 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-01-17 15:48:26 +0100 |
commit | cade94b03f44c09a65542d58746e111a997477c1 (patch) | |
tree | 47c62bd60c8fa62587c271d5e16de711bf973243 /cpp/src/IceGrid/Topics.cpp | |
parent | Added OS X Frameworks (diff) | |
download | ice-cade94b03f44c09a65542d58746e111a997477c1.tar.bz2 ice-cade94b03f44c09a65542d58746e111a997477c1.tar.xz ice-cade94b03f44c09a65542d58746e111a997477c1.zip |
Fixed ICE-4968 - Support for 1.0 observers with IceGrid
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 230 |
1 files changed, 162 insertions, 68 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 62c582858ac..d2b976c7452 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -14,28 +14,48 @@ using namespace std; using namespace IceGrid; +namespace +{ + +// +// Encodings supported by the observers. We create one topic per +// encoding version and subscribe the observer to the appropriate +// topic depending on its encoding. +// +Ice::EncodingVersion encodings[] = { + { 1, 0 }, + { 1, 1 } +}; + +} ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) : _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0) { - IceStorm::TopicPrx t; - try - { - t = topicManager->create(name); - } - catch(const IceStorm::TopicExists&) + for(int i = 0; i < sizeof(encodings) / sizeof(Ice::EncodingVersion); ++i) { - t = topicManager->retrieve(name); - } + ostringstream os; + os << name << "-" << Ice::encodingVersionToString(encodings[i]); + IceStorm::TopicPrx t; + try + { + t = topicManager->create(os.str()); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve(os.str()); + } - // - // NOTE: collocation optimization needs to be turned on for the - // topic because the subscribe() method is given a fixed proxy - // which can't be marshalled. - // - _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); - _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false); + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + _topics[encodings[i]] = t->ice_collocationOptimized(true); + _basePublishers.push_back( + t->getPublisher()->ice_collocationOptimized(false)->ice_encodingVersion(encodings[i])); + } } ObserverTopic::~ObserverTopic() @@ -46,7 +66,7 @@ int ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -56,7 +76,15 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { IceStorm::QoS qos; qos["reliability"] = "ordered"; - initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway())); + Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion()); + map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v); + if(p == _topics.end()) + { + Ice::Warning out(_logger); + out << "unsupported encoding version for observer `" << obsv << "'"; + return -1; + } + initObserver(p->second->subscribeAndGetPublisher(qos, obsv->ice_twoway())); } catch(const IceStorm::AlreadySubscribed&) { @@ -77,10 +105,13 @@ void ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) { Lock sync(*this); - if(_topic) + Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion()); + map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v); + if(p == _topics.end()) { - _topic->unsubscribe(observer); + return; } + p->second->unsubscribe(observer); assert(observer); @@ -116,7 +147,7 @@ void ObserverTopic::destroy() { Lock sync(*this); - _topic = 0; + _topics.clear(); notifyAll(); } @@ -186,7 +217,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) // // Wait until all the updates are received or the service shutdown. // - while(_topic) + while(!_topics.empty()) { map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial); if(p == _waitForUpdates.end()) @@ -239,14 +270,14 @@ ObserverTopic::getContext(int serial) const RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : ObserverTopic(topicManager, "RegistryObserver") { - const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher); + _publishers = getPublishers<RegistryObserverPrx>(); } void RegistryObserverTopic::registryUp(const RegistryInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -254,7 +285,10 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) _registries.insert(make_pair(info.name, info)); try { - _publisher->registryUp(info); + for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->registryUp(info); + } } catch(const Ice::LocalException& ex) { @@ -267,7 +301,7 @@ void RegistryObserverTopic::registryDown(const string& name) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -281,7 +315,10 @@ RegistryObserverTopic::registryDown(const string& name) _registries.erase(name); try { - _publisher->registryDown(name); + for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->registryDown(name); + } } catch(const Ice::LocalException& ex) { @@ -307,7 +344,7 @@ NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManag const Ice::ObjectAdapterPtr& adapter) : ObserverTopic(topicManager, "NodeObserver") { - const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher); + _publishers = getPublishers<NodeObserverPrx>(); try { const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this)); @@ -327,7 +364,7 @@ void NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -335,7 +372,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) _nodes.insert(make_pair(info.info.name, info)); try { - _publisher->nodeUp(info); + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->nodeUp(info); + } } catch(const Ice::LocalException& ex) { @@ -354,7 +394,7 @@ void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -394,7 +434,10 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser try { - _publisher->updateServer(node, server); + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->updateServer(node, server); + } } catch(const Ice::LocalException& ex) { @@ -407,7 +450,7 @@ void NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -447,7 +490,10 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a try { - _publisher->updateAdapter(node, adapter); + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->updateAdapter(node, adapter); + } } catch(const Ice::LocalException& ex) { @@ -460,7 +506,7 @@ void NodeObserverTopic::nodeDown(const string& name) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return; } @@ -472,7 +518,10 @@ NodeObserverTopic::nodeDown(const string& name) _nodes.erase(name); try { - _publisher->nodeDown(name); + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->nodeDown(name); + } } catch(const Ice::LocalException& ex) { @@ -500,14 +549,14 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP ObserverTopic(topicManager, "ApplicationObserver"), _applications(applications) { - const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher); + _publishers = getPublishers<ApplicationObserverPrx>(); } int ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -519,7 +568,10 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& } try { - _publisher->applicationInit(serial, apps, getContext(serial)); + for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->applicationInit(serial, apps, getContext(serial)); + } } catch(const Ice::LocalException& ex) { @@ -534,7 +586,7 @@ int ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -543,7 +595,10 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in _applications.insert(make_pair(info.descriptor.name, info)); try { - _publisher->applicationAdded(serial, info, getContext(serial)); + for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->applicationAdded(serial, info, getContext(serial)); + } } catch(const Ice::LocalException& ex) { @@ -558,7 +613,7 @@ int ApplicationObserverTopic::applicationRemoved(int serial, const string& name) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -566,7 +621,10 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) _applications.erase(name); try { - _publisher->applicationRemoved(serial, name, getContext(serial)); + for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->applicationRemoved(serial, name, getContext(serial)); + } } catch(const Ice::LocalException& ex) { @@ -581,7 +639,7 @@ int ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -592,7 +650,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name); if(p != _applications.end()) { - ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor); + ApplicationHelper helper(_publishers[0]->ice_getCommunicator(), p->second.descriptor); p->second.descriptor = helper.update(info.descriptor); p->second.updateTime = info.updateTime; p->second.updateUser = info.updateUser; @@ -625,7 +683,10 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate } try { - _publisher->applicationUpdated(serial, info, getContext(serial)); + for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->applicationUpdated(serial, info, getContext(serial)); + } } catch(const Ice::LocalException& ex) { @@ -653,14 +714,14 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi ObserverTopic(topicManager, "AdapterObserver"), _adapters(adapters) { - const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher); + _publishers = getPublishers<AdapterObserverPrx>(); } int AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -672,7 +733,10 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) } try { - _publisher->adapterInit(adpts, getContext(_serial)); + for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->adapterInit(adpts, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -687,7 +751,7 @@ int AdapterObserverTopic::adapterAdded(const AdapterInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -695,7 +759,10 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info) _adapters.insert(make_pair(info.id, info)); try { - _publisher->adapterAdded(info, getContext(_serial)); + for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->adapterAdded(info, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -710,7 +777,7 @@ int AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -718,7 +785,10 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) _adapters[info.id] = info; try { - _publisher->adapterUpdated(info, getContext(_serial)); + for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->adapterUpdated(info, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -733,7 +803,7 @@ int AdapterObserverTopic::adapterRemoved(const string& id) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -741,7 +811,10 @@ AdapterObserverTopic::adapterRemoved(const string& id) _adapters.erase(id); try { - _publisher->adapterRemoved(id, getContext(_serial)); + for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->adapterRemoved(id, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -769,14 +842,14 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM ObserverTopic(topicManager, "ObjectObserver"), _objects(objects) { - const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher); + _publishers = getPublishers<ObjectObserverPrx>(); } int ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -788,7 +861,10 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) } try { - _publisher->objectInit(objects, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->objectInit(objects, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -803,7 +879,7 @@ int ObjectObserverTopic::objectAdded(const ObjectInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -811,7 +887,10 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info) _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - _publisher->objectAdded(info, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->objectAdded(info, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -826,7 +905,7 @@ int ObjectObserverTopic::objectUpdated(const ObjectInfo& info) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -834,7 +913,10 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info) _objects[info.proxy->ice_getIdentity()] = info; try { - _publisher->objectUpdated(info, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->objectUpdated(info, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -849,7 +931,7 @@ int ObjectObserverTopic::objectRemoved(const Ice::Identity& id) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -857,7 +939,10 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id) _objects.erase(id); try { - _publisher->objectRemoved(id, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + { + (*p)->objectRemoved(id, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -872,7 +957,7 @@ int ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -886,7 +971,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) q->second = *p; try { - _publisher->objectUpdated(*p, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + { + (*q)->objectUpdated(*p, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -899,7 +987,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p)); try { - _publisher->objectAdded(*p, getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + { + (*q)->objectAdded(*p, getContext(_serial)); + } } catch(const Ice::LocalException& ex) { @@ -922,7 +1013,7 @@ int ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) { Lock sync(*this); - if(!_topic) + if(_topics.empty()) { return -1; } @@ -933,7 +1024,10 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) _objects.erase(p->proxy->ice_getIdentity()); try { - _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial)); + for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + { + (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial)); + } } catch(const Ice::LocalException& ex) { |