summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Observers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Observers.cpp')
-rw-r--r--cpp/src/IceStorm/Observers.cpp80
1 files changed, 38 insertions, 42 deletions
diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp
index b38a2db5da5..f714477b82c 100644
--- a/cpp/src/IceStorm/Observers.cpp
+++ b/cpp/src/IceStorm/Observers.cpp
@@ -10,8 +10,8 @@ using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
-Observers::Observers(const InstancePtr& instance) :
- _traceLevels(instance->traceLevels()),
+Observers::Observers(shared_ptr<TraceLevels> traceLevels) :
+ _traceLevels(move(traceLevels)),
_majority(0)
{
}
@@ -25,10 +25,11 @@ Observers::setMajority(unsigned int majority)
bool
Observers::check()
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
+
if(_observers.size() >= _majority)
{
- vector<ObserverInfo>::iterator p = _observers.begin();
+ auto p = _observers.begin();
while(p != _observers.end())
{
try
@@ -45,11 +46,8 @@ Observers::check()
int id = p->id;
p = _observers.erase(p);
- // COMPILERFIX: Just using following causes double unlock with C++Builder 2007
- //IceUtil::Mutex::Lock sync(_reapedMutex);
- _reapedMutex.lock();
+ lock_guard<mutex> reapedLock(_reapedMutex);
_reaped.push_back(id);
- _reapedMutex.unlock();
continue;
}
++p;
@@ -61,14 +59,14 @@ Observers::check()
void
Observers::clear()
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
_observers.clear();
}
void
Observers::getReapedSlaves(std::vector<int>& d)
{
- IceUtil::Mutex::Lock sync(_reapedMutex);
+ lock_guard<mutex> reapedLock(_reapedMutex);
d.swap(_reaped);
}
@@ -76,65 +74,64 @@ void
Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const TopicContentSeq& content)
{
{
- IceUtil::Mutex::Lock sync(_reapedMutex);
+ lock_guard<mutex> reapedLock(_reapedMutex);
_reaped.clear();
}
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
_observers.clear();
vector<ObserverInfo> observers;
- for(set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
+ for(const auto& slave : slaves)
{
try
{
- assert(p->observer);
+ assert(slave.observer);
+
+ auto observer = Ice::uncheckedCast<ReplicaObserverPrx>(slave.observer);
- ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
+ auto future = observer->initAsync(llu, content);
- Ice::AsyncResultPtr result = observer->begin_init(llu, content);
- observers.push_back(ObserverInfo(p->id, observer, result));
+ observers.push_back({ slave.id, observer, move(future) });
}
catch(const Ice::Exception& ex)
{
if(_traceLevels->replication > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
- out << "error calling init on " << p->id << ", exception: " << ex;
+ out << "error calling init on " << slave.id << ", exception: " << ex;
}
throw;
}
}
- for(vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
+ for(auto& o : observers)
{
try
{
- p->observer->end_init(p->result);
- p->result = 0;
+ o.future.get();
}
catch(const Ice::Exception& ex)
{
if(_traceLevels->replication > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
- out << "init on " << p->id << " failed with exception " << ex;
+ out << "init on " << o.id << " failed with exception " << ex;
}
throw;
}
}
-
- _observers.swap(observers);
+ _observers = move(observers);
}
void
Observers::createTopic(const LogUpdate& llu, const string& name)
{
- Lock sync(*this);
- for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ lock_guard<mutex> lg(_mutex);
+ for(auto& o : _observers)
{
- p->result = p->observer->begin_createTopic(llu, name);
+ o.future = o.observer->createTopicAsync(llu, name);
}
wait("createTopic");
}
@@ -142,10 +139,10 @@ Observers::createTopic(const LogUpdate& llu, const string& name)
void
Observers::destroyTopic(const LogUpdate& llu, const string& id)
{
- Lock sync(*this);
- for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ lock_guard<mutex> lg(_mutex);
+ for(auto& o : _observers)
{
- p->result = p->observer->begin_destroyTopic(llu, id);
+ o.future = o.observer->destroyTopicAsync(llu, id);
}
wait("destroyTopic");
}
@@ -153,10 +150,10 @@ Observers::destroyTopic(const LogUpdate& llu, const string& id)
void
Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec)
{
- Lock sync(*this);
- for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ lock_guard<mutex> lg(_mutex);
+ for(auto& o : _observers)
{
- p->result = p->observer->begin_addSubscriber(llu, name, rec);
+ o.future = o.observer->addSubscriberAsync(llu, name, rec);
}
wait("addSubscriber");
}
@@ -164,10 +161,10 @@ Observers::addSubscriber(const LogUpdate& llu, const string& name, const Subscri
void
Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
{
- Lock sync(*this);
- for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ lock_guard<mutex> lg(_mutex);
+ for(auto& o : _observers)
{
- p->result = p->observer->begin_removeSubscriber(llu, name, id);
+ o.future = o.observer->removeSubscriberAsync(llu, name, id);
}
wait("removeSubscriber");
}
@@ -180,8 +177,7 @@ Observers::wait(const string& op)
{
try
{
- p->result->waitForCompleted();
- p->result->throwLocalException();
+ p->future.get();
}
catch(const Ice::Exception& ex)
{
@@ -193,18 +189,18 @@ Observers::wait(const string& op)
int id = p->id;
p = _observers.erase(p);
- IceUtil::Mutex::Lock sync(_reapedMutex);
+ lock_guard<mutex> reapedLock(_mutex);
_reaped.push_back(id);
continue;
}
++p;
}
+
// If we now no longer have the majority of observers we raise.
if(_observers.size() < _majority)
{
- // TODO: Trace here?
- //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
- //out << op;
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ out << "number of observers `" << _observers.size() << "' is less than the majority '" << _majority << "'";
throw Ice::UnknownException(__FILE__, __LINE__);
}
}