summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2016-09-01 16:01:40 -0400
committerBernard Normier <bernard@zeroc.com>2016-09-01 16:01:40 -0400
commit768fdb02990de5504c193b5c1c20da3bcd425c02 (patch)
tree1d4ec52c476a32c32466448561508f8e4002b800 /cpp/src
parentRemove bogus message check from Java Ice/exceptions test logger (diff)
downloadice-768fdb02990de5504c193b5c1c20da3bcd425c02.tar.bz2
ice-768fdb02990de5504c193b5c1c20da3bcd425c02.tar.xz
ice-768fdb02990de5504c193b5c1c20da3bcd425c02.zip
Call init on replicas (observers) in parallel using AMI
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceStorm/Election.ice12
-rw-r--r--cpp/src/IceStorm/Observers.cpp35
-rw-r--r--cpp/src/IceStorm/Observers.h4
3 files changed, 36 insertions, 15 deletions
diff --git a/cpp/src/IceStorm/Election.ice b/cpp/src/IceStorm/Election.ice
index 77fe974bada..0c2fc25c51f 100644
--- a/cpp/src/IceStorm/Election.ice
+++ b/cpp/src/IceStorm/Election.ice
@@ -52,7 +52,7 @@ interface ReplicaObserver
* @throws ObserverInconsistencyException Raised if an
* inconsisency was detected.
*
- **/
+ **/
void init(LogUpdate llu, TopicContentSeq content)
throws ObserverInconsistencyException;
@@ -67,7 +67,7 @@ interface ReplicaObserver
* @throws ObserverInconsistencyException Raised if an
* inconsisency was detected.
*
- **/
+ **/
void createTopic(LogUpdate llu, string name)
throws ObserverInconsistencyException;
@@ -82,7 +82,7 @@ interface ReplicaObserver
* @throws ObserverInconsistencyException Raised if an
* inconsisency was detected.
*
- **/
+ **/
void destroyTopic(LogUpdate llu, string name)
throws ObserverInconsistencyException;
@@ -99,7 +99,7 @@ interface ReplicaObserver
* @throws ObserverInconsistencyException Raised if an
* inconsisency was detected.
*
- **/
+ **/
void addSubscriber(LogUpdate llu, string topic, IceStorm::SubscriberRecord record)
throws ObserverInconsistencyException;
@@ -116,7 +116,7 @@ interface ReplicaObserver
* @throws ObserverInconsistencyException Raised if an
* inconsisency was detected.
*
- **/
+ **/
void removeSubscriber(LogUpdate llu, string topic, Ice::IdentitySeq subscribers)
throws ObserverInconsistencyException;
};
@@ -293,6 +293,7 @@ interface Node
*
**/
["cpp:const"] idempotent NodeInfoSeq nodes();
+
/**
*
* Get the query information for the given node.
@@ -304,4 +305,3 @@ interface Node
};
};
-
diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp
index 32c6951b1e4..a752c89ba16 100644
--- a/cpp/src/IceStorm/Observers.cpp
+++ b/cpp/src/IceStorm/Observers.cpp
@@ -87,29 +87,51 @@ Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const To
Lock sync(*this);
_observers.clear();
+
+ vector<ObserverInfo> observers;
+
for(set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
{
try
{
assert(p->observer);
- //ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
- // 60s timeout for reliability in the event that a replica
- // becomes unresponsive.
+ // 60s timeout for reliability in the event that a replica becomes unresponsive.
ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer->ice_timeout(60 * 1000));
- observer->init(llu, content);
- _observers.push_back(ObserverInfo(p->id, observer));
+
+ Ice::AsyncResultPtr result = observer->begin_init(llu, content);
+ observers.push_back(ObserverInfo(p->id, observer, result));
}
catch(const Ice::Exception& ex)
{
if(_traceLevels->replication > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
- out << "error calling init on " << p->id << " ex: " << ex;
+ out << "error calling init on " << p->id << ", exception: " << ex;
}
throw;
}
}
+
+ for(vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
+ {
+ try
+ {
+ p->observer->end_init(p->result);
+ p->result = 0;
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->replication > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ out << "init on " << p->id << " failed with exception " << ex;
+ }
+ throw;
+ }
+ }
+
+ _observers.swap(observers);
}
void
@@ -193,4 +215,3 @@ Observers::wait(const string& op)
throw Ice::UnknownException(__FILE__, __LINE__);
}
}
-
diff --git a/cpp/src/IceStorm/Observers.h b/cpp/src/IceStorm/Observers.h
index 2af8053749b..83f67adc45b 100644
--- a/cpp/src/IceStorm/Observers.h
+++ b/cpp/src/IceStorm/Observers.h
@@ -57,8 +57,8 @@ private:
unsigned int _majority;
struct ObserverInfo
{
- ObserverInfo(int i, const ReplicaObserverPrx& o) :
- id(i), observer(o) {}
+ ObserverInfo(int i, const ReplicaObserverPrx& o, const Ice::AsyncResultPtr& r = 0) :
+ id(i), observer(o), result (r) {}
int id;
ReplicaObserverPrx observer;
::Ice::AsyncResultPtr result;