summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-09 10:41:35 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-09 10:41:35 +0000
commit0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647 (patch)
tree48a880b81005486a4eca09a0ec5357d674cdbbb0 /cpp/src/IceGrid/Topics.cpp
parentFixes as suggested by Bernard (namespace { }, PublisherProxyI -> (diff)
downloadice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.tar.bz2
ice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.tar.xz
ice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.zip
Simplified topic publishing
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp115
1 files changed, 54 insertions, 61 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 687e7da7dd9..34d80ab18a9 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -212,17 +212,8 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
void
ObserverTopic::updateSerial(int serial)
{
- //
- // This loop ensures that updates from the database are processed
- // sequentially.
- //
- assert(_serial < serial);
- while(_serial + 1 != serial)
- {
- wait();
- }
+ assert(_serial + 1 == serial);
_serial = serial;
- notifyAll();
}
Ice::Context
@@ -664,14 +655,14 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
}
void
-AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
+AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_adapters.clear();
for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
{
@@ -679,7 +670,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
}
try
{
- _publisher->adapterInit(adpts, getContext(serial));
+ _publisher->adapterInit(adpts, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
@@ -689,72 +680,72 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
}
void
-AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
+AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_adapters.insert(make_pair(info.id, info));
try
{
- _publisher->adapterAdded(info, getContext(serial));
+ _publisher->adapterAdded(info, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
void
-AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
+AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_adapters[info.id] = info;
try
{
- _publisher->adapterUpdated(info, getContext(serial));
+ _publisher->adapterUpdated(info, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
void
-AdapterObserverTopic::adapterRemoved(int serial, const string& id)
+AdapterObserverTopic::adapterRemoved(const string& id)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_adapters.erase(id);
try
{
- _publisher->adapterRemoved(id, getContext(serial));
+ _publisher->adapterRemoved(id, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
void
@@ -787,14 +778,14 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
}
void
-ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
+ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_objects.clear();
for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
{
@@ -802,7 +793,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
}
try
{
- _publisher->objectInit(objects, getContext(serial));
+ _publisher->objectInit(objects, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
@@ -812,83 +803,83 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
}
void
-ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
+ObjectObserverTopic::objectAdded(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- _publisher->objectAdded(info, getContext(serial));
+ _publisher->objectAdded(info, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
void
-ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
+ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- _publisher->objectUpdated(info, getContext(serial));
+ _publisher->objectUpdated(info, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
void
-ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
+ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
{
Lock sync(*this);
if(!_topic)
{
return;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
_objects.erase(id);
try
{
- _publisher->objectRemoved(id, getContext(serial));
+ _publisher->objectRemoved(id, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
- addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ waitForSyncedSubscribersNoSync(_serial);
}
-void
-ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& infos)
+int
+ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return _serial;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
@@ -898,7 +889,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info
q->second = *p;
try
{
- _publisher->objectUpdated(*p, getContext(serial));
+ _publisher->objectUpdated(*p, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
@@ -911,7 +902,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info
_objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
try
{
- _publisher->objectAdded(*p, getContext(serial));
+ _publisher->objectAdded(*p, getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
@@ -925,26 +916,27 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info
// We don't wait for the update to be received by the replicas
// here. This operation is called by ReplicaSessionI.
//
- addExpectedUpdate(serial);
- //waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ //waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
-ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos)
+int
+ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return _serial;
}
- updateSerial(serial);
+ updateSerial(_serial + 1);
for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
_objects.erase(p->proxy->ice_getIdentity());
try
{
- _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(serial));
+ _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
}
catch(const Ice::LocalException& ex)
{
@@ -958,8 +950,9 @@ ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos)
// replicas here. This operation is only called internaly by
// IceGrid.
//
- addExpectedUpdate(serial);
- //waitForSyncedSubscribersNoSync(serial);
+ addExpectedUpdate(_serial);
+ //waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
void