diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-09 10:41:35 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-09 10:41:35 +0000 |
commit | 0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647 (patch) | |
tree | 48a880b81005486a4eca09a0ec5357d674cdbbb0 /cpp/src/IceGrid/Topics.cpp | |
parent | Fixes as suggested by Bernard (namespace { }, PublisherProxyI -> (diff) | |
download | ice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.tar.bz2 ice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.tar.xz ice-0e7a98ce8b5c9fbb8e32c06ffa107c437a2d3647.zip |
Simplified topic publishing
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 115 |
1 files changed, 54 insertions, 61 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 687e7da7dd9..34d80ab18a9 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -212,17 +212,8 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) void ObserverTopic::updateSerial(int serial) { - // - // This loop ensures that updates from the database are processed - // sequentially. - // - assert(_serial < serial); - while(_serial + 1 != serial) - { - wait(); - } + assert(_serial + 1 == serial); _serial = serial; - notifyAll(); } Ice::Context @@ -664,14 +655,14 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi } void -AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) +AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _adapters.clear(); for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) { @@ -679,7 +670,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) } try { - _publisher->adapterInit(adpts, getContext(serial)); + _publisher->adapterInit(adpts, getContext(_serial)); } catch(const Ice::LocalException& ex) { @@ -689,72 +680,72 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) } void -AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) +AdapterObserverTopic::adapterAdded(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _adapters.insert(make_pair(info.id, info)); try { - _publisher->adapterAdded(info, getContext(serial)); + _publisher->adapterAdded(info, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } void -AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) +AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _adapters[info.id] = info; try { - _publisher->adapterUpdated(info, getContext(serial)); + _publisher->adapterUpdated(info, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } void -AdapterObserverTopic::adapterRemoved(int serial, const string& id) +AdapterObserverTopic::adapterRemoved(const string& id) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _adapters.erase(id); try { - _publisher->adapterRemoved(id, getContext(serial)); + _publisher->adapterRemoved(id, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } void @@ -787,14 +778,14 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM } void -ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) +ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _objects.clear(); for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) { @@ -802,7 +793,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) } try { - _publisher->objectInit(objects, getContext(serial)); + _publisher->objectInit(objects, getContext(_serial)); } catch(const Ice::LocalException& ex) { @@ -812,83 +803,83 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) } void -ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) +ObjectObserverTopic::objectAdded(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - _publisher->objectAdded(info, getContext(serial)); + _publisher->objectAdded(info, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } void -ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) +ObjectObserverTopic::objectUpdated(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _objects[info.proxy->ice_getIdentity()] = info; try { - _publisher->objectUpdated(info, getContext(serial)); + _publisher->objectUpdated(info, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } void -ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) +ObjectObserverTopic::objectRemoved(const Ice::Identity& id) { Lock sync(*this); if(!_topic) { return; } - updateSerial(serial); + updateSerial(_serial + 1); _objects.erase(id); try { - _publisher->objectRemoved(id, getContext(serial)); + _publisher->objectRemoved(id, getContext(_serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } - addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + waitForSyncedSubscribersNoSync(_serial); } -void -ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& infos) +int +ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) { Lock sync(*this); if(!_topic) { - return; + return _serial; } - updateSerial(serial); + updateSerial(_serial + 1); for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { @@ -898,7 +889,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info q->second = *p; try { - _publisher->objectUpdated(*p, getContext(serial)); + _publisher->objectUpdated(*p, getContext(_serial)); } catch(const Ice::LocalException& ex) { @@ -911,7 +902,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p)); try { - _publisher->objectAdded(*p, getContext(serial)); + _publisher->objectAdded(*p, getContext(_serial)); } catch(const Ice::LocalException& ex) { @@ -925,26 +916,27 @@ ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& info // We don't wait for the update to be received by the replicas // here. This operation is called by ReplicaSessionI. // - addExpectedUpdate(serial); - //waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + //waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void -ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) +int +ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) { Lock sync(*this); if(!_topic) { - return; + return _serial; } - updateSerial(serial); + updateSerial(_serial + 1); for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { _objects.erase(p->proxy->ice_getIdentity()); try { - _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(serial)); + _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial)); } catch(const Ice::LocalException& ex) { @@ -958,8 +950,9 @@ ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) // replicas here. This operation is only called internaly by // IceGrid. // - addExpectedUpdate(serial); - //waitForSyncedSubscribersNoSync(serial); + addExpectedUpdate(_serial); + //waitForSyncedSubscribersNoSync(_serial); + return _serial; } void |