diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 226 |
1 files changed, 194 insertions, 32 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 4328a4aef85..140e2e73d81 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -188,6 +188,140 @@ private: string _dest; }; +class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp +{ +public: + + NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual bool + send() + { + try + { + _observer->nodeUp_async(this, _info); + } + catch(const Ice::LocalException&) + { + return false; + } + return true; + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + NodeDynamicInfo _info; +}; + +class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer +{ +public: + + UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual bool + send() + { + try + { + _observer->updateServer_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + return false; + } + return true; + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + ServerDynamicInfo _info; +}; + +class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter +{ +public: + + UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual bool + send() + { + try + { + _observer->updateAdapter_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + return false; + } + return true; + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + AdapterDynamicInfo _info; +}; + +} + +NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer) +{ +} + +NodeI::Update::~Update() +{ +} + +void +NodeI::Update::finished(bool success) +{ + _node->dequeueUpdate(_observer, this, !success); } NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, @@ -697,9 +831,17 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, } void -NodeI::destroy() +NodeI::shutdown() { IceUtil::Mutex::Lock sync(_serversLock); + for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin(); + p != _serversByApplication.end(); ++p) + { + for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->shutdown(); + } + } _serversByApplication.clear(); } @@ -847,6 +989,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe assert(_observers.find(session) == _observers.end()); _observers.insert(make_pair(session, observer)); + _observerUpdates.erase(observer); // Remove any updates from the previous session. + ServerDynamicInfoSeq serverInfos; AdapterDynamicInfoSeq adapterInfos; for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); @@ -863,19 +1007,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe adapterInfos.push_back(q->second); } - try - { - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - observer->nodeUp(info); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "unexpected observer exception:\n" << ex; - } + NodeDynamicInfo info; + info.info = _platform.getNodeInfo(); + info.servers = serverInfos; + info.adapters = adapterInfos; + queueUpdate(observer, new NodeUp(this, observer, info)); } void @@ -910,15 +1046,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateServer(_name, info); - sent.insert(p->second); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateServer(this, p->second, info)); } } } @@ -948,16 +1076,50 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateAdapter(_name, info); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateAdapter(this, p->second, info)); + } + } +} + +void +NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update) +{ + //Lock sync(*this); Called within the synchronization + map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + if(p == _observerUpdates.end()) + { + if(update->send()) + { + _observerUpdates[proxy].push_back(update); } } + else + { + p->second.push_back(update); + } +} + +void +NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + if(p == _observerUpdates.end() || p->second.front().get() != update.get()) + { + return; + } + + p->second.pop_front(); + + if(all || (!p->second.empty() && !p->second.front()->send())) + { + p->second.clear(); + } + + if(p->second.empty()) + { + _observerUpdates.erase(p); + } } void |