diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-09-03 15:42:19 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-09-03 15:42:19 +0200 |
commit | 91f6ebb998532b36fc70187b641a5b7404060422 (patch) | |
tree | ac88e961c68e4b09eb819f4b57b9ecac56854567 /cpp/src/IceGrid/Topics.cpp | |
parent | ICE-5378 - Remove slice35d.dll from Windows installer (diff) | |
download | ice-91f6ebb998532b36fc70187b641a5b7404060422.tar.bz2 ice-91f6ebb998532b36fc70187b641a5b7404060422.tar.xz ice-91f6ebb998532b36fc70187b641a5b7404060422.zip |
Fixed ICE-5358 - allow IceGrid replica to initialize its database from another replica
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 170 |
1 files changed, 93 insertions, 77 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 31ac3815a89..24fa297b4de 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -29,9 +29,8 @@ Ice::EncodingVersion encodings[] = { } -ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) : - _logger(topicManager->ice_getCommunicator()->getLogger()), - _serial(0) +ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) : + _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial) { for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i) { @@ -186,6 +185,13 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) waitForSyncedSubscribersNoSync(serial, name); } +int +ObserverTopic::getSerial() const +{ + Lock sync(*this); + return _serial; +} + void ObserverTopic::addExpectedUpdate(int serial, const string& name) { @@ -250,20 +256,30 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) } void -ObserverTopic::updateSerial(int serial) +ObserverTopic::updateSerial(Ice::Long dbSerial) { - assert(_serial + 1 == serial); - _serial = serial; + ++_serial; + if(dbSerial > 0) + { + _dbSerial = dbSerial; + } } Ice::Context -ObserverTopic::getContext(int serial) const +ObserverTopic::getContext(int serial, Ice::Long dbSerial) const { - ostringstream os; - os << serial; - Ice::Context context; - context["serial"] = os.str(); + { + ostringstream os; + os << serial; + context["serial"] = os.str(); + } + if(dbSerial > 0) + { + ostringstream os; + os << dbSerial; + context["dbSerial"] = os.str(); + } return context; } @@ -281,7 +297,7 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) { return; } - updateSerial(_serial + 1); + updateSerial(); _registries.insert(make_pair(info.name, info)); try { @@ -311,7 +327,7 @@ RegistryObserverTopic::registryDown(const string& name) return; } - updateSerial(_serial + 1); + updateSerial(); _registries.erase(name); try { @@ -368,7 +384,7 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) { return; } - updateSerial(_serial + 1); + updateSerial(); _nodes.insert(make_pair(info.info.name, info)); try { @@ -407,7 +423,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser return; } - updateSerial(_serial + 1); + updateSerial(); ServerDynamicInfoSeq& servers = _nodes[node].servers; ServerDynamicInfoSeq::iterator p = servers.begin(); @@ -463,7 +479,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a return; } - updateSerial(_serial + 1); + updateSerial(); AdapterDynamicInfoSeq& adapters = _nodes[node].adapters; AdapterDynamicInfoSeq::iterator p = adapters.begin(); @@ -511,7 +527,7 @@ NodeObserverTopic::nodeDown(const string& name) return; } - updateSerial(_serial + 1); + updateSerial(); if(_nodes.find(name) != _nodes.end()) { @@ -545,22 +561,22 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<string, ApplicationInfo>& applications) : - ObserverTopic(topicManager, "ApplicationObserver"), - _applications(applications) + const ApplicationsWrapperPtr& wrapper) : + ObserverTopic(topicManager, "ApplicationObserver", wrapper->getSerial()), + _applications(wrapper->getMap()) { _publishers = getPublishers<ApplicationObserverPrx>(); } int -ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps) +ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(serial); + updateSerial(dbSerial); _applications.clear(); for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) { @@ -570,7 +586,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& { for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->applicationInit(serial, apps, getContext(serial)); + (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -578,12 +594,12 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } - addExpectedUpdate(serial); - return serial; + addExpectedUpdate(_serial); + return _serial; } int -ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info) +ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info) { Lock sync(*this); if(_topics.empty()) @@ -591,13 +607,13 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in return -1; } - updateSerial(serial); + updateSerial(dbSerial); _applications.insert(make_pair(info.descriptor.name, info)); try { for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->applicationAdded(serial, info, getContext(serial)); + (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -605,25 +621,25 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } - addExpectedUpdate(serial); - return serial; + addExpectedUpdate(_serial); + return _serial; } int -ApplicationObserverTopic::applicationRemoved(int serial, const string& name) +ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(serial); + updateSerial(dbSerial); _applications.erase(name); try { for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->applicationRemoved(serial, name, getContext(serial)); + (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -631,12 +647,12 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } - addExpectedUpdate(serial); - return serial; + addExpectedUpdate(_serial); + return _serial; } int -ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info) +ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info) { Lock sync(*this); if(_topics.empty()) @@ -644,7 +660,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate return -1; } - updateSerial(serial); + updateSerial(dbSerial); try { map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name); @@ -685,7 +701,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate { for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->applicationUpdated(serial, info, getContext(serial)); + (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -693,8 +709,8 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } - addExpectedUpdate(serial); - return serial; + addExpectedUpdate(_serial); + return _serial; } void @@ -706,26 +722,26 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { applications.push_back(p->second); } - observer->applicationInit(_serial, applications, getContext(_serial)); + observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial)); } AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<string, AdapterInfo>& adapters) : - ObserverTopic(topicManager, "AdapterObserver"), - _adapters(adapters) + const AdaptersWrapperPtr& wrapper) : + ObserverTopic(topicManager, "AdapterObserver", wrapper->getSerial()), + _adapters(wrapper->getMap()) { _publishers = getPublishers<AdapterObserverPrx>(); } int -AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) +AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _adapters.clear(); for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) { @@ -735,7 +751,7 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) { for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->adapterInit(adpts, getContext(_serial)); + (*p)->adapterInit(adpts, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -748,20 +764,20 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) } int -AdapterObserverTopic::adapterAdded(const AdapterInfo& info) +AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _adapters.insert(make_pair(info.id, info)); try { for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->adapterAdded(info, getContext(_serial)); + (*p)->adapterAdded(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -774,20 +790,20 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info) } int -AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) +AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _adapters[info.id] = info; try { for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->adapterUpdated(info, getContext(_serial)); + (*p)->adapterUpdated(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -800,20 +816,20 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) } int -AdapterObserverTopic::adapterRemoved(const string& id) +AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _adapters.erase(id); try { for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->adapterRemoved(id, getContext(_serial)); + (*p)->adapterRemoved(id, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -834,26 +850,26 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { adapters.push_back(p->second); } - observer->adapterInit(adapters, getContext(_serial)); + observer->adapterInit(adapters, getContext(_serial, _dbSerial)); } ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<Ice::Identity, ObjectInfo>& objects) : - ObserverTopic(topicManager, "ObjectObserver"), - _objects(objects) + const ObjectsWrapperPtr& wrapper) : + ObserverTopic(topicManager, "ObjectObserver", wrapper->getSerial()), + _objects(wrapper->getMap()) { _publishers = getPublishers<ObjectObserverPrx>(); } int -ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) +ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _objects.clear(); for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) { @@ -863,7 +879,7 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) { for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->objectInit(objects, getContext(_serial)); + (*p)->objectInit(objects, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -876,20 +892,20 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) } int -ObjectObserverTopic::objectAdded(const ObjectInfo& info) +ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->objectAdded(info, getContext(_serial)); + (*p)->objectAdded(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -902,20 +918,20 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info) } int -ObjectObserverTopic::objectUpdated(const ObjectInfo& info) +ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _objects[info.proxy->ice_getIdentity()] = info; try { for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->objectUpdated(info, getContext(_serial)); + (*p)->objectUpdated(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -928,20 +944,20 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info) } int -ObjectObserverTopic::objectRemoved(const Ice::Identity& id) +ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(dbSerial); _objects.erase(id); try { for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - (*p)->objectRemoved(id, getContext(_serial)); + (*p)->objectRemoved(id, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -954,14 +970,14 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id) } int -ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) +ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(); for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { @@ -1010,14 +1026,14 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) } int -ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) +ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos) { Lock sync(*this); if(_topics.empty()) { return -1; } - updateSerial(_serial + 1); + updateSerial(); for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { @@ -1055,5 +1071,5 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { objects.push_back(p->second); } - observer->objectInit(objects, getContext(_serial)); + observer->objectInit(objects, getContext(_serial, _dbSerial)); } |