summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Service.cpp')
-rw-r--r--cpp/src/IceStorm/Service.cpp588
1 files changed, 588 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp
new file mode 100644
index 00000000000..995d6dc9dce
--- /dev/null
+++ b/cpp/src/IceStorm/Service.cpp
@@ -0,0 +1,588 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+
+#include <Ice/PluginManagerI.h> // For loadPlugin
+
+#include <IceStorm/TopicI.h>
+#include <IceStorm/TopicManagerI.h>
+#include <IceStorm/TransientTopicManagerI.h>
+#include <IceStorm/Instance.h>
+#include <IceStorm/DB.h>
+
+#define ICE_STORM_API ICE_DECLSPEC_EXPORT
+#include <IceStorm/Service.h>
+
+#include <IceStorm/Observers.h>
+#include <IceStorm/TraceLevels.h>
+#include <IceUtil/StringUtil.h>
+
+#include <IceStorm/NodeI.h>
+#include <IceStorm/TransientTopicI.h>
+
+#include <IceGrid/Locator.h>
+#include <IceGrid/Query.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceStorm;
+using namespace IceStormInternal;
+using namespace IceStormElection;
+
+namespace IceStormInternal
+{
+
+class ServiceI : public IceStormInternal::Service
+{
+public:
+
+ ServiceI();
+ virtual ~ServiceI();
+
+ virtual void start(const string&,
+ const CommunicatorPtr&,
+ const StringSeq&);
+
+ virtual void start(const CommunicatorPtr&,
+ const ObjectAdapterPtr&,
+ const ObjectAdapterPtr&,
+ const string&,
+ const Ice::Identity&,
+ const string&);
+
+ virtual TopicManagerPrx getTopicManager() const;
+
+ virtual void stop();
+
+private:
+
+ void validateProperties(const string&, const PropertiesPtr&, const LoggerPtr&);
+
+ TopicManagerImplPtr _manager;
+ TransientTopicManagerImplPtr _transientManager;
+ TopicManagerPrx _managerProxy;
+ InstancePtr _instance;
+};
+
+}
+
+extern "C"
+{
+
+ICE_DECLSPEC_EXPORT ::IceBox::Service*
+createIceStorm(CommunicatorPtr communicator)
+{
+ return new ServiceI;
+}
+
+}
+
+ServicePtr
+Service::create(const CommunicatorPtr& communicator,
+ const ObjectAdapterPtr& topicAdapter,
+ const ObjectAdapterPtr& publishAdapter,
+ const string& name,
+ const Ice::Identity& id,
+ const string& dbEnv)
+{
+ ServiceI* service = new ServiceI;
+ ServicePtr svc = service;
+ service->start(communicator, topicAdapter, publishAdapter, name, id, dbEnv);
+ return svc;
+}
+
+ServiceI::ServiceI()
+{
+}
+
+ServiceI::~ServiceI()
+{
+}
+
+void
+ServiceI::start(
+ const string& name,
+ const CommunicatorPtr& communicator,
+ const StringSeq& args)
+{
+ PropertiesPtr properties = communicator->getProperties();
+
+ validateProperties(name, properties, communicator->getLogger());
+
+ int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1);
+
+ // If we are using a replicated deployment and if the topic
+ // manager thread pool max size is not set then ensure it is set
+ // to some suitably high number. This ensures no deadlocks in the
+ // replicated case due to call forwarding from replicas to
+ // coordinators.
+ if(id != -1 && properties->getProperty(name + ".TopicManager.ThreadPool.SizeMax").empty())
+ {
+ properties->setProperty(name + ".TopicManager.ThreadPool.SizeMax", "100");
+ }
+
+ Ice::ObjectAdapterPtr topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
+ Ice::ObjectAdapterPtr publishAdapter = communicator->createObjectAdapter(name + ".Publish");
+
+ //
+ // We use the name of the service for the name of the database environment.
+ //
+ string instanceName = properties->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
+ Identity topicManagerId;
+ topicManagerId.category = instanceName;
+ topicManagerId.name = "TopicManager";
+
+ if(properties->getPropertyAsIntWithDefault(name+ ".Transient", 0))
+ {
+ _instance = new Instance(instanceName, name, communicator, 0, publishAdapter, topicAdapter, 0);
+ try
+ {
+ TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _instance = 0;
+
+ ostringstream s;
+ s << "exception while starting IceStorm service " << name << ":\n";
+ s << ex;
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+ topicAdapter->activate();
+ publishAdapter->activate();
+ return;
+ }
+
+ //
+ // Create the database cache.
+ //
+ DatabasePluginPtr plugin;
+ try
+ {
+ plugin = DatabasePluginPtr::dynamicCast(communicator->getPluginManager()->getPlugin("DB"));
+ }
+ catch(const NotRegisteredException&)
+ {
+ try
+ {
+ Ice::StringSeq cmdArgs;
+ IceInternal::loadPlugin(communicator, "DB", "IceStormFreezeDB:createFreezeDB", cmdArgs);
+ plugin = DatabasePluginPtr::dynamicCast(communicator->getPluginManager()->getPlugin("DB"));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ ostringstream s;
+ s << "failed to load default Freeze database plugin:\n" << ex;
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+ }
+
+ if(!plugin)
+ {
+ ostringstream s;
+ s << "no database plugin configured with `Ice.Plugin.DB' or plugin is not an IceStorm database plugin";
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+ DatabaseCachePtr databaseCache = plugin->getDatabaseCache(name);
+
+ if(id == -1) // No replication.
+ {
+ _instance = new Instance(instanceName, name, communicator, databaseCache, publishAdapter, topicAdapter);
+
+ try
+ {
+ _manager = new TopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager->getServant(), topicManagerId));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _instance = 0;
+
+ ostringstream s;
+ s << "exception while starting IceStorm service " << name << ":\n";
+ s << ex;
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+ }
+ else
+ {
+ // Here we want to create a map of id -> election node
+ // proxies.
+ map<int, NodePrx> nodes;
+
+ string topicManagerAdapterId = properties->getProperty(name + ".TopicManager.AdapterId");
+
+ // We support two possible deployments. The first is a manual
+ // deployment, the second is IceGrid.
+ //
+ // Here we check for the manual deployment
+ const string prefix = name + ".Nodes.";
+ Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
+ if(!props.empty())
+ {
+ for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
+ {
+ int nodeid = atoi(p->first.substr(prefix.size()).c_str());
+ nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
+ }
+ }
+ else
+ {
+ // If adapter id's are defined for the topic manager or
+ // node adapters then we consider this an IceGrid based
+ // deployment.
+ string nodeAdapterId = properties->getProperty(name + ".Node.AdapterId");
+
+ // Validate first that the adapter ids match for the node
+ // and the topic manager otherwise some other deployment
+ // is being used.
+ const string suffix = ".TopicManager";
+ if(topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
+ topicManagerAdapterId.replace(
+ topicManagerAdapterId.find(suffix), suffix.size(), ".Node") != nodeAdapterId)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "deployment error: `" << topicManagerAdapterId << "' prefix does not match `"
+ << nodeAdapterId << "'";
+ throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
+ }
+
+ // Determine the set of node id and node proxies.
+ //
+ // This is determined by locating all topic manager
+ // replicas, and then working out the node for that
+ // replica.
+ //
+ // We work out the node id by removing the instance
+ // name. The node id must follow.
+ //
+ IceGrid::LocatorPrx locator = IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
+ assert(locator);
+ IceGrid::QueryPrx query = locator->getLocalQuery();
+ Ice::ObjectProxySeq replicas = query->findAllReplicas(
+ communicator->stringToProxy(instanceName + "/TopicManager"));
+
+ for(Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ string adapterid = (*p)->ice_getAdapterId();
+
+ // Replace TopicManager with the node endpoint.
+ adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node");
+
+ // The adapter id must start with the instance name.
+ if(adapterid.find(instanceName) != 0)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "deployment error: `" << adapterid << "' does not start with `" << instanceName << "'";
+ throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
+ }
+
+ // The node id follows. We find the first digit (the
+ // start of the node id, and then the end of the
+ // digits).
+ string::size_type start = instanceName.size();
+ while(start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
+ {
+ ++start;
+ }
+ string::size_type end = start;
+ while(end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
+ {
+ ++end;
+ }
+ if(start == end)
+ {
+ // We must have at least one digit, otherwise there is
+ // some sort of deployment error.
+ Ice::Error error(communicator->getLogger());
+ error << "deployment error: node id does not follow instance name. instance name:"
+ << instanceName << " adapter id: " << adapterid;
+ throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
+ }
+
+ int nodeid = atoi(adapterid.substr(start, end-start).c_str());
+ ostringstream os;
+ os << "node" << nodeid;
+ Ice::Identity id;
+ id.category = instanceName;
+ id.name = os.str();
+
+ nodes[nodeid] = NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(id));
+ }
+ }
+
+ if(nodes.size() < 3)
+ {
+ Ice::Error error(communicator->getLogger());
+ error << "Replication requires at least 3 Nodes";
+ throw IceBox::FailureException(__FILE__, __LINE__, "Replication requires at least 3 Nodes");
+ }
+
+ try
+ {
+ // If the node thread pool size is not set then initialize
+ // to the number of nodes + 1 and disable thread pool size
+ // warnings.
+ if(properties->getProperty(name + ".Node.ThreadPool.Size").empty())
+ {
+ ostringstream os;
+ os << nodes.size() + 1;
+ properties->setProperty(name + ".Node.ThreadPool.Size", os.str());
+ properties->setProperty(name + ".Node.ThreadPool.SizeWarn", "0");
+ }
+ Ice::ObjectAdapterPtr nodeAdapter = communicator->createObjectAdapter(name + ".Node");
+
+ _instance = new Instance(instanceName, name, communicator, databaseCache, publishAdapter, topicAdapter,
+ nodeAdapter, nodes[id]);
+ _instance->observers()->setMajority(static_cast<unsigned int>(nodes.size())/2);
+
+ // Trace replication information.
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->election > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
+ out << "I am node " << id << "\n";
+ for(map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
+ {
+ out << "\tnode: " << p->first << " proxy: " << p->second->ice_toString() << "\n";
+ }
+ }
+
+ if(topicManagerAdapterId.empty())
+ {
+ // We're not using an IceGrid deployment. Here we need
+ // a proxy which is used to create proxies to the
+ // replicas later.
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
+ }
+ else
+ {
+ // If we're using IceGrid deployment we need to create
+ // indirect proxies.
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createIndirectProxy(topicManagerId));
+ }
+
+ _manager = new TopicManagerImpl(_instance);
+ topicAdapter->add(_manager->getServant(), topicManagerId);
+
+ ostringstream os; // The node object identity.
+ os << "node" << id;
+ Ice::Identity nodeid;
+ nodeid.category = instanceName;
+ nodeid.name = os.str();
+
+ NodeIPtr node = new NodeI(_instance, _manager, _managerProxy, id, nodes);
+ _instance->setNode(node);
+ nodeAdapter->add(node, nodeid);
+ nodeAdapter->activate();
+
+ node->start();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _instance = 0;
+
+ ostringstream s;
+ s << "exception while starting IceStorm service " << name << ":\n";
+ s << ex;
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+ }
+
+ topicAdapter->activate();
+ publishAdapter->activate();
+}
+
+void
+ServiceI::start(const CommunicatorPtr& communicator,
+ const ObjectAdapterPtr& topicAdapter,
+ const ObjectAdapterPtr& publishAdapter,
+ const string& name,
+ const Ice::Identity& id,
+ const string& dbEnv)
+{
+ //
+ // For IceGrid we don't validate the properties as all sorts of
+ // non-IceStorm properties are included in the prefix.
+ //
+ //validateProperties(name, communicator->getProperties(), communicator->getLogger());
+
+ // This is for IceGrid only and as such we use a transient
+ // implementation of IceStorm.
+ string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
+ _instance = new Instance(instanceName, name, communicator, 0, publishAdapter, topicAdapter);
+
+ try
+ {
+ TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, id));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _instance = 0;
+ ostringstream s;
+ s << "exception while starting IceStorm service " << name << ":\n";
+ s << ex;
+
+ IceBox::FailureException e(__FILE__, __LINE__);
+ e.reason = s.str();
+ throw e;
+ }
+}
+
+TopicManagerPrx
+ServiceI::getTopicManager() const
+{
+ return _managerProxy;
+}
+
+void
+ServiceI::stop()
+{
+ // Shutdown the instance. This deactivates all OAs.
+ _instance->shutdown();
+
+ //
+ // It's necessary to reap all destroyed topics on shutdown.
+ //
+ if(_manager)
+ {
+ _manager->shutdown();
+ }
+ if(_transientManager)
+ {
+ _transientManager->shutdown();
+ }
+
+ //
+ // Destroy the instance. This step must occur last.
+ //
+ _instance->destroy();
+}
+
+void
+ServiceI::validateProperties(const string& name, const PropertiesPtr& properties, const LoggerPtr& logger)
+{
+ static const string suffixes[] =
+ {
+ "ReplicatedTopicManagerEndpoints",
+ "ReplicatedPublishEndpoints",
+ "Nodes.*",
+ "Transient",
+ "NodeId",
+ "Flush.Timeout",
+ "InstanceName",
+ "Election.MasterTimeout",
+ "Election.ElectionTimeout",
+ "Election.ResponseTimeout",
+ "Publish.AdapterId",
+ "Publish.Endpoints",
+ "Publish.Locator",
+ "Publish.PublishedEndpoints",
+ "Publish.RegisterProcess",
+ "Publish.ReplicaGroupId",
+ "Publish.Router",
+ "Publish.ThreadPool.Size",
+ "Publish.ThreadPool.SizeMax",
+ "Publish.ThreadPool.SizeWarn",
+ "Publish.ThreadPool.StackSize",
+ "Node.AdapterId",
+ "Node.Endpoints",
+ "Node.Locator",
+ "Node.PublishedEndpoints",
+ "Node.RegisterProcess",
+ "Node.ReplicaGroupId",
+ "Node.Router",
+ "Node.ThreadPool.Size",
+ "Node.ThreadPool.SizeMax",
+ "Node.ThreadPool.SizeWarn",
+ "Node.ThreadPool.StackSize",
+ "TopicManager.AdapterId",
+ "TopicManager.Endpoints",
+ "TopicManager.Locator",
+ "TopicManager.Proxy",
+ "TopicManager.Proxy.EndpointSelection",
+ "TopicManager.Proxy.ConnectionCached",
+ "TopicManager.Proxy.PreferSecure",
+ "TopicManager.Proxy.LocatorCacheTimeout",
+ "TopicManager.Proxy.Locator",
+ "TopicManager.Proxy.Router",
+ "TopicManager.Proxy.CollocationOptimization",
+ "TopicManager.PublishedEndpoints",
+ "TopicManager.RegisterProcess",
+ "TopicManager.ReplicaGroupId",
+ "TopicManager.Router",
+ "TopicManager.ThreadPool.Size",
+ "TopicManager.ThreadPool.SizeMax",
+ "TopicManager.ThreadPool.SizeWarn",
+ "TopicManager.ThreadPool.StackSize",
+ "Trace.Election",
+ "Trace.Replication",
+ "Trace.Subscriber",
+ "Trace.Topic",
+ "Trace.TopicManager",
+ "Send.Timeout",
+ "Discard.Interval",
+ "SQL.DatabaseType",
+ "SQL.HostName",
+ "SQL.Port",
+ "SQL.DatabaseName",
+ "SQL.UserName",
+ "SQL.Password"
+ };
+
+ vector<string> unknownProps;
+ string prefix = name + ".";
+ PropertyDict props = properties->getPropertiesForPrefix(prefix);
+ for(PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
+ {
+ bool valid = false;
+ for(unsigned int i = 0; i < sizeof(suffixes)/sizeof(*suffixes); ++i)
+ {
+ string prop = prefix + suffixes[i];
+ if(IceUtilInternal::match(p->first, prop))
+ {
+ valid = true;
+ break;
+ }
+ }
+ if(!valid)
+ {
+ unknownProps.push_back(p->first);
+ }
+ }
+
+ if(!unknownProps.empty())
+ {
+ Warning out(logger);
+ out << "found unknown properties for IceStorm service '" << name << "':";
+ for(vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)
+ {
+ out << "\n " << *p;
+ }
+ }
+}
+