diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 16:26:44 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:59:30 -0500 |
commit | 92a6531e409f2691d82591e185a92299d415fc0f (patch) | |
tree | 60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceStorm/Observers.cpp | |
parent | Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff) | |
download | ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2 ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz ice-92a6531e409f2691d82591e185a92299d415fc0f.zip |
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/Observers.cpp')
-rw-r--r-- | cpp/src/IceStorm/Observers.cpp | 80 |
1 files changed, 38 insertions, 42 deletions
diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp index b38a2db5da5..f714477b82c 100644 --- a/cpp/src/IceStorm/Observers.cpp +++ b/cpp/src/IceStorm/Observers.cpp @@ -10,8 +10,8 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; -Observers::Observers(const InstancePtr& instance) : - _traceLevels(instance->traceLevels()), +Observers::Observers(shared_ptr<TraceLevels> traceLevels) : + _traceLevels(move(traceLevels)), _majority(0) { } @@ -25,10 +25,11 @@ Observers::setMajority(unsigned int majority) bool Observers::check() { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); + if(_observers.size() >= _majority) { - vector<ObserverInfo>::iterator p = _observers.begin(); + auto p = _observers.begin(); while(p != _observers.end()) { try @@ -45,11 +46,8 @@ Observers::check() int id = p->id; p = _observers.erase(p); - // COMPILERFIX: Just using following causes double unlock with C++Builder 2007 - //IceUtil::Mutex::Lock sync(_reapedMutex); - _reapedMutex.lock(); + lock_guard<mutex> reapedLock(_reapedMutex); _reaped.push_back(id); - _reapedMutex.unlock(); continue; } ++p; @@ -61,14 +59,14 @@ Observers::check() void Observers::clear() { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); _observers.clear(); } void Observers::getReapedSlaves(std::vector<int>& d) { - IceUtil::Mutex::Lock sync(_reapedMutex); + lock_guard<mutex> reapedLock(_reapedMutex); d.swap(_reaped); } @@ -76,65 +74,64 @@ void Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const TopicContentSeq& content) { { - IceUtil::Mutex::Lock sync(_reapedMutex); + lock_guard<mutex> reapedLock(_reapedMutex); _reaped.clear(); } - Lock sync(*this); + lock_guard<mutex> lg(_mutex); _observers.clear(); vector<ObserverInfo> observers; - for(set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p) + for(const auto& slave : slaves) { try { - assert(p->observer); + assert(slave.observer); + + auto observer = Ice::uncheckedCast<ReplicaObserverPrx>(slave.observer); - ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer); + auto future = observer->initAsync(llu, content); - Ice::AsyncResultPtr result = observer->begin_init(llu, content); - observers.push_back(ObserverInfo(p->id, observer, result)); + observers.push_back({ slave.id, observer, move(future) }); } catch(const Ice::Exception& ex) { if(_traceLevels->replication > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); - out << "error calling init on " << p->id << ", exception: " << ex; + out << "error calling init on " << slave.id << ", exception: " << ex; } throw; } } - for(vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p) + for(auto& o : observers) { try { - p->observer->end_init(p->result); - p->result = 0; + o.future.get(); } catch(const Ice::Exception& ex) { if(_traceLevels->replication > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); - out << "init on " << p->id << " failed with exception " << ex; + out << "init on " << o.id << " failed with exception " << ex; } throw; } } - - _observers.swap(observers); + _observers = move(observers); } void Observers::createTopic(const LogUpdate& llu, const string& name) { - Lock sync(*this); - for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) + lock_guard<mutex> lg(_mutex); + for(auto& o : _observers) { - p->result = p->observer->begin_createTopic(llu, name); + o.future = o.observer->createTopicAsync(llu, name); } wait("createTopic"); } @@ -142,10 +139,10 @@ Observers::createTopic(const LogUpdate& llu, const string& name) void Observers::destroyTopic(const LogUpdate& llu, const string& id) { - Lock sync(*this); - for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) + lock_guard<mutex> lg(_mutex); + for(auto& o : _observers) { - p->result = p->observer->begin_destroyTopic(llu, id); + o.future = o.observer->destroyTopicAsync(llu, id); } wait("destroyTopic"); } @@ -153,10 +150,10 @@ Observers::destroyTopic(const LogUpdate& llu, const string& id) void Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec) { - Lock sync(*this); - for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) + lock_guard<mutex> lg(_mutex); + for(auto& o : _observers) { - p->result = p->observer->begin_addSubscriber(llu, name, rec); + o.future = o.observer->addSubscriberAsync(llu, name, rec); } wait("addSubscriber"); } @@ -164,10 +161,10 @@ Observers::addSubscriber(const LogUpdate& llu, const string& name, const Subscri void Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id) { - Lock sync(*this); - for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) + lock_guard<mutex> lg(_mutex); + for(auto& o : _observers) { - p->result = p->observer->begin_removeSubscriber(llu, name, id); + o.future = o.observer->removeSubscriberAsync(llu, name, id); } wait("removeSubscriber"); } @@ -180,8 +177,7 @@ Observers::wait(const string& op) { try { - p->result->waitForCompleted(); - p->result->throwLocalException(); + p->future.get(); } catch(const Ice::Exception& ex) { @@ -193,18 +189,18 @@ Observers::wait(const string& op) int id = p->id; p = _observers.erase(p); - IceUtil::Mutex::Lock sync(_reapedMutex); + lock_guard<mutex> reapedLock(_mutex); _reaped.push_back(id); continue; } ++p; } + // If we now no longer have the majority of observers we raise. if(_observers.size() < _majority) { - // TODO: Trace here? - //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); - //out << op; + Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); + out << "number of observers `" << _observers.size() << "' is less than the majority '" << _majority << "'"; throw Ice::UnknownException(__FILE__, __LINE__); } } |