summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Service.cpp')
-rw-r--r--cpp/src/IceStorm/Service.cpp404
1 files changed, 369 insertions, 35 deletions
diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp
index d224f1ef0a9..c147a8d8e00 100644
--- a/cpp/src/IceStorm/Service.cpp
+++ b/cpp/src/IceStorm/Service.cpp
@@ -10,16 +10,24 @@
#include <IceUtil/DisableWarnings.h>
#include <IceStorm/TopicI.h>
#include <IceStorm/TopicManagerI.h>
+#include <IceStorm/TransientTopicManagerI.h>
#include <IceStorm/Instance.h>
-#include <IceStorm/TraceLevels.h>
-#include <IceStorm/BatchFlusher.h>
-#include <IceStorm/SubscriberPool.h>
#include <IceStorm/Service.h>
+#include <IceStorm/Observers.h>
+#include <IceStorm/TraceLevels.h>
+#include <IceStorm/LoggerI.h>
+#include <IceUtil/StringUtil.h>
+
+#include <IceStorm/NodeI.h>
+
+#include <IceGrid/Locator.h>
+#include <IceGrid/Query.h>
using namespace std;
using namespace Ice;
-using namespace IceStorm;
using namespace Freeze;
+using namespace IceStorm;
+using namespace IceStormElection;
namespace IceStorm
{
@@ -48,11 +56,12 @@ public:
private:
- TopicManagerIPtr _manager;
+ void validateProperties(const string&, const PropertiesPtr&, const LoggerPtr&);
+
+ TopicManagerImplPtr _manager;
+ TransientTopicManagerImplPtr _transientManager;
TopicManagerPrx _managerProxy;
InstancePtr _instance;
- ObjectAdapterPtr _topicAdapter;
- ObjectAdapterPtr _publishAdapter;
};
}
@@ -98,8 +107,22 @@ IceStorm::ServiceI::start(
{
PropertiesPtr properties = communicator->getProperties();
- _topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
- _publishAdapter = communicator->createObjectAdapter(name + ".Publish");
+ validateProperties(name, properties, communicator->getLogger());
+
+ int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1);
+
+ // If we are using a replicated deployment and if the topic
+ // manager thread pool max size is not set then ensure it is set
+ // to some suitably high number. This ensures no deadlocks in the
+ // replicated case due to call forwarding from replicas to
+ // coordinators.
+ if(id != -1 && properties->getProperty(name + ".TopicManager.ThreadPool.SizeMax").empty())
+ {
+ properties->setProperty(name + ".TopicManager.ThreadPool.SizeMax", "100");
+ }
+
+ Ice::ObjectAdapterPtr topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
+ Ice::ObjectAdapterPtr publishAdapter = communicator->createObjectAdapter(name + ".Publish");
//
// We use the name of the service for the name of the database environment.
@@ -109,21 +132,221 @@ IceStorm::ServiceI::start(
topicManagerId.category = instanceName;
topicManagerId.name = "TopicManager";
- _instance = new Instance(instanceName, name, communicator, _publishAdapter);
-
- try
+ if(properties->getPropertyAsIntWithDefault(name+ ".Transient", 0))
{
- _manager = new TopicManagerI(_instance, _topicAdapter, name, "topics");
- _managerProxy = TopicManagerPrx::uncheckedCast(_topicAdapter->add(_manager, topicManagerId));
+ _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter);
+ try
+ {
+ TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
+ }
+ catch(const Ice::Exception&)
+ {
+ _instance = 0;
+ throw;
+ }
+ topicAdapter->activate();
+ publishAdapter->activate();
+ return;
}
- catch(const Ice::Exception&)
+
+ if(id == -1) // No replication.
{
- _instance = 0;
- throw;
+ _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter);
+
+ try
+ {
+ _manager = new TopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager->getServant(), topicManagerId));
+ }
+ catch(const Ice::Exception&)
+ {
+ _instance = 0;
+ throw;
+ }
+ }
+ else
+ {
+ // Add a instance/node prefix to the IceStorm logger if so
+ // installed.
+ IceStorm::LoggerIPtr logger = IceStorm::LoggerIPtr::dynamicCast(
+ communicator->getLogger());
+ if(logger)
+ {
+ ostringstream os;
+ os << id << "/" << instanceName;
+ logger->setPrefix(os.str());
+ }
+
+ // Here we want to create a map of id -> election node
+ // proxies.
+ map<int, NodePrx> nodes;
+
+ bool iceGridDeployment = true;
+
+ // We support two possible deployments. The first is a manual
+ // deployment, the second is IceGrid.
+ //
+ // Here we check for the manual deployment
+ const string prefix = name + ".Nodes.";
+ Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
+ if(!props.empty())
+ {
+ iceGridDeployment = false;
+ for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
+ {
+ int nodeid = atoi(p->first.substr(prefix.size()).c_str());
+ nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
+ }
+ }
+ else
+ {
+ // If adapter id's are defined for the topic manager or node
+ // adapters then we consider this an IceGrid based deployment.
+ string adapterid = properties->getProperty(name + ".TopicManager.AdapterId");
+ string nodeid = properties->getProperty(name + ".Node.AdapterId");
+
+ // Here I must validate first that the adapter ids match
+ // for the node and the topic manager otherwise some other
+ // deployment is being used.
+ const string suffix = ".TopicManager";
+ if(adapterid.empty() || nodeid.empty() ||
+ adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node") != nodeid)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "IceGrid deployment is incorrect";
+ throw "IceGrid deployment is incorrect";
+ }
+
+ // This is a deployment using IceGrid.
+ //
+ // In this case we first locate all replicas for the topic
+ // manager. The topic manager adapter ids will be
+ // something like:
+ // "Inst1-1.IceStorm.IceStorm.TopicManager". This is
+ // <instance>-<node-id>.<name>.<endpoint>. From this we
+ // can extract node id and create the correct adapter id
+ // for the election node.
+ IceGrid::LocatorPrx locator = IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
+ assert(locator);
+ IceGrid::QueryPrx query = locator->getLocalQuery();
+ Ice::ObjectProxySeq replicas = query->findAllReplicas(
+ communicator->stringToProxy(instanceName + "/TopicManager"));
+ for(Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ adapterid = (*p)->ice_getAdapterId();
+
+ // Replace TopicManager with the node endpoint.
+ adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node");
+
+ // Now look for the node identity. This will be
+ // <prefix>-<id>.<name>.<name>.Node... For example, if
+ // the service name is IceStorm it will be
+ // ".IceStorm.IceStorm.Node"
+ string::size_type end = adapterid.find( "." + name + "." + name + ".Node");
+ if(end == string::npos)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "IceGrid deployment is incorrect: " << adapterid;
+ throw "IceGrid deployment is incorrect";
+ }
+
+ string::size_type start = adapterid.rfind("-", end);
+ if(start == string::npos)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "IceGrid deployment is incorrect: " << adapterid;
+ throw "IceGrid deployment is incorrect";
+ }
+ ++start;
+
+ int nodeid = atoi(adapterid.substr(start, end-start).c_str());
+ ostringstream os;
+ os << "node" << nodeid;
+ Ice::Identity id;
+ id.category = instanceName;
+ id.name = os.str();
+
+ nodes[nodeid] = NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(id));
+ }
+ }
+
+ if(nodes.size() < 3)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "Replication requires at least 3 Nodes";
+ throw "error";
+ }
+
+ try
+ {
+ // If the node thread pool size is not set then initialize
+ // to the number of nodes + 1 and disable thread pool size
+ // warnings.
+ if(properties->getProperty(name + ".Node.ThreadPool.Size").empty())
+ {
+ ostringstream os;
+ os << nodes.size() + 1;
+ properties->setProperty(name + ".Node.ThreadPool.Size", os.str());
+ properties->setProperty(name + ".Node.ThreadPool.SizeWarn", "0");
+ }
+ Ice::ObjectAdapterPtr nodeAdapter = communicator->createObjectAdapter(name + ".Node");
+
+ _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, iceGridDeployment,
+ nodeAdapter, nodes[id]);
+ _instance->observers()->setMajority(nodes.size()/2);
+
+ // Trace replication information.
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->election > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
+ out << "I am node " << id << "\n";
+ for(map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
+ {
+ out << "\tnode: " << p->first << " proxy: " << p->second->ice_toString() << "\n";
+ }
+ }
+
+ if(!iceGridDeployment)
+ {
+ // We're not using an IceGrid deployment. Here we need
+ // a proxy which is used to create proxies to the
+ // replicas later.
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
+ }
+ else
+ {
+ // If we're using IceGrid deployment we need to create
+ // indirect proxies.
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createIndirectProxy(topicManagerId));
+ }
+
+ _manager = new TopicManagerImpl(_instance);
+ topicAdapter->add(_manager->getServant(), topicManagerId);
+
+ ostringstream os; // The node object identity.
+ os << "node" << id;
+ Ice::Identity nodeid;
+ nodeid.category = instanceName;
+ nodeid.name = os.str();
+
+ NodeIPtr node = new NodeI(_instance, _manager, _managerProxy, id, nodes);
+ _instance->setNode(node);
+ nodeAdapter->add(node, nodeid);
+ nodeAdapter->activate();
+
+ node->start();
+ }
+ catch(const Ice::Exception&)
+ {
+ _instance = 0;
+ throw;
+ }
}
- _topicAdapter->activate();
- _publishAdapter->activate();
+ topicAdapter->activate();
+ publishAdapter->activate();
}
void
@@ -134,16 +357,21 @@ IceStorm::ServiceI::start(const CommunicatorPtr& communicator,
const Ice::Identity& id,
const string& dbEnv)
{
- string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
- _instance = new Instance(instanceName, name, communicator, publishAdapter);
-
//
- // We use the name of the service for the name of the database environment.
+ // For IceGrid we don't validate the properties as all sorts of
+ // non-IceStorm properties are included in the prefix.
//
+ //validateProperties(name, communicator->getProperties(), communicator->getLogger());
+
+ // This is for IceGrid only and as such we use a transient
+ // implementation of IceStorm.
+ string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
+ _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter);
+
try
{
- _manager = new TopicManagerI(_instance, topicAdapter, dbEnv, "topics");
- _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager, id));
+ TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, id));
}
catch(const Ice::Exception&)
{
@@ -161,22 +389,128 @@ IceStorm::ServiceI::getTopicManager() const
void
IceStorm::ServiceI::stop()
{
- if(_topicAdapter)
+ // Shutdown the instance. This deactivates all OAs.
+ _instance->shutdown();
+
+ //
+ // It's necessary to reap all destroyed topics on shutdown.
+ //
+ if(_manager)
{
- _topicAdapter->destroy();
+ _manager->shutdown();
}
- if(_publishAdapter)
+ if(_transientManager)
{
- _publishAdapter->destroy();
+ _transientManager->shutdown();
}
//
- // Shutdown the instance.
+ // Destroy the instance. This step must occur last.
//
- _instance->shutdown();
+ _instance->destroy();
+}
- //
- // It's necessary to reap all destroyed topics on shutdown.
- //
- _manager->shutdown();
+void
+IceStorm::ServiceI::validateProperties(const string& name, const PropertiesPtr& properties, const LoggerPtr& logger)
+{
+ static const string suffixes[] =
+ {
+ "ReplicatedTopicManagerEndpoints",
+ "ReplicatedPublishEndpoints",
+ "Nodes.*",
+ "Transient",
+ "NodeId",
+ "Flush.Timeout",
+ "InstanceName",
+ "Election.MasterTimeout",
+ "Election.ElectionTimeout",
+ "Election.ResponseTimeout",
+ "Publish.AdapterId",
+ "Publish.Endpoints",
+ "Publish.Locator",
+ "Publish.PublishedEndpoints",
+ "Publish.RegisterProcess",
+ "Publish.ReplicaGroupId",
+ "Publish.Router",
+ "Publish.ThreadPerConnection",
+ "Publish.ThreadPerConnection.StackSize",
+ "Publish.ThreadPool.Size",
+ "Publish.ThreadPool.SizeMax",
+ "Publish.ThreadPool.SizeWarn",
+ "Publish.ThreadPool.StackSize",
+ "Node.AdapterId",
+ "Node.Endpoints",
+ "Node.Locator",
+ "Node.PublishedEndpoints",
+ "Node.RegisterProcess",
+ "Node.ReplicaGroupId",
+ "Node.Router",
+ "Node.ThreadPerConnection",
+ "Node.ThreadPerConnection.StackSize",
+ "Node.ThreadPool.Size",
+ "Node.ThreadPool.SizeMax",
+ "Node.ThreadPool.SizeWarn",
+ "Node.ThreadPool.StackSize",
+ "TopicManager.AdapterId",
+ "TopicManager.Endpoints",
+ "TopicManager.Locator",
+ "TopicManager.Proxy",
+ "TopicManager.Proxy.EndpointSelection",
+ "TopicManager.Proxy.ConnectionCached",
+ "TopicManager.Proxy.PreferSecure",
+ "TopicManager.Proxy.LocatorCacheTimeout",
+ "TopicManager.Proxy.Locator",
+ "TopicManager.Proxy.Router",
+ "TopicManager.Proxy.CollocationOptimization",
+ "TopicManager.Proxy.ThreadPerConnection",
+ "TopicManager.PublishedEndpoints",
+ "TopicManager.RegisterProcess",
+ "TopicManager.ReplicaGroupId",
+ "TopicManager.Router",
+ "TopicManager.ThreadPerConnection",
+ "TopicManager.ThreadPerConnection.StackSize",
+ "TopicManager.ThreadPool.Size",
+ "TopicManager.ThreadPool.SizeMax",
+ "TopicManager.ThreadPool.SizeWarn",
+ "TopicManager.ThreadPool.StackSize",
+ "Trace.Election",
+ "Trace.Replication",
+ "Trace.Subscriber",
+ "Trace.Topic",
+ "Trace.TopicManager",
+ "Send.Timeout",
+ "Discard.Interval",
+ };
+
+ vector<string> unknownProps;
+ string prefix = name + ".";
+ PropertyDict props = properties->getPropertiesForPrefix(prefix);
+ for(PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
+ {
+ bool valid = false;
+ for(unsigned int i = 0; i < sizeof(suffixes)/sizeof(*suffixes); ++i)
+ {
+ string prop = prefix + suffixes[i];
+ if(IceUtilInternal::match(p->first, prop))
+ {
+ valid = true;
+ break;
+ }
+ }
+ if(!valid)
+ {
+ unknownProps.push_back(p->first);
+ }
+ }
+
+ if(!unknownProps.empty())
+ {
+ Warning out(logger);
+ out << "found unknown properties for IceStorm service '" << name << "':";
+ for(vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)
+ {
+ out << "\n " << *p;
+ }
+ }
}
+