summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp1383
1 file changed, 1383 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
new file mode 100644
index 00000000000..964f4de9a32
--- /dev/null
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -0,0 +1,1383 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/Timer.h>
+#include <IceUtil/FileUtil.h>
+#include <Ice/Ice.h>
+#include <IcePatch2/Util.h>
+#include <IcePatch2/ClientUtil.h>
+#include <IceGrid/NodeI.h>
+#include <IceGrid/Activator.h>
+#include <IceGrid/ServerI.h>
+#include <IceGrid/ServerAdapterI.h>
+#include <IceGrid/Util.h>
+#include <IceGrid/TraceLevels.h>
+#include <IceGrid/NodeSessionManager.h>
+
+using namespace std;
+using namespace IcePatch2;
+using namespace IceGrid;
+
+namespace
+{
+
+//
+// IcePatch2 feedback callback that logs patching progress through the
+// IceGrid trace facility. Verbosity depends on the patch trace level:
+// > 0 logs start/completion events, > 1 logs download progress of large
+// (> 1MB) distributions, > 2 logs per-file checksum computation.
+//
+class LogPatcherFeedback : public IcePatch2::PatcherFeedback
+{
+public:
+
+    LogPatcherFeedback(const TraceLevelsPtr& traceLevels, const string& dest) :
+        _traceLevels(traceLevels),
+        _startedPatch(false),
+        _lastProgress(0),
+        _dest(dest)
+    {
+    }
+
+    //
+    // Set the path currently being patched and reset the progress state.
+    //
+    void
+    setPatchingPath(const string& path)
+    {
+        _path = path;
+        _startedPatch = false;
+        _lastProgress = 0;
+    }
+
+    virtual bool
+    noFileSummary(const string& reason)
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": can't load summary file (will perform a thorough patch):\n" << reason;
+        }
+        return true; // Always continue with a thorough patch.
+    }
+
+    virtual bool
+    checksumStart()
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": started checksum calculation";
+        }
+        return true;
+    }
+
+    virtual bool
+    checksumProgress(const string& path)
+    {
+        if(_traceLevels->patch > 2)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": calculating checksum for " << getBasename(path);
+        }
+        return true;
+    }
+
+    virtual bool
+    checksumEnd()
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": finished checksum calculation";
+        }
+        return true;
+    }
+
+    virtual bool
+    fileListStart()
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": getting list of files to patch";
+        }
+        return true;
+    }
+
+    virtual bool
+    fileListProgress(Ice::Int percent)
+    {
+        return true;
+    }
+
+    virtual bool
+    fileListEnd()
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": getting list of files to patch completed";
+        }
+        return true;
+    }
+
+    virtual bool
+    patchStart(const string& path, Ice::Long size, Ice::Long totalProgress, Ice::Long totalSize)
+    {
+        if(_traceLevels->patch > 1 && totalSize > (1024 * 1024))
+        {
+            //
+            // For distributions larger than 1MB, trace overall download
+            // progress in 5% increments only.
+            //
+            int progress = static_cast<int>(static_cast<double>(totalProgress) / totalSize * 100.0);
+            progress /= 5;
+            progress *= 5;
+            if(progress != _lastProgress)
+            {
+                _lastProgress = progress;
+                Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+                out << _dest << ": downloaded " << progress << "% (" << totalProgress << '/' << totalSize << ')';
+                if(!_path.empty())
+                {
+                    out << " of " << _path;
+                }
+            }
+        }
+        else if(_traceLevels->patch > 0)
+        {
+            if(!_startedPatch)
+            {
+                Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+                int roundedSize = static_cast<int>(static_cast<double>(totalSize) / 1024);
+                if(roundedSize == 0 && totalSize > 0)
+                {
+                    roundedSize = 1; // Report at least 1KB for a non-empty download.
+                }
+                out << _dest << ": downloading " << (_path.empty() ? string("") : (_path + " ")) << roundedSize
+                    << "KB ";
+                _startedPatch = true;
+            }
+        }
+
+        return true;
+    }
+
+    virtual bool
+    patchProgress(Ice::Long progress, Ice::Long size, Ice::Long totalProgress, Ice::Long totalSize)
+    {
+        return true;
+    }
+
+    virtual bool
+    patchEnd()
+    {
+        return true;
+    }
+
+    //
+    // Trace completion of the current download; called by NodeI::patch().
+    //
+    void
+    finishPatch()
+    {
+        if(_traceLevels->patch > 0)
+        {
+            Ice::Trace out(_traceLevels->logger, _traceLevels->patchCat);
+            out << _dest << ": downloading completed";
+        }
+    }
+
+private:
+
+    const TraceLevelsPtr _traceLevels;
+    bool _startedPatch;
+    int _lastProgress;
+    string _path;
+    string _dest;
+};
+
+//
+// Queued AMI update sent to a node observer when this node comes up;
+// carries the node's full dynamic state (servers and adapters).
+//
+class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp
+{
+public:
+
+    // Take info by const reference to avoid copying the struct; the
+    // member still stores its own copy for the async send.
+    NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, const NodeDynamicInfo& info) :
+        NodeI::Update(node, observer), _info(info)
+    {
+    }
+
+    // Issue the AMI call; returns false if the request couldn't be sent.
+    virtual bool
+    send()
+    {
+        try
+        {
+            _observer->nodeUp_async(this, _info);
+        }
+        catch(const Ice::LocalException&)
+        {
+            return false;
+        }
+        return true;
+    }
+
+    virtual void
+    ice_response()
+    {
+        finished(true);
+    }
+
+    virtual void
+    ice_exception(const Ice::Exception&)
+    {
+        finished(false);
+    }
+
+private:
+
+    NodeDynamicInfo _info;
+};
+
+//
+// Queued AMI update notifying a node observer of a change in a server's
+// dynamic state.
+//
+class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer
+{
+public:
+
+    // Take info by const reference to avoid copying the struct; the
+    // member still stores its own copy for the async send.
+    UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, const ServerDynamicInfo& info) :
+        NodeI::Update(node, observer), _info(info)
+    {
+    }
+
+    // Issue the AMI call; returns false if the request couldn't be sent.
+    virtual bool
+    send()
+    {
+        try
+        {
+            _observer->updateServer_async(this, _node->getName(), _info);
+        }
+        catch(const Ice::LocalException&)
+        {
+            return false;
+        }
+        return true;
+    }
+
+    virtual void
+    ice_response()
+    {
+        finished(true);
+    }
+
+    virtual void
+    ice_exception(const Ice::Exception&)
+    {
+        finished(false);
+    }
+
+private:
+
+    ServerDynamicInfo _info;
+};
+
+//
+// Queued AMI update notifying a node observer of a change in an object
+// adapter's dynamic state.
+//
+class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter
+{
+public:
+
+    // Take info by const reference to avoid copying the struct; the
+    // member still stores its own copy for the async send.
+    UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, const AdapterDynamicInfo& info) :
+        NodeI::Update(node, observer), _info(info)
+    {
+    }
+
+    // Issue the AMI call; returns false if the request couldn't be sent.
+    virtual bool
+    send()
+    {
+        try
+        {
+            _observer->updateAdapter_async(this, _node->getName(), _info);
+        }
+        catch(const Ice::LocalException&)
+        {
+            return false;
+        }
+        return true;
+    }
+
+    virtual void
+    ice_response()
+    {
+        finished(true);
+    }
+
+    virtual void
+    ice_exception(const Ice::Exception&)
+    {
+        finished(false);
+    }
+
+private:
+
+    AdapterDynamicInfo _info;
+};
+
+}
+
+//
+// Base class for updates queued per observer; keeps the node and the
+// target observer proxy for use by the concrete send() implementations.
+//
+NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
+{
+}
+
+NodeI::Update::~Update()
+{
+}
+
+//
+// AMI completion callback: dequeue this update from the observer's queue.
+// On failure (!success) the remaining queued updates for the observer are
+// dropped as well (see NodeI::dequeueUpdate's "all" parameter).
+//
+void
+NodeI::Update::finished(bool success)
+{
+    _node->dequeueUpdate(_observer, this, !success);
+}
+
+//
+// Construct the node servant. Several const data members are assigned
+// below through const_cast because their values depend on the platform
+// info and communicator properties, which are only available after the
+// members in the initializer list have been constructed.
+//
+NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
+             NodeSessionManager& sessions,
+             const ActivatorPtr& activator,
+             const IceUtil::TimerPtr& timer,
+             const TraceLevelsPtr& traceLevels,
+             const NodePrx& proxy,
+             const string& name,
+             const UserAccountMapperPrx& mapper) :
+    _communicator(adapter->getCommunicator()),
+    _adapter(adapter),
+    _sessions(sessions),
+    _activator(activator),
+    _timer(timer),
+    _traceLevels(traceLevels),
+    _name(name),
+    _proxy(proxy),
+    _redirectErrToOut(false),
+    _allowEndpointsOverride(false),
+    _waitTime(0),
+    _userAccountMapper(mapper),
+    _platform("IceGrid.Node", _communicator, _traceLevels),
+    _fileCache(new FileCache(_communicator)),
+    _serial(1),
+    _consistencyCheckDone(false)
+{
+    Ice::PropertiesPtr props = _communicator->getProperties();
+
+    // Directory layout and configuration derived from the platform info,
+    // the default locator identity and IceGrid.Node.* properties.
+    const_cast<string&>(_dataDir) = _platform.getDataDir();
+    const_cast<string&>(_serversDir) = _dataDir + "/servers";
+    const_cast<string&>(_tmpDir) = _dataDir + "/tmp";
+    const_cast<string&>(_instanceName) = _communicator->getDefaultLocator()->ice_getIdentity().category;
+    const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60);
+    const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output");
+    const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0;
+    const_cast<bool&>(_allowEndpointsOverride) = props->getPropertyAsInt("IceGrid.Node.AllowEndpointsOverride") > 0;
+
+    //
+    // Parse the properties override property.
+    //
+    vector<string> overrides = props->getPropertyAsList("IceGrid.Node.PropertiesOverride");
+    if(!overrides.empty())
+    {
+        // Normalize each override to the "--name=value" command-line form
+        // expected by parseCommandLineOptions below.
+        for(vector<string>::iterator p = overrides.begin(); p != overrides.end(); ++p)
+        {
+            if(p->find("--") != 0)
+            {
+                *p = "--" + *p;
+            }
+        }
+
+        Ice::PropertiesPtr p = Ice::createProperties();
+        p->parseCommandLineOptions("", overrides);
+        Ice::PropertyDict propDict = p->getPropertiesForPrefix("");
+        for(Ice::PropertyDict::const_iterator q = propDict.begin(); q != propDict.end(); ++q)
+        {
+            _propertiesOverride.push_back(createProperty(q->first, q->second));
+        }
+    }
+}
+
+NodeI::~NodeI()
+{
+}
+
+void
+NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
+ const InternalServerDescriptorPtr& descriptor,
+ const string& replicaName,
+ const Ice::Current& current)
+{
+ ServerCommandPtr command;
+ {
+ Lock sync(*this);
+ ++_serial;
+
+ Ice::Identity id = createServerIdentity(descriptor->id);
+
+ //
+ // Check if we already have a servant for this server. If that's
+ // the case, the server is already loaded and we just need to
+ // update it.
+ //
+ while(true)
+ {
+ bool added = false;
+ ServerIPtr server;
+ try
+ {
+ server = ServerIPtr::dynamicCast(_adapter->find(id));
+ if(!server)
+ {
+ ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
+ server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime);
+ _adapter->add(server, id);
+ added = true;
+ }
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ //
+ // We throw an object not exist exception to avoid
+ // dispatch warnings. The registry will consider the
+ // node has being unreachable upon receival of this
+ // exception (like any other Ice::LocalException). We
+ // could also have disabled dispatch warnings but they
+ // can still useful to catch other issues.
+ //
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
+ }
+
+ try
+ {
+ command = server->load(amdCB, descriptor, replicaName);
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ assert(!added);
+ continue;
+ }
+ catch(const Ice::Exception&)
+ {
+ if(added)
+ {
+ try
+ {
+ _adapter->remove(id);
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // IGNORE
+ }
+ }
+ throw;
+ }
+ break;
+ }
+ }
+ if(command)
+ {
+ command->execute();
+ }
+}
+
+void
+NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
+ const string& serverId,
+ const string& uuid,
+ int revision,
+ const string& replicaName,
+ const Ice::Current& current)
+{
+ ServerCommandPtr command;
+ {
+ Lock sync(*this);
+ ++_serial;
+
+ ServerIPtr server;
+ try
+ {
+ server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId)));
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ //
+ // We throw an object not exist exception to avoid
+ // dispatch warnings. The registry will consider the node
+ // has being unreachable upon receival of this exception
+ // (like any other Ice::LocalException). We could also
+ // have disabled dispatch warnings but they can still
+ // useful to catch other issues.
+ //
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
+ }
+
+ if(!server)
+ {
+ server = new ServerI(this, 0, _serversDir, serverId, _waitTime);
+ }
+
+ //
+ // Destroy the server object if it's loaded.
+ //
+ try
+ {
+ command = server->destroy(amdCB, uuid, revision, replicaName);
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ amdCB->ice_response();
+ return;
+ }
+ }
+ if(command)
+ {
+ command->execute();
+ }
+}
+
+void
+NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
+ const PatcherFeedbackPrx& feedback,
+ const string& application,
+ const string& server,
+ const InternalDistributionDescriptorPtr& appDistrib,
+ bool shutdown,
+ const Ice::Current&)
+{
+ amdCB->ice_response();
+
+ {
+ Lock sync(*this);
+ while(_patchInProgress.find(application) != _patchInProgress.end())
+ {
+ wait();
+ }
+ _patchInProgress.insert(application);
+ }
+
+
+ set<ServerIPtr> servers;
+ bool patchApplication = !appDistrib->icepatch.empty();
+ if(server.empty())
+ {
+ //
+ // Patch all the servers from the application.
+ //
+ servers = getApplicationServers(application);
+ }
+ else
+ {
+ ServerIPtr svr;
+ try
+ {
+ svr = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(server)));
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ }
+
+ if(svr)
+ {
+ if(appDistrib->icepatch.empty() || !svr->dependsOnApplicationDistrib())
+ {
+ //
+ // Don't patch the application if the server doesn't
+ // depend on it.
+ //
+ patchApplication = false;
+ servers.insert(svr);
+ }
+ else
+ {
+ //
+ // If the server to patch depends on the application,
+ // we need to shutdown all the application servers
+ // that depend on the application.
+ //
+ servers = getApplicationServers(application);
+ }
+ }
+ }
+
+ set<ServerIPtr>::iterator s = servers.begin();
+ while(s != servers.end())
+ {
+ if(!appDistrib->icepatch.empty() && (*s)->dependsOnApplicationDistrib())
+ {
+ ++s;
+ }
+ else if((*s)->getDistribution() && (server.empty() || server == (*s)->getId()))
+ {
+ ++s;
+ }
+ else
+ {
+ //
+ // Exclude servers which don't depend on the application distribution
+ // or don't have a distribution.
+ //
+ servers.erase(s++);
+ }
+ }
+
+ string failure;
+ if(!servers.empty())
+ {
+ try
+ {
+ set<ServerIPtr>::iterator s = servers.begin();
+ vector<string> running;
+ while(s != servers.end())
+ {
+ try
+ {
+ if(!(*s)->startPatch(shutdown))
+ {
+ running.push_back((*s)->getId());
+ servers.erase(s++);
+ }
+ else
+ {
+ ++s;
+ }
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ servers.erase(s++);
+ }
+ }
+
+ if(!running.empty())
+ {
+ if(running.size() == 1)
+ {
+ throw "server `" + toString(running) + "' is active";
+ }
+ else
+ {
+ throw "servers `" + toString(running, ", ") + "' are active";
+ }
+ }
+
+ for(s = servers.begin(); s != servers.end(); ++s)
+ {
+ (*s)->waitForPatch();
+ }
+
+ //
+ // Patch the application.
+ //
+ FileServerPrx icepatch;
+ if(patchApplication)
+ {
+ assert(!appDistrib->icepatch.empty());
+ icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib->icepatch));
+ if(!icepatch)
+ {
+ throw "proxy `" + appDistrib->icepatch + "' is not a file server.";
+ }
+ patch(icepatch, "distrib/" + application, appDistrib->directories);
+ }
+
+ //
+ // Patch the server(s).
+ //
+ for(s = servers.begin(); s != servers.end(); ++s)
+ {
+ InternalDistributionDescriptorPtr dist = (*s)->getDistribution();
+ if(dist && (server.empty() || (*s)->getId() == server))
+ {
+ icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist->icepatch));
+ if(!icepatch)
+ {
+ throw "proxy `" + dist->icepatch + "' is not a file server.";
+ }
+ patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist->directories);
+
+ if(!server.empty())
+ {
+ break; // No need to continue.
+ }
+ }
+ }
+ }
+ catch(const Ice::LocalException& e)
+ {
+ ostringstream os;
+ os << e;
+ failure = os.str();
+ }
+ catch(const string& e)
+ {
+ failure = e;
+ }
+ catch(const char* e)
+ {
+ failure = e;
+ }
+
+ for(set<ServerIPtr>::const_iterator s = servers.begin(); s != servers.end(); ++s)
+ {
+ (*s)->finishPatch();
+ }
+ }
+
+ {
+ Lock sync(*this);
+ _patchInProgress.erase(application);
+ notifyAll();
+ }
+
+ try
+ {
+ if(failure.empty())
+ {
+ feedback->finished();
+ }
+ else
+ {
+ feedback->failed(failure);
+ }
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+}
+
+//
+// Registry replica lifecycle notifications: all four operations are
+// forwarded to the node session manager.
+//
+void
+NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Current&)
+{
+    _sessions.create(replica);
+}
+
+void
+NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&)
+{
+    _sessions.replicaInit(replicas);
+}
+
+void
+NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&)
+{
+    _sessions.replicaAdded(replica);
+}
+
+void
+NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&)
+{
+    _sessions.replicaRemoved(replica);
+}
+
+// Remote accessor: the node's configured name.
+std::string
+NodeI::getName(const Ice::Current&) const
+{
+    return _name;
+}
+
+// Remote accessor: the host this node runs on.
+std::string
+NodeI::getHostname(const Ice::Current&) const
+{
+    return _platform.getHostname();
+}
+
+// Remote accessor: current load information from the platform.
+LoadInfo
+NodeI::getLoad(const Ice::Current&) const
+{
+    return _platform.getLoadInfo();
+}
+
+// Remote accessor: number of processor sockets on this host.
+int
+NodeI::getProcessorSocketCount(const Ice::Current&) const
+{
+    return _platform.getProcessorSocketCount();
+}
+
+// Remote operation: initiate node shutdown through the activator.
+void
+NodeI::shutdown(const Ice::Current&) const
+{
+    _activator->shutdown();
+}
+
+// Remote log access: offset of the last `count' lines of the given
+// well-known file ("stdout"/"stderr", resolved by getFilePath).
+Ice::Long
+NodeI::getOffsetFromEnd(const string& filename, int count, const Ice::Current&) const
+{
+    return _fileCache->getOffsetFromEnd(getFilePath(filename), count);
+}
+
+// Remote log access: read up to `size' bytes of lines starting at `pos'.
+bool
+NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, Ice::StringSeq& lines,
+            const Ice::Current&) const
+{
+    return _fileCache->read(getFilePath(filename), pos, size, newPos, lines);
+}
+
+//
+// Shut down every server known to this node and clear the
+// application-to-servers index.
+//
+void
+NodeI::shutdown()
+{
+    IceUtil::Mutex::Lock sync(_serversLock);
+    map<string, set<ServerIPtr> >::const_iterator app;
+    for(app = _serversByApplication.begin(); app != _serversByApplication.end(); ++app)
+    {
+        const set<ServerIPtr>& appServers = app->second;
+        for(set<ServerIPtr>::const_iterator srv = appServers.begin(); srv != appServers.end(); ++srv)
+        {
+            (*srv)->shutdown();
+        }
+    }
+    _serversByApplication.clear();
+}
+
+// Accessor: the node's communicator.
+Ice::CommunicatorPtr
+NodeI::getCommunicator() const
+{
+    return _communicator;
+}
+
+// Accessor: the object adapter hosting the server servants.
+Ice::ObjectAdapterPtr
+NodeI::getAdapter() const
+{
+    return _adapter;
+}
+
+// Accessor: the process activator used to start/stop servers.
+ActivatorPtr
+NodeI::getActivator() const
+{
+    return _activator;
+}
+
+// Accessor: the shared timer.
+IceUtil::TimerPtr
+NodeI::getTimer() const
+{
+    return _timer;
+}
+
+// Accessor: trace levels configuration.
+TraceLevelsPtr
+NodeI::getTraceLevels() const
+{
+    return _traceLevels;
+}
+
+// Accessor: the user account mapper (may be used for run-as support).
+UserAccountMapperPrx
+NodeI::getUserAccountMapper() const
+{
+    return _userAccountMapper;
+}
+
+// Accessor: platform information. NOTE(review): returns a mutable
+// reference from a const member function — presumably _platform is
+// declared mutable in the header; confirm there.
+PlatformInfo&
+NodeI::getPlatformInfo() const
+{
+    return _platform;
+}
+
+// Accessor: the log file cache used by read()/getOffsetFromEnd().
+FileCachePtr
+NodeI::getFileCache() const
+{
+    return _fileCache;
+}
+
+// Accessor: this node's own proxy.
+NodePrx
+NodeI::getProxy() const
+{
+    return _proxy;
+}
+
+// Accessor: properties parsed from IceGrid.Node.PropertiesOverride.
+const PropertyDescriptorSeq&
+NodeI::getPropertiesOverride() const
+{
+    return _propertiesOverride;
+}
+
+// Accessor: value of the IceGrid.Node.Output property.
+string
+NodeI::getOutputDir() const
+{
+    return _outputDir;
+}
+
+// Accessor: whether server stderr is redirected to stdout.
+bool
+NodeI::getRedirectErrToOut() const
+{
+    return _redirectErrToOut;
+}
+
+// Accessor: whether server endpoint overrides are allowed.
+bool
+NodeI::allowEndpointsOverride() const
+{
+    return _allowEndpointsOverride;
+}
+
+//
+// Register this node with the given registry and return the created
+// node session.
+//
+NodeSessionPrx
+NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
+{
+    return registry->registerNode(_platform.getInternalNodeInfo(), _proxy, _platform.getLoadInfo());
+}
+
+//
+// Reconcile the servers deployed on disk with the list of servers the
+// registry knows about; runs at most once, at startup.
+//
+void
+NodeI::checkConsistency(const NodeSessionPrx& session)
+{
+    //
+    // Only do the consistency check on the startup. This ensures that servers can't
+    // be removed by a bogus master when the master session is re-established.
+    //
+    if(_consistencyCheckDone)
+    {
+        return;
+    }
+    _consistencyCheckDone = true;
+
+    //
+    // We use a serial number to keep track of the concurrent changes
+    // on the node. When a server is loaded/destroyed the serial is
+    // incremented. This allows to ensure that the list of servers
+    // returned by the registry is consistent with the servers
+    // currently deployed on the node: if the serial didn't change
+    // after getting the list of servers from the registry, we have
+    // the accurate list of servers that should be deployed on the
+    // node.
+    //
+    unsigned long serial = 0;
+    Ice::StringSeq servers;
+    vector<ServerCommandPtr> commands;
+    while(true)
+    {
+        {
+            Lock sync(*this);
+            if(serial == _serial)
+            {
+                // No load/destroy happened while we fetched the server
+                // list: the list is accurate, do the check.
+                _serial = 1; // We can reset the serial number.
+                commands = checkConsistencyNoSync(servers);
+                break;
+            }
+            serial = _serial;
+        }
+        assert(session);
+        try
+        {
+            servers = session->getServers();
+        }
+        catch(const Ice::LocalException&)
+        {
+            return; // The connection with the session was lost.
+        }
+        // checkConsistencyNoSync relies on a sorted list (set_difference).
+        sort(servers.begin(), servers.end());
+    }
+
+    // Execute the collected destroy commands outside the lock.
+    for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute));
+}
+
+//
+// Register a node observer for the given session and send it the node's
+// current dynamic state (servers and adapters) as a NodeUp update.
+//
+void
+NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observer)
+{
+    IceUtil::Mutex::Lock sync(_observerMutex);
+    assert(_observers.find(session) == _observers.end());
+    _observers.insert(make_pair(session, observer));
+
+    _observerUpdates.erase(observer); // Remove any updates from the previous session.
+
+    // Snapshot the current dynamic state for the initial update.
+    ServerDynamicInfoSeq serverInfos;
+    AdapterDynamicInfoSeq adapterInfos;
+    for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
+        p != _serversDynamicInfo.end(); ++p)
+    {
+        assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled));
+        serverInfos.push_back(p->second);
+    }
+
+    for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin();
+        q != _adaptersDynamicInfo.end(); ++q)
+    {
+        assert(q->second.proxy);
+        adapterInfos.push_back(q->second);
+    }
+
+    NodeDynamicInfo info;
+    info.info = _platform.getNodeInfo();
+    info.servers = serverInfos;
+    info.adapters = adapterInfos;
+    queueUpdate(observer, new NodeUp(this, observer, info));
+}
+
+//
+// Unregister the observer associated with the given session.
+//
+void
+NodeI::removeObserver(const NodeSessionPrx& session)
+{
+    IceUtil::Mutex::Lock sync(_observerMutex);
+    _observers.erase(session);
+}
+
+//
+// Record a change in a server's dynamic state and queue an update for
+// every registered observer (at most once per distinct observer proxy).
+//
+void
+NodeI::observerUpdateServer(const ServerDynamicInfo& info)
+{
+    IceUtil::Mutex::Lock sync(_observerMutex);
+
+    // Destroyed servers and enabled-but-inactive servers aren't tracked.
+    if(info.state == Destroyed || (info.state == Inactive && info.enabled))
+    {
+        _serversDynamicInfo.erase(info.id);
+    }
+    else
+    {
+        _serversDynamicInfo[info.id] = info;
+    }
+
+    //
+    // Send the update and make sure we don't send the update twice to
+    // the same observer (it's possible for the observer to be
+    // registered twice if a replica is removed and added right away
+    // after).
+    //
+    set<NodeObserverPrx> sent;
+    for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+    {
+        if(sent.find(p->second) == sent.end())
+        {
+            queueUpdate(p->second, new UpdateServer(this, p->second, info));
+            sent.insert(p->second); // Without this the de-duplication never takes effect.
+        }
+    }
+}
+
+//
+// Record a change in an adapter's dynamic state and queue an update for
+// every registered observer (at most once per distinct observer proxy).
+//
+void
+NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
+{
+    IceUtil::Mutex::Lock sync(_observerMutex);
+
+    // A null proxy means the adapter is gone.
+    if(info.proxy)
+    {
+        _adaptersDynamicInfo[info.id] = info;
+    }
+    else
+    {
+        _adaptersDynamicInfo.erase(info.id);
+    }
+
+    //
+    // Send the update and make sure we don't send the update twice to
+    // the same observer (it's possible for the observer to be
+    // registered twice if a replica is removed and added right away
+    // after).
+    //
+    set<NodeObserverPrx> sent;
+    for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+    {
+        if(sent.find(p->second) == sent.end())
+        {
+            queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
+            sent.insert(p->second); // Without this the de-duplication never takes effect.
+        }
+    }
+}
+
+//
+// Queue an update for the given observer. The first update for an
+// observer is sent immediately (and queued only if the send succeeded);
+// subsequent updates wait in the queue until dequeueUpdate sends them.
+// NOTE: expects _observerMutex to be held by the caller (see the
+// commented-out lock below).
+//
+void
+NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
+{
+    //Lock sync(*this); Called within the synchronization
+    map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+    if(p == _observerUpdates.end())
+    {
+        if(update->send())
+        {
+            _observerUpdates[proxy].push_back(update);
+        }
+    }
+    else
+    {
+        p->second.push_back(update);
+    }
+}
+
+//
+// Completion handler for a sent update: pop it from the observer's queue
+// and send the next one. If `all' is set (the previous send failed) or
+// sending the next update fails, the whole queue for this observer is
+// dropped.
+//
+void
+NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
+{
+    IceUtil::Mutex::Lock sync(_observerMutex);
+    map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+    // Ignore stale completions: the queue may have been cleared/replaced.
+    if(p == _observerUpdates.end() || p->second.front().get() != update.get())
+    {
+        return;
+    }
+
+    p->second.pop_front();
+
+    if(all || (!p->second.empty() && !p->second.front()->send()))
+    {
+        p->second.clear();
+    }
+
+    if(p->second.empty())
+    {
+        _observerUpdates.erase(p);
+    }
+}
+
+//
+// Record that the given server belongs to the given application.
+//
+void
+NodeI::addServer(const ServerIPtr& server, const string& application)
+{
+    IceUtil::Mutex::Lock sync(_serversLock);
+    map<string, set<ServerIPtr> >::iterator q = _serversByApplication.find(application);
+    if(q == _serversByApplication.end())
+    {
+        // First server for this application: create an empty entry.
+        q = _serversByApplication.insert(make_pair(application, set<ServerIPtr>())).first;
+    }
+    q->second.insert(server);
+}
+
+//
+// Forget the association between the given server and application. When
+// the last server of an application is removed, the application's
+// distribution directory is deleted from disk as well.
+//
+void
+NodeI::removeServer(const ServerIPtr& server, const std::string& application)
+{
+    IceUtil::Mutex::Lock sync(_serversLock);
+    map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+    if(p != _serversByApplication.end())
+    {
+        p->second.erase(server);
+        if(p->second.empty())
+        {
+            _serversByApplication.erase(p);
+
+            // No more servers for this application: clean up its
+            // distribution data (best effort, failures are only logged).
+            string appDir = _dataDir + "/distrib/" + application;
+            if(IceUtilInternal::directoryExists(appDir))
+            {
+                try
+                {
+                    IcePatch2::removeRecursive(appDir);
+                }
+                catch(const string& msg)
+                {
+                    Ice::Warning out(_traceLevels->logger);
+                    out << "removing application directory `" << appDir << "' failed:\n" << msg;
+                }
+            }
+        }
+    }
+}
+
+//
+// Build the identity under which a server servant is registered:
+// category "<instance>-Server", name = the server id.
+//
+Ice::Identity
+NodeI::createServerIdentity(const string& name) const
+{
+    Ice::Identity serverId;
+    serverId.name = name;
+    serverId.category = _instanceName + "-Server";
+    return serverId;
+}
+
+//
+// Category used for routed server admin objects: "<instance>-NodeRouter".
+//
+string
+NodeI::getServerAdminCategory() const
+{
+    return _instanceName + "-NodeRouter";
+}
+
+//
+// Compare the contents of the servers directory with the (sorted) list
+// of servers the registry knows about, and remove or schedule destruction
+// of the extra ones. Returns the destroy commands to execute outside the
+// synchronization. Caller holds the node mutex (NoSync suffix).
+//
+vector<ServerCommandPtr>
+NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
+{
+    vector<ServerCommandPtr> commands;
+
+    //
+    // Check if the servers directory doesn't contain more servers
+    // than the registry really knows.
+    //
+    Ice::StringSeq contents;
+    try
+    {
+        contents = readDirectory(_serversDir);
+    }
+    catch(const string& msg)
+    {
+        Ice::Error out(_traceLevels->logger);
+        out << "couldn't read directory `" << _serversDir << "':\n" << msg;
+        return commands;
+    }
+
+    // Directory entries not present in the registry's list. Both ranges
+    // must be sorted (readDirectory output and the caller-sorted list).
+    vector<string> remove;
+    set_difference(contents.begin(), contents.end(), servers.begin(), servers.end(), back_inserter(remove));
+
+    //
+    // Remove the extra servers if possible.
+    //
+    try
+    {
+        vector<string>::iterator p = remove.begin();
+        while(p != remove.end())
+        {
+            ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(*p)));
+            if(server)
+            {
+                //
+                // If the server is loaded, we invoke on it to destroy it.
+                //
+                try
+                {
+                    ServerCommandPtr command = server->destroy(0, "", 0, "Master");
+                    if(command)
+                    {
+                        commands.push_back(command);
+                    }
+                    p = remove.erase(p);
+                    continue;
+                }
+                catch(const Ice::LocalException& ex)
+                {
+                    Ice::Error out(_traceLevels->logger);
+                    out << "server `" << *p << "' destroy failed:\n" << ex;
+                }
+                catch(const string&)
+                {
+                    assert(false);
+                }
+            }
+
+            try
+            {
+                if(canRemoveServerDirectory(*p))
+                {
+                    //
+                    // If the server directory can be removed and we
+                    // either remove it or back it up before to remove it.
+                    //
+                    removeRecursive(_serversDir + "/" + *p);
+                    p = remove.erase(p);
+                    continue;
+                }
+            }
+            catch(const string& msg)
+            {
+                Ice::Warning out(_traceLevels->logger);
+                out << "removing server directory `" << _serversDir << "/" << *p << "' failed:\n" << msg;
+            }
+
+            // Couldn't remove: keep the full path for the warning below.
+            *p = _serversDir + "/" + *p;
+            ++p;
+        }
+    }
+    catch(const Ice::ObjectAdapterDeactivatedException&)
+    {
+        //
+        // Just return the server commands, we'll finish the
+        // consistency check next time the node is started.
+        //
+        return commands;
+    }
+
+    if(!remove.empty())
+    {
+        Ice::Warning out(_traceLevels->logger);
+        out << "server directories containing data not created or written by IceGrid were not removed:\n";
+        out << toString(remove);
+    }
+    return commands;
+}
+
+//
+// Accessor: the session with the master registry, from the session manager.
+//
+NodeSessionPrx
+NodeI::getMasterNodeSession() const
+{
+    return _sessions.getMasterNodeSession();
+}
+
+//
+// Return true only if the server's directory contains nothing but data
+// created by IceGrid itself (config/dbs/distrib/revision entries, config
+// files, and Freeze db directories), so it is safe to remove.
+//
+bool
+NodeI::canRemoveServerDirectory(const string& name)
+{
+    const string serverDir = _serversDir + "/" + name;
+
+    //
+    // Check if there's files which we didn't create.
+    //
+    Ice::StringSeq entries = readDirectory(serverDir);
+    set<string> contents(entries.begin(), entries.end());
+    contents.erase("dbs");
+    contents.erase("config");
+    contents.erase("distrib");
+    contents.erase("revision");
+    if(!contents.empty())
+    {
+        return false;
+    }
+
+    // Everything under config/ must be a config* file.
+    entries = readDirectory(serverDir + "/config");
+    for(Ice::StringSeq::const_iterator q = entries.begin(); q != entries.end(); ++q)
+    {
+        if(q->find("config") != 0)
+        {
+            return false;
+        }
+    }
+
+    // Each database directory may only contain DB_CONFIG and __Freeze.
+    entries = readDirectory(serverDir + "/dbs");
+    for(Ice::StringSeq::const_iterator q = entries.begin(); q != entries.end(); ++q)
+    {
+        try
+        {
+            Ice::StringSeq files = readDirectory(serverDir + "/dbs/" + *q);
+            files.erase(remove(files.begin(), files.end(), "DB_CONFIG"), files.end());
+            files.erase(remove(files.begin(), files.end(), "__Freeze"), files.end());
+            if(!files.empty())
+            {
+                return false;
+            }
+        }
+        catch(const string&)
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+//
+// Run the IcePatch2 patcher against `icepatch' for the given destination
+// (relative to the node data directory). If `directories' is empty the
+// whole distribution is patched, otherwise only the listed directories.
+// Progress is traced through LogPatcherFeedback.
+//
+void
+NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories)
+{
+    IcePatch2::PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest);
+    IcePatch2::createDirectory(_dataDir + "/" + dest);
+    PatcherPtr patcher = new Patcher(icepatch, feedback, _dataDir + "/" + dest, false, 100, 1);
+    bool aborted = !patcher->prepare();
+    if(!aborted)
+    {
+        if(directories.empty())
+        {
+            // Patch the whole distribution.
+            aborted = !patcher->patch("");
+            dynamic_cast<LogPatcherFeedback*>(feedback.get())->finishPatch();
+        }
+        else
+        {
+            // Patch each requested directory separately; stop on first failure.
+            for(vector<string>::const_iterator p = directories.begin(); p != directories.end(); ++p)
+            {
+                dynamic_cast<LogPatcherFeedback*>(feedback.get())->setPatchingPath(*p);
+                if(!patcher->patch(*p))
+                {
+                    aborted = true;
+                    break;
+                }
+                dynamic_cast<LogPatcherFeedback*>(feedback.get())->finishPatch();
+            }
+        }
+    }
+    // Commit the patch only if nothing was aborted.
+    if(!aborted)
+    {
+        patcher->finish();
+    }
+
+    //
+    // Update the files owner/group
+    //
+}
+
+//
+// Return a snapshot of the servers deployed for the given application
+// (an empty set if the application is unknown to this node).
+//
+set<ServerIPtr>
+NodeI::getApplicationServers(const string& application) const
+{
+    IceUtil::Mutex::Lock sync(_serversLock);
+    map<string, set<ServerIPtr> >::const_iterator q = _serversByApplication.find(application);
+    return q == _serversByApplication.end() ? set<ServerIPtr>() : q->second;
+}
+
+
+
+//
+// Map a well-known log name ("stdout" or "stderr") to the file path
+// configured through the Ice.StdOut/Ice.StdErr properties. Throws
+// FileNotAvailableException for any other name, or when the relevant
+// property isn't set.
+//
+string
+NodeI::getFilePath(const string& filename) const
+{
+    string property;
+    if(filename == "stderr")
+    {
+        property = "Ice.StdErr";
+    }
+    else if(filename == "stdout")
+    {
+        property = "Ice.StdOut";
+    }
+    else
+    {
+        throw FileNotAvailableException("unknown file");
+    }
+
+    string file = _communicator->getProperties()->getProperty(property);
+    if(file.empty())
+    {
+        throw FileNotAvailableException(property + " configuration property is not set");
+    }
+    return file;
+}