diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-20 11:54:20 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-20 11:54:20 +0000 |
commit | 1ea8dd904580a2fba9d9cfbd03aa8ed062ae53b4 (patch) | |
tree | fa27771d0c6695f1cab4f6945ba036917bf9018c /cpp/src/IceGrid/Topics.cpp | |
parent | New Ice::initialize() overload for StringSeq. (diff) | |
download | ice-1ea8dd904580a2fba9d9cfbd03aa8ed062ae53b4.tar.bz2 ice-1ea8dd904580a2fba9d9cfbd03aa8ed062ae53b4.tar.xz ice-1ea8dd904580a2fba9d9cfbd03aa8ed062ae53b4.zip |
Removed null permission verifier well-known objects Various fixes for
replcia session establishment.
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 111 |
1 files changed, 101 insertions, 10 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 48dbaea12dd..cbb3f7766dc 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -43,7 +43,7 @@ ObserverTopic::~ObserverTopic() } void -ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial) +ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { Lock sync(*this); if(!_topic) @@ -57,7 +57,9 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int ser if(!name.empty()) { + assert(_syncSubscribers.find(name) == _syncSubscribers.end()); _syncSubscribers.insert(name); + waitForSyncedSubscribers(_serial, name); } } @@ -72,6 +74,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) if(!name.empty()) { + assert(_syncSubscribers.find(name) != _syncSubscribers.end()); _syncSubscribers.erase(name); map<int, set<string> >::iterator p = _waitForUpdates.begin(); @@ -109,7 +112,6 @@ void ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure) { Lock sync(*this); - map<int, set<string> >::iterator p = _waitForUpdates.find(serial); if(p != _waitForUpdates.end()) { @@ -134,14 +136,22 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail } void -ObserverTopic::waitForSyncedSubscribers(int serial) +ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) { - if(_syncSubscribers.empty()) + if(_syncSubscribers.empty() && name.empty()) { return; } - _waitForUpdates.insert(make_pair(serial, _syncSubscribers)); + if(name.empty()) + { + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; + } + else + { + _waitForUpdates[serial].insert(name); + } // // Wait until all the updates are received. @@ -262,7 +272,7 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { registries.push_back(p->second); } - observer->registryInit(registries, getContext(-1)); + observer->registryInit(registries, getContext(_serial)); } NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -454,7 +464,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { nodes.push_back(p->second); } - observer->nodeInit(nodes, getContext(-1)); + observer->nodeInit(nodes, getContext(_serial)); } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -596,7 +606,7 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { applications.push_back(p->second); } - observer->applicationInit(_serial, applications, getContext(-1)); + observer->applicationInit(_serial, applications, getContext(_serial)); } AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -707,7 +717,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { adapters.push_back(p->second); } - observer->adapterInit(adapters, getContext(-1)); + observer->adapterInit(adapters, getContext(_serial)); } ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -810,6 +820,87 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) } void +ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& infos) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); + + for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity()); + if(q != _objects.end()) + { + q->second = *p; + try + { + _publisher->objectUpdated(*p, getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + } + } + else + { + _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p)); + try + { + _publisher->objectAdded(*p, getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + } + } + } + + // + // We don't need to wait for the update to be received by the + // replicas here. This operation is only called internaly by + // IceGrid. + // + //waitForSyncedSubscribers(serial); +} + +void +ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); + + for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + _objects.erase(p->proxy->ice_getIdentity()); + try + { + _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + } + } + + // + // We don't need to wait for the update to be received by the + // replicas here. This operation is only called internaly by + // IceGrid. + // + //waitForSyncedSubscribers(serial); +} + +void ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); @@ -818,5 +909,5 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { objects.push_back(p->second); } - observer->objectInit(objects, getContext(-1)); + observer->objectInit(objects, getContext(_serial)); } |