diff options
Diffstat (limited to 'scripts/IceStormUtil.py')
-rw-r--r-- | scripts/IceStormUtil.py | 441 |
1 files changed, 204 insertions, 237 deletions
diff --git a/scripts/IceStormUtil.py b/scripts/IceStormUtil.py index a619fe5809c..7aeb5ddd775 100644 --- a/scripts/IceStormUtil.py +++ b/scripts/IceStormUtil.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - # ********************************************************************** # # Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. @@ -9,254 +7,223 @@ # # ********************************************************************** -import os, sys -import TestUtil +import sys, os +from Util import * -global testdir -global toplevel +class IceStorm(ProcessFromBinDir, Server): -origIceBoxService = ' --Ice.Admin.Endpoints="default -p {0}"' + \ - ' --Ice.Admin.InstanceName=IceBox{0}' + \ - ' --Ice.Default.Locator=' + def __init__(self, instanceName="IceStorm", replica=0, nreplicas=0, transient=False, portnum=0, *args, **kargs): + Server.__init__(self, exe="icebox", ready="IceStorm", mapping=Mapping.getByName("cpp"), *args, **kargs) + self.portnum = portnum + self.replica = replica + self.nreplicas = nreplicas + self.transient = transient + self.instanceName = instanceName + self.desc = self.instanceName if self.nreplicas == 0 else "{0} replica #{1}".format(self.instanceName, + self.replica) + + def setup(self, current): + # Create the database directory + os.mkdir(os.path.join(current.testcase.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica))) + + def teardown(self, current, success): + # Remove the database directory tree + try: + shutil.rmtree(os.path.join(current.testcase.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica))) + except: + pass + + def getProps(self, current): + props = Server.getProps(self, current) + + # Default properties + props.update({ + 'IceBox.Service.IceStorm' : 'IceStormService,' + getIceSoVersion() + ':createIceStorm', + 'IceBox.PrintServicesReady' : 'IceStorm', + 'IceBox.InheritProperties' : 1, + 'IceStorm.InstanceName' : self.instanceName, + 'Ice.Admin.InstanceName' : 'IceBox', + 'Ice.Warn.Dispatch' : 0, + 'Ice.Warn.Connections' : 0, + 'IceStorm.LMDB.MapSize' : 1, + 'IceStorm.LMDB.Path' : '{testdir}/{process.instanceName}-{process.replica}.db', + }) + + if self.nreplicas > 0: + props['IceStorm.NodeId'] = self.replica -origIceBoxEndpoints = ' --IceBoxAdmin.ServiceManager.Proxy="IceBox{0}/admin -f IceBox.ServiceManager: default -p {0}"' + if self.transient: + props["IceStorm.Transient"] = 1 + + # + # Add endpoint properties here as these properties depend on the worker thread running the + # the test case for the port number. The port number is computed by the driver based on a + # fixed portnum index for each IceStorm endpoint (portnum = 0 for the topic manager endpoint, + # portnum=1 for the publish endpoint, portnum=2 for the node endpoint and portnum=3 for the + # icebox admin endpoint). + # + + # Manager, publish, node and admin ndpoints for given replica number + manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 0) + publish = lambda replica: "{0}:{1}".format(current.getTestEndpoint(self.portnum + replica * 4 + 1), + current.getTestEndpoint(self.portnum + replica * 4 + 1, "udp")) + node = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 2) + admin = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 3) + + # The endpoints for the given replica + props.update({ + "IceStorm.TopicManager.Endpoints" : manager(self.replica), + "IceStorm.Publish.Endpoints" : publish(self.replica), + "Ice.Admin.Endpoints" : admin(self.replica), + }) + + # Compute the node and replicated endpoints to be configured for each replica + if self.nreplicas > 0: + props['IceStorm.Node.Endpoints'] = node(self.replica) + for i in range(0, self.nreplicas): + props["IceStorm.Nodes.{0}".format(i)] = "{2}/node{0}:{1}".format(i, node(i), self.instanceName) + props['IceStorm.ReplicatedTopicManagerEndpoints'] = ":".join([manager(i) for i in range(0, self.nreplicas)]) + props['IceStorm.ReplicatedPublishEndpoints'] = ":".join([publish(i) for i in range(0, self.nreplicas)]) + + return props + + def getInstanceName(self): + return self.instanceName + + def getTopicManager(self, current): + # Return the endpoint for this IceStorm replica + return "{1}/TopicManager:{0}".format(current.getTestEndpoint(self.portnum + self.replica * 4), self.instanceName) + + def getReplicatedTopicManager(self, current): + # Return the replicated endpoints for IceStorm + if self.nreplicas == 0: + return self.getTopicManager(current) + manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4) + return "{1}/TopicManager:{0}".format(":".join([manager(i) for i in range(0, self.nreplicas)]), self.instanceName) + + def shutdown(self, current): + # Shutdown this replica by connecting to the IceBox service manager with iceboxadmin + endpoint = current.getTestEndpoint(self.portnum + self.replica * 4 + 3) + props = { "IceBoxAdmin.ServiceManager.Proxy" : "IceBox/admin -f IceBox.ServiceManager:" + endpoint } + IceBoxAdmin().run(current, props=props, args=['shutdown']) + +class IceStormProcess: + + def __init__(self, instanceName=None, instance=None): + self.instanceName = instanceName + self.instance = instance + + def getProps(self, current): + + # + # An IceStorm client is provided with the IceStormAdmin.TopicManager.Default property set + # to the "instance" topic manager proxy if "instance" is set. Otherwise, if a single it's + # set to the replicated topic manager if a specific "instance name" is provided or there's + # only one IceStorm instance name deployed. If IceStorm multiple instance names are set, + # the client is given an IceStormAdmin.<instance name> property for each instance containing + # the replicated topic manager proxy. + # + + props = self.getParentProps(current) + testcase = current.testcase + while testcase and not isinstance(testcase, IceStormTestCase): testcase = testcase.parent + if self.instance: + props["IceStormAdmin.TopicManager.Default"] = self.instance.getTopicManager(current) + else: + instanceNames = [self.instanceName] if self.instanceName else testcase.getInstanceNames() + if len(instanceNames) == 1: + props["IceStormAdmin.TopicManager.Default"] = testcase.getTopicManager(current, instanceNames[0]) + else: + for name in instanceNames: + props["IceStormAdmin.TopicManager.{0}".format(name)] = testcase.getTopicManager(current, name) + return props -# Turn off the dispatch and connection warnings -- they are expected -# when using a replicated IceStorm. -origIceStormService = ' --IceBox.Service.IceStorm=IceStormService,' + TestUtil.getIceSoVersion() + ':createIceStorm' + \ - ' --IceStorm.TopicManager.Endpoints="default -p %d"' + \ - ' --IceBox.PrintServicesReady=IceStorm' + \ - ' --IceBox.InheritProperties=1' + \ - ' --Ice.Warn.Dispatch=0 --Ice.Warn.Connections=0' + \ - ' --Ice.ServerIdleTime=0' +class IceStormAdmin(ProcessFromBinDir, IceStormProcess, Client): -origIceStormProxy = '%s/TopicManager:default -p %d' + def __init__(self, instanceName=None, instance=None, *args, **kargs): + Client.__init__(self, exe="icestormadmin", mapping=Mapping.getByName("cpp"), *args, **kargs) + IceStormProcess.__init__(self, instanceName, instance) -origIceStormReference = ' --IceStormAdmin.TopicManager.Default="%s"' + getParentProps = Client.getProps # Used by IceStormProcess to get the client properties -class IceStormUtil(object): - def __init__(self, toplevel, testdir): - self.toplevel = toplevel - self.testdir = testdir - self.iceBox = TestUtil.getIceBox() - self.iceBoxAdmin = os.path.join(TestUtil.getCppBinDir(), "iceboxadmin") - self.iceStormAdmin = os.path.join(TestUtil.getCppBinDir(), "icestormadmin") +class Subscriber(IceStormProcess, Server): - def runIceBoxAdmin(self, endpts, command): - proc = TestUtil.startClient(self.iceBoxAdmin, endpts + " " + command, echo = False) - proc.waitTestSuccess() - return proc.buf + processType = "subscriber" - def admin(self, cmd, **args): - self.adminWithRef(self.iceStormReference, cmd, **args) + def __init__(self, instanceName=None, instance=None, *args, **kargs): + Server.__init__(self, *args, **kargs) + IceStormProcess.__init__(self, instanceName, instance) - def adminWithRef(self, ref, cmd, expect = None): - proc = TestUtil.startClient(self.iceStormAdmin, ref + r' -e "%s"' % cmd, echo = False) - if expect: - proc.expect(expect) - proc.wait() - else: - proc.waitTestSuccess() - - def reference(self): - return self.iceStormReference - def proxy(self): - return self.iceStormProxy - -class Replicated(IceStormUtil): - def __init__(self, toplevel, testdir, - additional = None, - replicatedPublisher = True, - dbDir = "db", - instanceName="IceStorm", port = 12010): - IceStormUtil.__init__(self, toplevel, testdir) - self.procs = [] - self.nendpoints = [] # Node endpoints - self.instanceName = instanceName - self.ibendpoints = [] # IceBox endpoints - self.isendpoints = [] # TopicManager endpoints - self.ipendpoints = [] # Publisher endpoints - for replica in range(0, 3): - self.nendpoints.append(port + 100 + replica * 10) - self.ibendpoints.append(port + replica * 10) - self.isendpoints.append(port + (replica * 10)+1) - self.ipendpoints.append(port + (replica * 10)+2) - replicaProperties = "" - sep ='' - replicaTopicManagerEndpoints = '' - replicaPublishEndpoints = '' - for replica in range(0, 3): - replicaProperties = replicaProperties + \ - ' --IceStorm.Nodes.%d="%s/node%d:default -t 10000 -p %d"' % ( - replica, instanceName, replica, self.nendpoints[replica]) - replicaTopicManagerEndpoints = replicaTopicManagerEndpoints + "%sdefault -p %d" % ( - sep, self.isendpoints[replica]) - replicaPublishEndpoints = replicaPublishEndpoints + "%sdefault -p %d" % (sep, self.ipendpoints[replica]) - sep = ':' - replicaProperties = replicaProperties + \ - ' --IceStorm.NodeId=%d --IceStorm.Node.Endpoints="default -p %d"' + \ - ' --IceStorm.ReplicatedTopicManagerEndpoints="' +\ - replicaTopicManagerEndpoints + '"' - if replicatedPublisher: - replicaProperties = replicaProperties + \ - ' --IceStorm.ReplicatedPublishEndpoints="' + replicaPublishEndpoints + '"' - self.iceBoxEndpoints = [] - self.iceBoxAdminEndpoints = [] - self.iceStormEndpoints = [] - self.replicaProperties = [] - self.dbHome= [] - self.iceStormDBEnv= [] - self.procs = [] - for replica in range(0, 3): - self.iceBoxEndpoints.append(origIceBoxService.format(self.ibendpoints[replica])) - self.iceBoxAdminEndpoints.append(origIceBoxEndpoints.format(self.ibendpoints[replica])) - service = origIceStormService % self.isendpoints[replica] - service = service + ' --IceStorm.Publish.Endpoints="default -p %d:udp -p %d"' % ( - self.ipendpoints[replica], self.ipendpoints[replica]) - if instanceName: - service = service + ' --IceStorm.InstanceName=%s ' % instanceName - #service = service + ' --IceStorm.Trace.Election=1' - #service = service + ' --IceStorm.Trace.Replication=1' - #service = service + ' --IceStorm.Trace.Subscriber=1' - #service = service + ' --IceStorm.Trace.Topic=1' - #service = service + ' --Ice.Trace.Network=3' - #service = service + ' --Ice.Trace.Protocol=1' - if additional: - service = service + " " + additional - self.iceStormEndpoints.append(service) - self.replicaProperties.append(replicaProperties % (replica, self.nendpoints[replica])) - dbHome = os.path.join(self.testdir, "%d.%s" % (replica, dbDir)) - self.dbHome.append(dbHome) - TestUtil.cleanDbDir(dbHome) - - self.iceStormDBEnv.append(' --IceStorm.LMDB.MapSize=1 --IceStorm.LMDB.Path="%s"' % dbHome) - self.procs.append(None) - - topicReplicaProxy = '%s/TopicManager:%s' % (instanceName, replicaTopicManagerEndpoints) - self.iceStormProxy = topicReplicaProxy - self.iceStormReference = ' --IceStormAdmin.TopicManager.Default="%s"' % topicReplicaProxy - - def adminForReplica(self, replica, cmd, expect = None, **args): - ep = self.isendpoints[replica] - proxy = origIceStormProxy % (self.instanceName, self.isendpoints[replica]) - ref = origIceStormReference % proxy - self.adminWithRef(ref, cmd, expect, **args) - - def clean(self): - for replica in range(0, 3): - TestUtil.cleanDbDir(self.dbHome[replica]) - - def start(self, echo = True, **args): - if echo: - sys.stdout.write("starting icestorm replicas... ") - sys.stdout.flush() - # Start replicas. - for replica in range(0, 3): - if echo: - sys.stdout.write(str(replica) + " ") - sys.stdout.flush() - self.startReplica(replica, echo=False, **args) - if echo: - print("ok") - - def startReplica(self, replica, echo = True, additionalOptions = ""): - if echo: - sys.stdout.write("starting icestorm replica %d..." % replica + " ") - sys.stdout.flush() - - proc = TestUtil.startServer(self.iceBox, - self.iceBoxEndpoints[replica] + - self.iceStormEndpoints[replica] + - self.replicaProperties[replica] + - self.iceStormDBEnv[replica] + - additionalOptions, - adapter = "IceStorm", - echo = False) - self.procs[replica] = proc - if echo: - print("ok") - - def stop(self): - for replica in range(0, 3): - self.stopReplica(replica) - - def stopReplica(self, replica): - if self.procs[replica]: - self.runIceBoxAdmin(self.iceBoxAdminEndpoints[replica], "shutdown") - self.procs[replica].waitTestSuccess() - self.procs[replica] = None - - def reference(self, replica=-1): - if replica == -1: - return self.iceStormReference - ep = self.isendpoints[replica] - proxy = origIceStormProxy % (self.instanceName, self.isendpoints[replica]) - return origIceStormReference % proxy - -class NonReplicated(IceStormUtil): - def __init__(self, toplevel, testdir, transient, \ - additional = None, dbDir = "db", instanceName=None, port = 12010): - IceStormUtil.__init__(self, toplevel, testdir) - iceBoxPort = port - iceStormPort = port + 1 - publisherPort = port + 2 - self.iceBoxService = origIceBoxService.format(iceBoxPort) - self.iceBoxEndpoints = origIceBoxEndpoints.format(iceBoxPort) - self.iceStormService = origIceStormService % (iceStormPort) - self.dbDir = dbDir - - if instanceName: - self.iceStormService = self.iceStormService + ' --IceStorm.InstanceName=%s ' % instanceName - else: - instanceName = "IceStorm" + getParentProps = Server.getProps # Used by IceStormProcess to get the server properties - if publisherPort: - self.iceStormService = self.iceStormService + ' --IceStorm.Publish.Endpoints="default -p %d:udp -p %d"' % ( - publisherPort, publisherPort) - else: - self.iceStormService = self.iceStormService + ' --IceStorm.Publish.Endpoints="default:udp"' - self.transient = transient - if self.transient: - self.iceStormService = self.iceStormService + " --IceStorm.Transient=1" - if additional: - self.iceStormService = self.iceStormService + " " + additional - self.iceStormProxy = origIceStormProxy % (instanceName, iceStormPort) - self.iceStormReference = origIceStormReference % self.iceStormProxy +class Publisher(IceStormProcess, Client): - self.dbHome = os.path.join(self.testdir, self.dbDir) - TestUtil.cleanDbDir(self.dbHome) + processType = "publisher" - self.iceStormDBEnv = ' --IceStorm.LMDB.MapSize=1 --IceStorm.LMDB.Path="%s"' % self.dbHome + def __init__(self, instanceName=None, instance=None, *args, **kargs): + Client.__init__(self, *args, **kargs) + IceStormProcess.__init__(self, instanceName, instance) - def clean(self): - TestUtil.cleanDbDir(self.dbHome) + getParentProps = Client.getProps # Used by IceStormProcess to get the client properties - def start(self, echo = True, additionalOptions = ""): - if echo: - if self.transient: - sys.stdout.write("starting transient icestorm service... ") - else: - sys.stdout.write("starting icestorm service... ") - sys.stdout.flush() - - self.proc = TestUtil.startServer(self.iceBox, - self.iceBoxService + - self.iceStormService + - self.iceStormDBEnv + - additionalOptions, adapter = "IceStorm", - echo = False) - if echo: - print("ok") - return self.proc - - def stop(self): - self.runIceBoxAdmin(self.iceBoxEndpoints, "shutdown") - self.proc.waitTestSuccess() - -def init(toplevel, testdir, type, **args): - if type == "replicated": - return Replicated(toplevel, testdir, **args) - if type == "transient": - return NonReplicated(toplevel, testdir, True, **args) - return NonReplicated(toplevel, testdir, False, **args) +class IceStormTestCase(TestCase): + + def __init__(self, name, icestorm, *args, **kargs): + TestCase.__init__(self, name, *args, **kargs) + self.icestorm = icestorm if isinstance(icestorm, list) else [icestorm] + + def init(self, mapping, testsuite): + TestCase.init(self, mapping, testsuite) + + # + # Add icestorm servers at the begining of the server list, IceStorm needs to be + # started first! + # + self.servers = self.icestorm + self.servers + + def runWithDriver(self, current): + current.driver.runClientServerTestCase(current) + + def startIceStorm(self, current): + for icestorm in self.icestorm: + icestorm.start(current) + + def stopIceStorm(self, current): + self.shutdown(current) + for icestorm in self.icestorm: + icestorm.stop(current, True) + + def restartIceStorm(self, current): + self.stopIceStorm(current) + self.startIceStorm(current) + + def shutdown(self, current): + for icestorm in self.icestorm: + icestorm.shutdown(current) + + def runadmin(self, current, cmd, instanceName=None, instance=None, exitstatus=0, quiet=False): + admin = IceStormAdmin(instanceName, instance, args=["-e", cmd], quiet=quiet) + admin.run(current, exitstatus=exitstatus) + return admin.getOutput() + + def getTopicManager(self, current, instanceName=None): + if not instanceName: + # Return the topic manager proxy from the first IceStorm server + return self.icestorm[0].getReplicatedTopicManager(current) + + # + # Otherwise, search for an IceStorm server with the given instance + # name and return its replicated topic manager proxy + # + for s in self.icestorm: + if s.getInstanceName() == instanceName: + return s.getReplicatedTopicManager(current) + + def getInstanceNames(self): + # Return the different IceStorm instance names deployed with this + # test case + names = set() + for s in self.icestorm: + names.add(s.getInstanceName()) + return list(names) |