diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 446 |
1 files changed, 446 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp new file mode 100644 index 00000000000..d713db01a27 --- /dev/null +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -0,0 +1,446 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/UUID.h> +#include <Ice/Ice.h> +#include <IceGrid/NodeSessionI.h> +#include <IceGrid/Database.h> +#include <IceGrid/Topics.h> + +using namespace std; +using namespace IceGrid; + +namespace IceGrid +{ + +class PatcherFeedbackI : public PatcherFeedback +{ +public: + + PatcherFeedbackI(const string& node, + const NodeSessionIPtr& session, + const Ice::Identity id, + const PatcherFeedbackAggregatorPtr& aggregator) : + _node(node), + _session(session), + _id(id), + _aggregator(aggregator) + { + } + + void finished(const Ice::Current&) + { + _aggregator->finished(_node); + _session->removeFeedback(this, _id); + } + + virtual void failed(const string& reason, const Ice::Current& = Ice::Current()) + { + _aggregator->failed(_node, reason); + _session->removeFeedback(this, _id); + } + +private: + + const std::string _node; + const NodeSessionIPtr _session; + const Ice::Identity _id; + const PatcherFeedbackAggregatorPtr _aggregator; +}; + +}; + +PatcherFeedbackAggregator::PatcherFeedbackAggregator(Ice::Identity id, + const TraceLevelsPtr& traceLevels, + const string& type, + const string& name, + int nodeCount) : + _id(id), + _traceLevels(traceLevels), + _type(type), + _name(name), + _count(nodeCount) +{ +} + +PatcherFeedbackAggregator::~PatcherFeedbackAggregator() +{ +} + +void +PatcherFeedbackAggregator::finished(const string& node) +{ + Lock sync(*this); + if(_successes.find(node) != _successes.end() || _failures.find(node) != _failures.end()) + { + return; + } + + if(_traceLevels->patch > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat); + out << "finished patching of " << _type << " `" << _name << "' on node `" << node << "'"; + } + + _successes.insert(node); + checkIfDone(); +} + +void +PatcherFeedbackAggregator::failed(const string& node, const string& failure) +{ + Lock sync(*this); + if(_successes.find(node) != _successes.end() || _failures.find(node) != _failures.end()) + { + return; + } + + if(_traceLevels->patch > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat); + out << "patching of " << _type << " `" << _name << "' on node `" << node <<"' failed:\n" << failure; + } + + _failures.insert(node); + _reasons.push_back("patch on node `" + node + "' failed:\n" + failure); + checkIfDone(); +} + +void +PatcherFeedbackAggregator::checkIfDone() +{ + if(static_cast<int>(_successes.size() + _failures.size()) == _count) + { + if(!_failures.empty()) + { + sort(_reasons.begin(), _reasons.end()); + PatchException ex; + ex.reasons = _reasons; + exception(ex); + } + else + { + response(); + } + } +} + +NodeSessionI::NodeSessionI(const DatabasePtr& database, + const NodePrx& node, + const InternalNodeInfoPtr& info, + int timeout, + const LoadInfo& load) : + _database(database), + _traceLevels(database->getTraceLevels()), + _node(node), + _info(info), + _timeout(timeout), + _timestamp(IceUtil::Time::now(IceUtil::Time::Monotonic)), + _load(load), + _destroy(false) +{ + __setNoDelete(true); + try + { + _database->getNode(info->name, true)->setSession(this); + + ObjectInfo objInfo; + objInfo.type = Node::ice_staticId(); + objInfo.proxy = _node; + _database->addInternalObject(objInfo, true); // Add or update previous node proxy. + + _proxy = NodeSessionPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(this)); + } + catch(const NodeActiveException&) + { + __setNoDelete(false); + throw; + } + catch(...) + { + try + { + _database->removeInternalObject(_node->ice_getIdentity()); + } + catch(const ObjectNotRegisteredException&) + { + } + + _database->getNode(info->name)->setSession(0); + + __setNoDelete(false); + throw; + } + __setNoDelete(false); +} + +void +NodeSessionI::keepAlive(const LoadInfo& load, const Ice::Current& current) +{ + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + _timestamp = IceUtil::Time::now(IceUtil::Time::Monotonic); + _load = load; + + if(_traceLevels->node > 2) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); + out << "node `" << _info->name << "' keep alive "; + out << "(load = " << _load.avg1 << ", " << _load.avg5 << ", " << _load.avg15 << ")"; + } +} + +void +NodeSessionI::setReplicaObserver(const ReplicaObserverPrx& observer, const Ice::Current&) +{ + Lock sync(*this); + if(_destroy) + { + return; + } + else if(_replicaObserver) // This might happen on activation of the node. + { + assert(_replicaObserver == observer); + return; + } + + _replicaObserver = observer; + _database->getReplicaCache().subscribe(observer); +} + +int +NodeSessionI::getTimeout(const Ice::Current& current) const +{ + return _timeout; +} + +NodeObserverPrx +NodeSessionI::getObserver(const Ice::Current& current) const +{ + return NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->getPublisher(); +} + +void +NodeSessionI::loadServers_async(const AMD_NodeSession_loadServersPtr& amdCB, const Ice::Current& current) const +{ + // + // No need to wait for the servers to be loaded. If we were + // waiting, we would have to figure out an appropriate timeout for + // calling this method since each load() call might take time to + // complete. + // + amdCB->ice_response(); + + // + // Get the server proxies to load them on the node. + // + ServerEntrySeq servers = _database->getNode(_info->name)->getServers(); + for(ServerEntrySeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + { + (*p)->sync(); + (*p)->waitForSyncNoThrow(1); // Don't wait too long. + } +} + +Ice::StringSeq +NodeSessionI::getServers(const Ice::Current& current) const +{ + ServerEntrySeq servers = _database->getNode(_info->name)->getServers(); + Ice::StringSeq names; + for(ServerEntrySeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + { + names.push_back((*p)->getId()); + } + return names; +} + +void +NodeSessionI::waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr& cb, + const std::string& application, + int revision, + const Ice::Current&) const +{ + _database->waitForApplicationUpdate(cb, application, revision); +} + +void +NodeSessionI::destroy(const Ice::Current&) +{ + destroyImpl(false); +} + +IceUtil::Time +NodeSessionI::timestamp() const +{ + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + return _timestamp; +} + +void +NodeSessionI::shutdown() +{ + destroyImpl(true); +} + +void +NodeSessionI::patch(const PatcherFeedbackAggregatorPtr& aggregator, + const string& application, + const string& server, + const InternalDistributionDescriptorPtr& dist, + bool shutdown) +{ + Ice::Identity id; + id.category = _database->getInstanceName(); + id.name = IceUtil::generateUUID(); + + PatcherFeedbackPtr obj = new PatcherFeedbackI(_info->name, this, id, aggregator); + try + { + PatcherFeedbackPrx feedback = PatcherFeedbackPrx::uncheckedCast(_database->getInternalAdapter()->add(obj, id)); + _node->patch(feedback, application, server, dist, shutdown); + + Lock sync(*this); + if(_destroy) + { + throw NodeUnreachableException(_info->name, "node is down"); + } + _feedbacks.insert(obj); + } + catch(const Ice::LocalException& ex) + { + ostringstream os; + os << "node unreachable:\n" << ex; + obj->failed(os.str()); + } +} + +const NodePrx& +NodeSessionI::getNode() const +{ + return _node; +} + +const InternalNodeInfoPtr& +NodeSessionI::getInfo() const +{ + return _info; +} + +const LoadInfo& +NodeSessionI::getLoadInfo() const +{ + Lock sync(*this); + return _load; +} + +NodeSessionPrx +NodeSessionI::getProxy() const +{ + return _proxy; +} + +bool +NodeSessionI::isDestroyed() const +{ + Lock sync(*this); + return _destroy; +} + +void +NodeSessionI::destroyImpl(bool shutdown) +{ + { + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + _destroy = true; + } + + ServerEntrySeq servers = _database->getNode(_info->name)->getServers(); + for_each(servers.begin(), servers.end(), IceUtil::voidMemFun(&ServerEntry::unsync)); + + // + // If the registry isn't being shutdown we remove the node + // internal proxy from the database. + // + if(!shutdown) + { + _database->removeInternalObject(_node->ice_getIdentity()); + } + + // + // Next we notify the observer. + // + NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_info->name); + + // + // Unsubscribe the node replica observer. + // + if(_replicaObserver) + { + _database->getReplicaCache().unsubscribe(_replicaObserver); + _replicaObserver = 0; + } + + // + // Finally, we clear the session, this must be done last. As soon + // as the node entry session is set to 0 another session might be + // created. + // + _database->getNode(_info->name)->setSession(0); + + // + // Clean up the patcher feedback servants (this will call back + // removeFeedback so we need to use a temporary set). + // + set<PatcherFeedbackPtr> feedbacks; + _feedbacks.swap(feedbacks); + for(set<PatcherFeedbackPtr>::const_iterator p = feedbacks.begin(); p != feedbacks.end(); ++p) + { + (*p)->failed("node is down"); + } + + if(!shutdown) + { + try + { + _database->getInternalAdapter()->remove(_proxy->ice_getIdentity()); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + } + } +} + +void +NodeSessionI::removeFeedback(const PatcherFeedbackPtr& feedback, const Ice::Identity& id) +{ + try + { + _database->getInternalAdapter()->remove(id); + } + catch(const Ice::LocalException&) + { + } + { + Lock sync(*this); + _feedbacks.erase(feedback); + } +} + |