summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
commit91f6ebb998532b36fc70187b641a5b7404060422 (patch)
treeac88e961c68e4b09eb819f4b57b9ecac56854567 /cpp/src/IceGrid/Topics.cpp
parentICE-5378 - Remove slice35d.dll from Windows installer (diff)
downloadice-91f6ebb998532b36fc70187b641a5b7404060422.tar.bz2
ice-91f6ebb998532b36fc70187b641a5b7404060422.tar.xz
ice-91f6ebb998532b36fc70187b641a5b7404060422.zip
Fixed ICE-5358 - allow IceGrid replica to initialize its database from another replica
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp170
1 files changed, 93 insertions, 77 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 31ac3815a89..24fa297b4de 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -29,9 +29,8 @@ Ice::EncodingVersion encodings[] = {
}
-ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) :
- _logger(topicManager->ice_getCommunicator()->getLogger()),
- _serial(0)
+ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) :
+ _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial)
{
for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i)
{
@@ -186,6 +185,13 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
waitForSyncedSubscribersNoSync(serial, name);
}
+int
+ObserverTopic::getSerial() const
+{
+ Lock sync(*this);
+ return _serial;
+}
+
void
ObserverTopic::addExpectedUpdate(int serial, const string& name)
{
@@ -250,20 +256,30 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
}
void
-ObserverTopic::updateSerial(int serial)
+ObserverTopic::updateSerial(Ice::Long dbSerial)
{
- assert(_serial + 1 == serial);
- _serial = serial;
+ ++_serial;
+ if(dbSerial > 0)
+ {
+ _dbSerial = dbSerial;
+ }
}
Ice::Context
-ObserverTopic::getContext(int serial) const
+ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
{
- ostringstream os;
- os << serial;
-
Ice::Context context;
- context["serial"] = os.str();
+ {
+ ostringstream os;
+ os << serial;
+ context["serial"] = os.str();
+ }
+ if(dbSerial > 0)
+ {
+ ostringstream os;
+ os << dbSerial;
+ context["dbSerial"] = os.str();
+ }
return context;
}
@@ -281,7 +297,7 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
{
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
_registries.insert(make_pair(info.name, info));
try
{
@@ -311,7 +327,7 @@ RegistryObserverTopic::registryDown(const string& name)
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
_registries.erase(name);
try
{
@@ -368,7 +384,7 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
{
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
_nodes.insert(make_pair(info.info.name, info));
try
{
@@ -407,7 +423,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
ServerDynamicInfoSeq& servers = _nodes[node].servers;
ServerDynamicInfoSeq::iterator p = servers.begin();
@@ -463,7 +479,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
AdapterDynamicInfoSeq::iterator p = adapters.begin();
@@ -511,7 +527,7 @@ NodeObserverTopic::nodeDown(const string& name)
return;
}
- updateSerial(_serial + 1);
+ updateSerial();
if(_nodes.find(name) != _nodes.end())
{
@@ -545,22 +561,22 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
}
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<string, ApplicationInfo>& applications) :
- ObserverTopic(topicManager, "ApplicationObserver"),
- _applications(applications)
+ const ApplicationsWrapperPtr& wrapper) :
+ ObserverTopic(topicManager, "ApplicationObserver", wrapper->getSerial()),
+ _applications(wrapper->getMap())
{
_publishers = getPublishers<ApplicationObserverPrx>();
}
int
-ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
+ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(serial);
+ updateSerial(dbSerial);
_applications.clear();
for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
{
@@ -570,7 +586,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
{
for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->applicationInit(serial, apps, getContext(serial));
+ (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -578,12 +594,12 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
}
- addExpectedUpdate(serial);
- return serial;
+ addExpectedUpdate(_serial);
+ return _serial;
}
int
-ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
+ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
{
Lock sync(*this);
if(_topics.empty())
@@ -591,13 +607,13 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
return -1;
}
- updateSerial(serial);
+ updateSerial(dbSerial);
_applications.insert(make_pair(info.descriptor.name, info));
try
{
for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->applicationAdded(serial, info, getContext(serial));
+ (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -605,25 +621,25 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
- addExpectedUpdate(serial);
- return serial;
+ addExpectedUpdate(_serial);
+ return _serial;
}
int
-ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
+ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(serial);
+ updateSerial(dbSerial);
_applications.erase(name);
try
{
for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->applicationRemoved(serial, name, getContext(serial));
+ (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -631,12 +647,12 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
- addExpectedUpdate(serial);
- return serial;
+ addExpectedUpdate(_serial);
+ return _serial;
}
int
-ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
+ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
if(_topics.empty())
@@ -644,7 +660,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
return -1;
}
- updateSerial(serial);
+ updateSerial(dbSerial);
try
{
map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
@@ -685,7 +701,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
{
for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->applicationUpdated(serial, info, getContext(serial));
+ (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -693,8 +709,8 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
- addExpectedUpdate(serial);
- return serial;
+ addExpectedUpdate(_serial);
+ return _serial;
}
void
@@ -706,26 +722,26 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
applications.push_back(p->second);
}
- observer->applicationInit(_serial, applications, getContext(_serial));
+ observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial));
}
AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<string, AdapterInfo>& adapters) :
- ObserverTopic(topicManager, "AdapterObserver"),
- _adapters(adapters)
+ const AdaptersWrapperPtr& wrapper) :
+ ObserverTopic(topicManager, "AdapterObserver", wrapper->getSerial()),
+ _adapters(wrapper->getMap())
{
_publishers = getPublishers<AdapterObserverPrx>();
}
int
-AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
+AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_adapters.clear();
for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
{
@@ -735,7 +751,7 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->adapterInit(adpts, getContext(_serial));
+ (*p)->adapterInit(adpts, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -748,20 +764,20 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
}
int
-AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
+AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_adapters.insert(make_pair(info.id, info));
try
{
for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->adapterAdded(info, getContext(_serial));
+ (*p)->adapterAdded(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -774,20 +790,20 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
}
int
-AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
+AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_adapters[info.id] = info;
try
{
for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->adapterUpdated(info, getContext(_serial));
+ (*p)->adapterUpdated(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -800,20 +816,20 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
}
int
-AdapterObserverTopic::adapterRemoved(const string& id)
+AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_adapters.erase(id);
try
{
for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->adapterRemoved(id, getContext(_serial));
+ (*p)->adapterRemoved(id, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -834,26 +850,26 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
adapters.push_back(p->second);
}
- observer->adapterInit(adapters, getContext(_serial));
+ observer->adapterInit(adapters, getContext(_serial, _dbSerial));
}
ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<Ice::Identity, ObjectInfo>& objects) :
- ObserverTopic(topicManager, "ObjectObserver"),
- _objects(objects)
+ const ObjectsWrapperPtr& wrapper) :
+ ObserverTopic(topicManager, "ObjectObserver", wrapper->getSerial()),
+ _objects(wrapper->getMap())
{
_publishers = getPublishers<ObjectObserverPrx>();
}
int
-ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
+ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_objects.clear();
for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
{
@@ -863,7 +879,7 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->objectInit(objects, getContext(_serial));
+ (*p)->objectInit(objects, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -876,20 +892,20 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
}
int
-ObjectObserverTopic::objectAdded(const ObjectInfo& info)
+ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->objectAdded(info, getContext(_serial));
+ (*p)->objectAdded(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -902,20 +918,20 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info)
}
int
-ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
+ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_objects[info.proxy->ice_getIdentity()] = info;
try
{
for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->objectUpdated(info, getContext(_serial));
+ (*p)->objectUpdated(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -928,20 +944,20 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
}
int
-ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
+ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial(dbSerial);
_objects.erase(id);
try
{
for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- (*p)->objectRemoved(id, getContext(_serial));
+ (*p)->objectRemoved(id, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -954,14 +970,14 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
}
int
-ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
+ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial();
for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
@@ -1010,14 +1026,14 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
}
int
-ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
+ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
{
Lock sync(*this);
if(_topics.empty())
{
return -1;
}
- updateSerial(_serial + 1);
+ updateSerial();
for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
@@ -1055,5 +1071,5 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
objects.push_back(p->second);
}
- observer->objectInit(objects, getContext(_serial));
+ observer->objectInit(objects, getContext(_serial, _dbSerial));
}