diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-09-26 14:04:54 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-09-26 14:04:54 +0200 |
commit | a3c9dfeead519e87ba197e5e66b1013f39fa4366 (patch) | |
tree | a1bb69cdea16ca854173a2e3241b242aacf1b14e /cpp/src/IceGrid/NodeI.cpp | |
parent | Fixed locator potential hang when resolving round-robin replica group which c... (diff) | |
download | ice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.tar.bz2 ice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.tar.xz ice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.zip |
IceGrid fixes to ensure the registry/node don't wait too long if a replica becomes unreachable
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 212 |
1 files changed, 182 insertions, 30 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 4328a4aef85..b721c553271 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -188,6 +188,137 @@ private: string _dest; }; +class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp +{ +public: + + NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) : + Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->nodeUp_async(this, _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + NodeDynamicInfo _info; +}; + +class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer +{ +public: + + UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->updateServer_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + ServerDynamicInfo _info; +}; + +class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter +{ +public: + + UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->updateAdapter_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + AdapterDynamicInfo _info; +}; + +} + +NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer) +{ +} + +NodeI::Update::~Update() +{ +} + +void +NodeI::Update::finished(bool success) +{ + _node->dequeueUpdate(_observer, this, !success); } NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, @@ -847,6 +978,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe assert(_observers.find(session) == _observers.end()); _observers.insert(make_pair(session, observer)); + _observerUpdates.erase(observer); // Remove any updates from the previous session. + ServerDynamicInfoSeq serverInfos; AdapterDynamicInfoSeq adapterInfos; for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); @@ -863,19 +996,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe adapterInfos.push_back(q->second); } - try - { - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - observer->nodeUp(info); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "unexpected observer exception:\n" << ex; - } + NodeDynamicInfo info; + info.info = _platform.getNodeInfo(); + info.servers = serverInfos; + info.adapters = adapterInfos; + queueUpdate(observer, new NodeUp(this, observer, info)); } void @@ -910,15 +1035,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateServer(_name, info); - sent.insert(p->second); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateServer(this, p->second, info)); } } } @@ -948,18 +1065,53 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateAdapter(_name, info); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateAdapter(this, p->second, info)); } } } +void +NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update) +{ + //Lock sync(*this); Called within the synchronization + deque<UpdatePtr>& queue = _observerUpdates[proxy]; + queue.push_back(update); + if(queue.size() == 1) + { + queue.front()->send(); + } +} + +void +NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + if(p == _observerUpdates.end() || p->second.front().get() != update.get()) + { + return; + } + + deque<UpdatePtr>& queue = p->second; + if(all) + { + queue.clear(); + } + else + { + queue.pop_front(); + } + + if(!queue.empty()) + { + queue.front()->send(); + } + else + { + _observerUpdates.erase(p); + } +} + void NodeI::addServer(const ServerIPtr& server, const string& application) { |