summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Observers.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2008-02-29 15:51:11 +0800
committerMatthew Newhook <matthew@zeroc.com>2008-02-29 16:39:54 +0800
commitfb4132881dde7c9b135d713a06a3b64db1f706db (patch)
tree8a037e9d4cae7ed15360ab0878d14b32ac3150a4 /cpp/src/IceStorm/Observers.cpp
parentfixing mode on php/config/Make.rules.mak (diff)
downloadice-fb4132881dde7c9b135d713a06a3b64db1f706db.tar.bz2
ice-fb4132881dde7c9b135d713a06a3b64db1f706db.tar.xz
ice-fb4132881dde7c9b135d713a06a3b64db1f706db.zip
Merge HA IceStorm branch.
- http://bugzilla/bugzilla/show_bug.cgi?id=2706 - http://bugzilla/bugzilla/show_bug.cgi?id=2705
Diffstat (limited to 'cpp/src/IceStorm/Observers.cpp')
-rw-r--r--cpp/src/IceStorm/Observers.cpp274
1 files changed, 274 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp
new file mode 100644
index 00000000000..8240a555c3a
--- /dev/null
+++ b/cpp/src/IceStorm/Observers.cpp
@@ -0,0 +1,274 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceStorm/Observers.h>
+#include <IceStorm/Instance.h>
+#include <IceStorm/TraceLevels.h>
+
+using namespace std;
+using namespace IceStorm;
+using namespace IceStormElection;
+
+namespace
+{
+
+class AMI_ReplicaObserver_createTopicI : public AMI_ReplicaObserver_createTopic, public AMICall
+{
+public:
+
+ virtual void ice_response() { response(); }
+ virtual void ice_exception(const Ice::Exception& e) { exception(e); }
+};
+typedef IceUtil::Handle<AMI_ReplicaObserver_createTopicI> AMI_ReplicaObserver_createTopicIPtr;
+
+class AMI_ReplicaObserver_addSubscriberI : public AMI_ReplicaObserver_addSubscriber, public AMICall
+{
+public:
+
+ virtual void ice_response() { response(); }
+ virtual void ice_exception(const Ice::Exception& e) { exception(e); }
+};
+typedef IceUtil::Handle<AMI_ReplicaObserver_addSubscriberI> AMI_ReplicaObserver_addSubscriberIPtr;
+
+class AMI_ReplicaObserver_removeSubscriberI : public AMI_ReplicaObserver_removeSubscriber, public AMICall
+{
+public:
+
+ virtual void ice_response() { response(); }
+ virtual void ice_exception(const Ice::Exception& e) { exception(e); }
+};
+typedef IceUtil::Handle<AMI_ReplicaObserver_removeSubscriberI> AMI_ReplicaObserver_removeSubscriberIPtr;
+
+class AMI_ReplicaObserver_destroyTopicI : public AMI_ReplicaObserver_destroyTopic, public AMICall
+{
+public:
+
+ virtual void ice_response() { response(); }
+ virtual void ice_exception(const Ice::Exception& e) { exception(e); }
+};
+typedef IceUtil::Handle<AMI_ReplicaObserver_destroyTopicI> AMI_ReplicaObserver_destroyTopicIPtr;
+
+}
+
+AMICall::AMICall() :
+ _response(false)
+{
+}
+
+void
+AMICall::response()
+{
+ Lock sync(*this);
+ _response = true;
+ notify();
+}
+void
+AMICall::exception(const IceUtil::Exception& e)
+{
+ Lock sync(*this);
+ _response = true;
+ _ex.reset(e.ice_clone());
+ notify();
+}
+
+void
+AMICall::waitResponse()
+{
+ Lock sync(*this);
+ while(!_response)
+ {
+ wait();
+ }
+ if(_ex.get())
+ {
+ _ex->ice_throw();
+ }
+}
+
+Observers::Observers(const InstancePtr& instance) :
+ _traceLevels(instance->traceLevels()),
+ _majority(0)
+{
+}
+
+void
+Observers::setMajority(unsigned int majority)
+{
+ _majority = majority;
+}
+
+bool
+Observers::check()
+{
+ Lock sync(*this);
+ if(_observers.size() >= _majority)
+ {
+ vector<ObserverInfo>::iterator p = _observers.begin();
+ while(p != _observers.end())
+ {
+ try
+ {
+ p->observer->ice_ping();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->replication > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ out << "ice_ping failed: " << ex;
+ }
+ int id = p->id;
+ p = _observers.erase(p);
+ IceUtil::Mutex::Lock sync(_reapedMutex);
+ _reaped.push_back(id);
+ continue;
+ }
+ ++p;
+ }
+ }
+ return _majority == 0 || _observers.size() >= _majority;
+}
+
+void
+Observers::clear()
+{
+ Lock sync(*this);
+ _observers.clear();
+}
+
+void
+Observers::getReapedSlaves(std::vector<int>& d)
+{
+ IceUtil::Mutex::Lock sync(_reapedMutex);
+ d.swap(_reaped);
+}
+
+void
+Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const TopicContentSeq& content)
+{
+ {
+ IceUtil::Mutex::Lock sync(_reapedMutex);
+ _reaped.clear();
+ }
+
+ Lock sync(*this);
+ _observers.clear();
+ for(set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
+ {
+ try
+ {
+ assert(p->observer);
+ //ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
+
+ // 60s timeout for reliability in the event that a replica
+ // becomes unresponsive.
+ ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer->ice_timeout(60 * 1000));
+ observer->init(llu, content);
+ _observers.push_back(ObserverInfo(p->id, observer));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->replication > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ out << "error calling init on " << p->id << " ex: " << ex;
+ }
+ throw;
+ }
+ }
+}
+
+void
+Observers::createTopic(const LogUpdate& llu, const string& name)
+{
+ Lock sync(*this);
+ for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ AMI_ReplicaObserver_createTopicIPtr cb = new AMI_ReplicaObserver_createTopicI;
+ p->call = cb;
+ p->observer->createTopic_async(cb, llu, name);
+ }
+ wait("createTopic");
+}
+
+void
+Observers::destroyTopic(const LogUpdate& llu, const string& id)
+{
+ Lock sync(*this);
+ for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ AMI_ReplicaObserver_destroyTopicIPtr cb = new AMI_ReplicaObserver_destroyTopicI;
+ p->call = cb;
+ p->observer->destroyTopic_async(cb, llu, id);
+ }
+ wait("destroyTopic");
+}
+
+
+void
+Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec)
+{
+ Lock sync(*this);
+ for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ AMI_ReplicaObserver_addSubscriberIPtr cb = new AMI_ReplicaObserver_addSubscriberI;
+ p->call = cb;
+ p->observer->addSubscriber_async(cb, llu, name, rec);
+ }
+ wait("addSubscriber");
+}
+
+void
+Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
+{
+ Lock sync(*this);
+ for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ AMI_ReplicaObserver_removeSubscriberIPtr cb = new AMI_ReplicaObserver_removeSubscriberI;
+ p->call = cb;
+ p->observer->removeSubscriber_async(cb, llu, name, id);
+ }
+ wait("removeSubscriber");
+}
+
+void
+Observers::wait(const string& op)
+{
+ vector<ObserverInfo>::iterator p = _observers.begin();
+ while(p != _observers.end())
+ {
+ try
+ {
+ p->call->waitResponse();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->replication > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ out << op << ": " << ex;
+ }
+ int id = p->id;
+ p = _observers.erase(p);
+ IceUtil::Mutex::Lock sync(_reapedMutex);
+ _reaped.push_back(id);
+ continue;
+ }
+ ++p;
+ }
+ // If we now no longer have the majority of observers we raise.
+ if(_observers.size() < _majority)
+ {
+ // TODO: Trace here?
+ //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
+ //out << op;
+ throw Ice::UnknownException(__FILE__, __LINE__);
+ }
+}
+