summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/RegistryI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/RegistryI.cpp')
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp84
1 files changed, 76 insertions, 8 deletions
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 0e0f5f84305..a8a3f1ad1ce 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -19,6 +19,10 @@
#include <IceGrid/Database.h>
#include <IceGrid/NodeSessionI.h>
#include <IceGrid/ReapThread.h>
+#include <IceGrid/SessionManagerI.h>
+
+#include <IceStorm/TraceLevels.h>
+#include <IceStorm/TopicManagerI.h>
#include <sys/types.h>
#include <sys/stat.h>
@@ -211,8 +215,6 @@ RegistryI::start(bool nowarn)
properties->getPropertyAsIntWithDefault(serverThreadPool + ".SizeWarn", serverSizeMax * 80 / 100) * 2;
properties->setProperty(internalThreadPool + ".SizeWarn", IceGrid::intToString(clientSizeWarn + serverSizeWarn));
- _nodeSessionTimeout = properties->getPropertyAsIntWithDefault("IceGrid.Registry.NodeSessionTimeout", 10) * 1000;
-
TraceLevelsPtr traceLevels = new TraceLevels(properties, _communicator->getLogger(), false);
_communicator->setDefaultLocator(0);
@@ -226,6 +228,13 @@ RegistryI::start(bool nowarn)
ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal");
//
+ // Start the reaper thread.
+ //
+ _nodeSessionTimeout = properties->getPropertyAsIntWithDefault("IceGrid.Registry.NodeSessionTimeout", 10) * 1000;
+ _reaper = new ReapThread(_nodeSessionTimeout);
+ _reaper->start();
+
+ //
// Create the internal registries (node, server, adapter, object).
//
const string envName = "Registry";
@@ -301,11 +310,62 @@ RegistryI::start(bool nowarn)
_communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false)));
//
- // Start the reaper thread. The default value for the node session
- // timeout is 10 seconds.
+ // Create the internal IceStorm service.
//
- _reaper = new ReapThread(_nodeSessionTimeout);
- _reaper->start();
+ IceStorm::TraceLevelsPtr t = new IceStorm::TraceLevels("IceGrid.Registry", properties, _communicator->getLogger());
+ _topicManager = new IceStorm::TopicManagerI(
+ _communicator, registryAdapter, registryAdapter, t, "Registry", "topics");
+ _topicManagerProxy = IceStorm::TopicManagerPrx::uncheckedCast(
+ registryAdapter->add(_topicManager, stringToIdentity("IceGrid/TopicManager")));
+
+ //
+ // Create the registry and node observer topic.
+ //
+ IceStorm::TopicPrx nodeObserverTopic;
+ IceStorm::TopicPrx registryObserverTopic;
+ try
+ {
+ registryObserverTopic = _topicManagerProxy->create("RegistryObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ registryObserverTopic = _topicManagerProxy->retrieve("RegistryObserver");
+ }
+ try
+ {
+ nodeObserverTopic = _topicManagerProxy->create("NodeObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ nodeObserverTopic = _topicManagerProxy->retrieve("NodeObserver");
+ }
+
+ obj = registryObserverTopic->getPublisher()->ice_collocationOptimization(false);
+ obj = obj->ice_locator(_communicator->getDefaultLocator());
+ _registryObserver = RegistryObserverPrx::uncheckedCast(obj);
+
+ obj = nodeObserverTopic->getPublisher()->ice_collocationOptimization(false);
+ _nodeObserver = NodeObserverPrx::uncheckedCast(obj);
+
+ //
+ // Create the session manager.
+ //
+ ObjectPtr sessionManager = new SessionManagerI(registryObserverTopic, nodeObserverTopic, _reaper);
+ const string sessionManagerIdProperty = "IceGrid.Registry.SessionManagerIdentity";
+ Identity sessionManagerId = stringToIdentity(properties->getPropertyWithDefault(sessionManagerIdProperty,
+ "IceGrid/SessionManager"));
+ adminAdapter->add(sessionManager, sessionManagerId);
+ ObjectPrx sessionManagerPrx = adminAdapter->createDirectProxy(sessionManagerId);
+ try
+ {
+ _database->removeObjectDescriptor(sessionManagerPrx->ice_getIdentity());
+ }
+ catch(const ObjectNotExistException&)
+ {
+ }
+ desc.proxy = sessionManagerPrx;
+ desc.type = "::IceGrid::SessionManager";
+ _database->addObjectDescriptor(desc);
//
// We are ready to go!
@@ -325,13 +385,15 @@ RegistryI::stop()
{
_reaper->terminate();
_reaper->getThreadControl().join();
+
+ _topicManager->shutdown();
}
NodeSessionPrx
RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice::Current& c)
{
NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout));
- NodeSessionIPtr session = new NodeSessionI(_database, name, n);
+ NodeSessionIPtr session = new NodeSessionI(_database, _registryObserver, name, n);
NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session));
_reaper->add(proxy, session);
return proxy;
@@ -340,7 +402,7 @@ RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice:
NodeObserverPrx
RegistryI::getNodeObserver(const Ice::Current& current)
{
- return 0;
+ return _nodeObserver;
}
void
@@ -348,3 +410,9 @@ RegistryI::shutdown(const Ice::Current& current)
{
_communicator->shutdown();
}
+
+IceStorm::TopicManagerPrx
+RegistryI::getTopicManager()
+{
+ return _topicManagerProxy;
+}