diff options
-rw-r--r-- | cpp/src/IceStorm/Election.ice | 12 | ||||
-rw-r--r-- | cpp/src/IceStorm/Observers.cpp | 35 | ||||
-rw-r--r-- | cpp/src/IceStorm/Observers.h | 4 |
3 files changed, 36 insertions, 15 deletions
diff --git a/cpp/src/IceStorm/Election.ice b/cpp/src/IceStorm/Election.ice index 77fe974bada..0c2fc25c51f 100644 --- a/cpp/src/IceStorm/Election.ice +++ b/cpp/src/IceStorm/Election.ice @@ -52,7 +52,7 @@ interface ReplicaObserver * @throws ObserverInconsistencyException Raised if an * inconsisency was detected. * - **/ + **/ void init(LogUpdate llu, TopicContentSeq content) throws ObserverInconsistencyException; @@ -67,7 +67,7 @@ interface ReplicaObserver * @throws ObserverInconsistencyException Raised if an * inconsisency was detected. * - **/ + **/ void createTopic(LogUpdate llu, string name) throws ObserverInconsistencyException; @@ -82,7 +82,7 @@ interface ReplicaObserver * @throws ObserverInconsistencyException Raised if an * inconsisency was detected. * - **/ + **/ void destroyTopic(LogUpdate llu, string name) throws ObserverInconsistencyException; @@ -99,7 +99,7 @@ interface ReplicaObserver * @throws ObserverInconsistencyException Raised if an * inconsisency was detected. * - **/ + **/ void addSubscriber(LogUpdate llu, string topic, IceStorm::SubscriberRecord record) throws ObserverInconsistencyException; @@ -116,7 +116,7 @@ interface ReplicaObserver * @throws ObserverInconsistencyException Raised if an * inconsisency was detected. * - **/ + **/ void removeSubscriber(LogUpdate llu, string topic, Ice::IdentitySeq subscribers) throws ObserverInconsistencyException; }; @@ -293,6 +293,7 @@ interface Node * **/ ["cpp:const"] idempotent NodeInfoSeq nodes(); + /** * * Get the query information for the given node. @@ -304,4 +305,3 @@ interface Node }; }; - diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp index 32c6951b1e4..a752c89ba16 100644 --- a/cpp/src/IceStorm/Observers.cpp +++ b/cpp/src/IceStorm/Observers.cpp @@ -87,29 +87,51 @@ Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const To Lock sync(*this); _observers.clear(); + + vector<ObserverInfo> observers; + for(set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p) { try { assert(p->observer); - //ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer); - // 60s timeout for reliability in the event that a replica - // becomes unresponsive. + // 60s timeout for reliability in the event that a replica becomes unresponsive. ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer->ice_timeout(60 * 1000)); - observer->init(llu, content); - _observers.push_back(ObserverInfo(p->id, observer)); + + Ice::AsyncResultPtr result = observer->begin_init(llu, content); + observers.push_back(ObserverInfo(p->id, observer, result)); } catch(const Ice::Exception& ex) { if(_traceLevels->replication > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); - out << "error calling init on " << p->id << " ex: " << ex; + out << "error calling init on " << p->id << ", exception: " << ex; } throw; } } + + for(vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p) + { + try + { + p->observer->end_init(p->result); + p->result = 0; + } + catch(const Ice::Exception& ex) + { + if(_traceLevels->replication > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat); + out << "init on " << p->id << " failed with exception " << ex; + } + throw; + } + } + + _observers.swap(observers); } void @@ -193,4 +215,3 @@ Observers::wait(const string& op) throw Ice::UnknownException(__FILE__, __LINE__); } } - diff --git a/cpp/src/IceStorm/Observers.h b/cpp/src/IceStorm/Observers.h index 2af8053749b..83f67adc45b 100644 --- a/cpp/src/IceStorm/Observers.h +++ b/cpp/src/IceStorm/Observers.h @@ -57,8 +57,8 @@ private: unsigned int _majority; struct ObserverInfo { - ObserverInfo(int i, const ReplicaObserverPrx& o) : - id(i), observer(o) {} + ObserverInfo(int i, const ReplicaObserverPrx& o, const Ice::AsyncResultPtr& r = 0) : + id(i), observer(o), result (r) {} int id; ReplicaObserverPrx observer; ::Ice::AsyncResultPtr result; |