diff options
Diffstat (limited to 'cpp/src/IceGrid/RegistryI.cpp')
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 84 |
1 files changed, 76 insertions, 8 deletions
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 0e0f5f84305..a8a3f1ad1ce 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -19,6 +19,10 @@ #include <IceGrid/Database.h> #include <IceGrid/NodeSessionI.h> #include <IceGrid/ReapThread.h> +#include <IceGrid/SessionManagerI.h> + +#include <IceStorm/TraceLevels.h> +#include <IceStorm/TopicManagerI.h> #include <sys/types.h> #include <sys/stat.h> @@ -211,8 +215,6 @@ RegistryI::start(bool nowarn) properties->getPropertyAsIntWithDefault(serverThreadPool + ".SizeWarn", serverSizeMax * 80 / 100) * 2; properties->setProperty(internalThreadPool + ".SizeWarn", IceGrid::intToString(clientSizeWarn + serverSizeWarn)); - _nodeSessionTimeout = properties->getPropertyAsIntWithDefault("IceGrid.Registry.NodeSessionTimeout", 10) * 1000; - TraceLevelsPtr traceLevels = new TraceLevels(properties, _communicator->getLogger(), false); _communicator->setDefaultLocator(0); @@ -226,6 +228,13 @@ RegistryI::start(bool nowarn) ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal"); // + // Start the reaper thread. + // + _nodeSessionTimeout = properties->getPropertyAsIntWithDefault("IceGrid.Registry.NodeSessionTimeout", 10) * 1000; + _reaper = new ReapThread(_nodeSessionTimeout); + _reaper->start(); + + // // Create the internal registries (node, server, adapter, object). // const string envName = "Registry"; @@ -301,11 +310,62 @@ RegistryI::start(bool nowarn) _communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false))); // - // Start the reaper thread. The default value for the node session - // timeout is 10 seconds. + // Create the internal IceStorm service. // - _reaper = new ReapThread(_nodeSessionTimeout); - _reaper->start(); + IceStorm::TraceLevelsPtr t = new IceStorm::TraceLevels("IceGrid.Registry", properties, _communicator->getLogger()); + _topicManager = new IceStorm::TopicManagerI( + _communicator, registryAdapter, registryAdapter, t, "Registry", "topics"); + _topicManagerProxy = IceStorm::TopicManagerPrx::uncheckedCast( + registryAdapter->add(_topicManager, stringToIdentity("IceGrid/TopicManager"))); + + // + // Create the registry and node observer topic. + // + IceStorm::TopicPrx nodeObserverTopic; + IceStorm::TopicPrx registryObserverTopic; + try + { + registryObserverTopic = _topicManagerProxy->create("RegistryObserver"); + } + catch(const IceStorm::TopicExists&) + { + registryObserverTopic = _topicManagerProxy->retrieve("RegistryObserver"); + } + try + { + nodeObserverTopic = _topicManagerProxy->create("NodeObserver"); + } + catch(const IceStorm::TopicExists&) + { + nodeObserverTopic = _topicManagerProxy->retrieve("NodeObserver"); + } + + obj = registryObserverTopic->getPublisher()->ice_collocationOptimization(false); + obj = obj->ice_locator(_communicator->getDefaultLocator()); + _registryObserver = RegistryObserverPrx::uncheckedCast(obj); + + obj = nodeObserverTopic->getPublisher()->ice_collocationOptimization(false); + _nodeObserver = NodeObserverPrx::uncheckedCast(obj); + + // + // Create the session manager. + // + ObjectPtr sessionManager = new SessionManagerI(registryObserverTopic, nodeObserverTopic, _reaper); + const string sessionManagerIdProperty = "IceGrid.Registry.SessionManagerIdentity"; + Identity sessionManagerId = stringToIdentity(properties->getPropertyWithDefault(sessionManagerIdProperty, + "IceGrid/SessionManager")); + adminAdapter->add(sessionManager, sessionManagerId); + ObjectPrx sessionManagerPrx = adminAdapter->createDirectProxy(sessionManagerId); + try + { + _database->removeObjectDescriptor(sessionManagerPrx->ice_getIdentity()); + } + catch(const ObjectNotExistException&) + { + } + desc.proxy = sessionManagerPrx; + desc.type = "::IceGrid::SessionManager"; + _database->addObjectDescriptor(desc); // // We are ready to go! @@ -325,13 +385,15 @@ RegistryI::stop() { _reaper->terminate(); _reaper->getThreadControl().join(); + + _topicManager->shutdown(); } NodeSessionPrx RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice::Current& c) { NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout)); - NodeSessionIPtr session = new NodeSessionI(_database, name, n); + NodeSessionIPtr session = new NodeSessionI(_database, _registryObserver, name, n); NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session)); _reaper->add(proxy, session); return proxy; @@ -340,7 +402,7 @@ RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice: NodeObserverPrx RegistryI::getNodeObserver(const Ice::Current& current) { - return 0; + return _nodeObserver; } void @@ -348,3 +410,9 @@ RegistryI::shutdown(const Ice::Current& current) { _communicator->shutdown(); } + +IceStorm::TopicManagerPrx +RegistryI::getTopicManager() +{ + return _topicManagerProxy; +} |