summaryrefslogtreecommitdiff
path: root/scripts/IceStormUtil.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/IceStormUtil.py')
-rw-r--r--scripts/IceStormUtil.py441
1 files changed, 204 insertions, 237 deletions
diff --git a/scripts/IceStormUtil.py b/scripts/IceStormUtil.py
index a619fe5809c..7aeb5ddd775 100644
--- a/scripts/IceStormUtil.py
+++ b/scripts/IceStormUtil.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
# **********************************************************************
#
# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
@@ -9,254 +7,223 @@
#
# **********************************************************************
-import os, sys
-import TestUtil
+import sys, os
+from Util import *
-global testdir
-global toplevel
+class IceStorm(ProcessFromBinDir, Server):
-origIceBoxService = ' --Ice.Admin.Endpoints="default -p {0}"' + \
- ' --Ice.Admin.InstanceName=IceBox{0}' + \
- ' --Ice.Default.Locator='
+ def __init__(self, instanceName="IceStorm", replica=0, nreplicas=0, transient=False, portnum=0, *args, **kargs):
+ Server.__init__(self, exe="icebox", ready="IceStorm", mapping=Mapping.getByName("cpp"), *args, **kargs)
+ self.portnum = portnum
+ self.replica = replica
+ self.nreplicas = nreplicas
+ self.transient = transient
+ self.instanceName = instanceName
+ self.desc = self.instanceName if self.nreplicas == 0 else "{0} replica #{1}".format(self.instanceName,
+ self.replica)
+
+ def setup(self, current):
+ # Create the database directory
+ os.mkdir(os.path.join(current.testcase.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica)))
+
+ def teardown(self, current, success):
+ # Remove the database directory tree
+ try:
+ shutil.rmtree(os.path.join(current.testcase.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica)))
+ except:
+ pass
+
+ def getProps(self, current):
+ props = Server.getProps(self, current)
+
+ # Default properties
+ props.update({
+ 'IceBox.Service.IceStorm' : 'IceStormService,' + getIceSoVersion() + ':createIceStorm',
+ 'IceBox.PrintServicesReady' : 'IceStorm',
+ 'IceBox.InheritProperties' : 1,
+ 'IceStorm.InstanceName' : self.instanceName,
+ 'Ice.Admin.InstanceName' : 'IceBox',
+ 'Ice.Warn.Dispatch' : 0,
+ 'Ice.Warn.Connections' : 0,
+ 'IceStorm.LMDB.MapSize' : 1,
+ 'IceStorm.LMDB.Path' : '{testdir}/{process.instanceName}-{process.replica}.db',
+ })
+
+ if self.nreplicas > 0:
+ props['IceStorm.NodeId'] = self.replica
-origIceBoxEndpoints = ' --IceBoxAdmin.ServiceManager.Proxy="IceBox{0}/admin -f IceBox.ServiceManager: default -p {0}"'
+ if self.transient:
+ props["IceStorm.Transient"] = 1
+
+ #
+ # Add endpoint properties here as these properties depend on the worker thread running the
+ # the test case for the port number. The port number is computed by the driver based on a
+ # fixed portnum index for each IceStorm endpoint (portnum = 0 for the topic manager endpoint,
+ # portnum=1 for the publish endpoint, portnum=2 for the node endpoint and portnum=3 for the
+ # icebox admin endpoint).
+ #
+
+ # Manager, publish, node and admin ndpoints for given replica number
+ manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 0)
+ publish = lambda replica: "{0}:{1}".format(current.getTestEndpoint(self.portnum + replica * 4 + 1),
+ current.getTestEndpoint(self.portnum + replica * 4 + 1, "udp"))
+ node = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 2)
+ admin = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 3)
+
+ # The endpoints for the given replica
+ props.update({
+ "IceStorm.TopicManager.Endpoints" : manager(self.replica),
+ "IceStorm.Publish.Endpoints" : publish(self.replica),
+ "Ice.Admin.Endpoints" : admin(self.replica),
+ })
+
+ # Compute the node and replicated endpoints to be configured for each replica
+ if self.nreplicas > 0:
+ props['IceStorm.Node.Endpoints'] = node(self.replica)
+ for i in range(0, self.nreplicas):
+ props["IceStorm.Nodes.{0}".format(i)] = "{2}/node{0}:{1}".format(i, node(i), self.instanceName)
+ props['IceStorm.ReplicatedTopicManagerEndpoints'] = ":".join([manager(i) for i in range(0, self.nreplicas)])
+ props['IceStorm.ReplicatedPublishEndpoints'] = ":".join([publish(i) for i in range(0, self.nreplicas)])
+
+ return props
+
+ def getInstanceName(self):
+ return self.instanceName
+
+ def getTopicManager(self, current):
+ # Return the endpoint for this IceStorm replica
+ return "{1}/TopicManager:{0}".format(current.getTestEndpoint(self.portnum + self.replica * 4), self.instanceName)
+
+ def getReplicatedTopicManager(self, current):
+ # Return the replicated endpoints for IceStorm
+ if self.nreplicas == 0:
+ return self.getTopicManager(current)
+ manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4)
+ return "{1}/TopicManager:{0}".format(":".join([manager(i) for i in range(0, self.nreplicas)]), self.instanceName)
+
+ def shutdown(self, current):
+ # Shutdown this replica by connecting to the IceBox service manager with iceboxadmin
+ endpoint = current.getTestEndpoint(self.portnum + self.replica * 4 + 3)
+ props = { "IceBoxAdmin.ServiceManager.Proxy" : "IceBox/admin -f IceBox.ServiceManager:" + endpoint }
+ IceBoxAdmin().run(current, props=props, args=['shutdown'])
+
+class IceStormProcess:
+
+ def __init__(self, instanceName=None, instance=None):
+ self.instanceName = instanceName
+ self.instance = instance
+
+ def getProps(self, current):
+
+ #
+ # An IceStorm client is provided with the IceStormAdmin.TopicManager.Default property set
+ # to the "instance" topic manager proxy if "instance" is set. Otherwise, if a single it's
+ # set to the replicated topic manager if a specific "instance name" is provided or there's
+ # only one IceStorm instance name deployed. If IceStorm multiple instance names are set,
+ # the client is given an IceStormAdmin.<instance name> property for each instance containing
+ # the replicated topic manager proxy.
+ #
+
+ props = self.getParentProps(current)
+ testcase = current.testcase
+ while testcase and not isinstance(testcase, IceStormTestCase): testcase = testcase.parent
+ if self.instance:
+ props["IceStormAdmin.TopicManager.Default"] = self.instance.getTopicManager(current)
+ else:
+ instanceNames = [self.instanceName] if self.instanceName else testcase.getInstanceNames()
+ if len(instanceNames) == 1:
+ props["IceStormAdmin.TopicManager.Default"] = testcase.getTopicManager(current, instanceNames[0])
+ else:
+ for name in instanceNames:
+ props["IceStormAdmin.TopicManager.{0}".format(name)] = testcase.getTopicManager(current, name)
+ return props
-# Turn off the dispatch and connection warnings -- they are expected
-# when using a replicated IceStorm.
-origIceStormService = ' --IceBox.Service.IceStorm=IceStormService,' + TestUtil.getIceSoVersion() + ':createIceStorm' + \
- ' --IceStorm.TopicManager.Endpoints="default -p %d"' + \
- ' --IceBox.PrintServicesReady=IceStorm' + \
- ' --IceBox.InheritProperties=1' + \
- ' --Ice.Warn.Dispatch=0 --Ice.Warn.Connections=0' + \
- ' --Ice.ServerIdleTime=0'
+class IceStormAdmin(ProcessFromBinDir, IceStormProcess, Client):
-origIceStormProxy = '%s/TopicManager:default -p %d'
+ def __init__(self, instanceName=None, instance=None, *args, **kargs):
+ Client.__init__(self, exe="icestormadmin", mapping=Mapping.getByName("cpp"), *args, **kargs)
+ IceStormProcess.__init__(self, instanceName, instance)
-origIceStormReference = ' --IceStormAdmin.TopicManager.Default="%s"'
+ getParentProps = Client.getProps # Used by IceStormProcess to get the client properties
-class IceStormUtil(object):
- def __init__(self, toplevel, testdir):
- self.toplevel = toplevel
- self.testdir = testdir
- self.iceBox = TestUtil.getIceBox()
- self.iceBoxAdmin = os.path.join(TestUtil.getCppBinDir(), "iceboxadmin")
- self.iceStormAdmin = os.path.join(TestUtil.getCppBinDir(), "icestormadmin")
+class Subscriber(IceStormProcess, Server):
- def runIceBoxAdmin(self, endpts, command):
- proc = TestUtil.startClient(self.iceBoxAdmin, endpts + " " + command, echo = False)
- proc.waitTestSuccess()
- return proc.buf
+ processType = "subscriber"
- def admin(self, cmd, **args):
- self.adminWithRef(self.iceStormReference, cmd, **args)
+ def __init__(self, instanceName=None, instance=None, *args, **kargs):
+ Server.__init__(self, *args, **kargs)
+ IceStormProcess.__init__(self, instanceName, instance)
- def adminWithRef(self, ref, cmd, expect = None):
- proc = TestUtil.startClient(self.iceStormAdmin, ref + r' -e "%s"' % cmd, echo = False)
- if expect:
- proc.expect(expect)
- proc.wait()
- else:
- proc.waitTestSuccess()
-
- def reference(self):
- return self.iceStormReference
- def proxy(self):
- return self.iceStormProxy
-
-class Replicated(IceStormUtil):
- def __init__(self, toplevel, testdir,
- additional = None,
- replicatedPublisher = True,
- dbDir = "db",
- instanceName="IceStorm", port = 12010):
- IceStormUtil.__init__(self, toplevel, testdir)
- self.procs = []
- self.nendpoints = [] # Node endpoints
- self.instanceName = instanceName
- self.ibendpoints = [] # IceBox endpoints
- self.isendpoints = [] # TopicManager endpoints
- self.ipendpoints = [] # Publisher endpoints
- for replica in range(0, 3):
- self.nendpoints.append(port + 100 + replica * 10)
- self.ibendpoints.append(port + replica * 10)
- self.isendpoints.append(port + (replica * 10)+1)
- self.ipendpoints.append(port + (replica * 10)+2)
- replicaProperties = ""
- sep =''
- replicaTopicManagerEndpoints = ''
- replicaPublishEndpoints = ''
- for replica in range(0, 3):
- replicaProperties = replicaProperties + \
- ' --IceStorm.Nodes.%d="%s/node%d:default -t 10000 -p %d"' % (
- replica, instanceName, replica, self.nendpoints[replica])
- replicaTopicManagerEndpoints = replicaTopicManagerEndpoints + "%sdefault -p %d" % (
- sep, self.isendpoints[replica])
- replicaPublishEndpoints = replicaPublishEndpoints + "%sdefault -p %d" % (sep, self.ipendpoints[replica])
- sep = ':'
- replicaProperties = replicaProperties + \
- ' --IceStorm.NodeId=%d --IceStorm.Node.Endpoints="default -p %d"' + \
- ' --IceStorm.ReplicatedTopicManagerEndpoints="' +\
- replicaTopicManagerEndpoints + '"'
- if replicatedPublisher:
- replicaProperties = replicaProperties + \
- ' --IceStorm.ReplicatedPublishEndpoints="' + replicaPublishEndpoints + '"'
- self.iceBoxEndpoints = []
- self.iceBoxAdminEndpoints = []
- self.iceStormEndpoints = []
- self.replicaProperties = []
- self.dbHome= []
- self.iceStormDBEnv= []
- self.procs = []
- for replica in range(0, 3):
- self.iceBoxEndpoints.append(origIceBoxService.format(self.ibendpoints[replica]))
- self.iceBoxAdminEndpoints.append(origIceBoxEndpoints.format(self.ibendpoints[replica]))
- service = origIceStormService % self.isendpoints[replica]
- service = service + ' --IceStorm.Publish.Endpoints="default -p %d:udp -p %d"' % (
- self.ipendpoints[replica], self.ipendpoints[replica])
- if instanceName:
- service = service + ' --IceStorm.InstanceName=%s ' % instanceName
- #service = service + ' --IceStorm.Trace.Election=1'
- #service = service + ' --IceStorm.Trace.Replication=1'
- #service = service + ' --IceStorm.Trace.Subscriber=1'
- #service = service + ' --IceStorm.Trace.Topic=1'
- #service = service + ' --Ice.Trace.Network=3'
- #service = service + ' --Ice.Trace.Protocol=1'
- if additional:
- service = service + " " + additional
- self.iceStormEndpoints.append(service)
- self.replicaProperties.append(replicaProperties % (replica, self.nendpoints[replica]))
- dbHome = os.path.join(self.testdir, "%d.%s" % (replica, dbDir))
- self.dbHome.append(dbHome)
- TestUtil.cleanDbDir(dbHome)
-
- self.iceStormDBEnv.append(' --IceStorm.LMDB.MapSize=1 --IceStorm.LMDB.Path="%s"' % dbHome)
- self.procs.append(None)
-
- topicReplicaProxy = '%s/TopicManager:%s' % (instanceName, replicaTopicManagerEndpoints)
- self.iceStormProxy = topicReplicaProxy
- self.iceStormReference = ' --IceStormAdmin.TopicManager.Default="%s"' % topicReplicaProxy
-
- def adminForReplica(self, replica, cmd, expect = None, **args):
- ep = self.isendpoints[replica]
- proxy = origIceStormProxy % (self.instanceName, self.isendpoints[replica])
- ref = origIceStormReference % proxy
- self.adminWithRef(ref, cmd, expect, **args)
-
- def clean(self):
- for replica in range(0, 3):
- TestUtil.cleanDbDir(self.dbHome[replica])
-
- def start(self, echo = True, **args):
- if echo:
- sys.stdout.write("starting icestorm replicas... ")
- sys.stdout.flush()
- # Start replicas.
- for replica in range(0, 3):
- if echo:
- sys.stdout.write(str(replica) + " ")
- sys.stdout.flush()
- self.startReplica(replica, echo=False, **args)
- if echo:
- print("ok")
-
- def startReplica(self, replica, echo = True, additionalOptions = ""):
- if echo:
- sys.stdout.write("starting icestorm replica %d..." % replica + " ")
- sys.stdout.flush()
-
- proc = TestUtil.startServer(self.iceBox,
- self.iceBoxEndpoints[replica] +
- self.iceStormEndpoints[replica] +
- self.replicaProperties[replica] +
- self.iceStormDBEnv[replica] +
- additionalOptions,
- adapter = "IceStorm",
- echo = False)
- self.procs[replica] = proc
- if echo:
- print("ok")
-
- def stop(self):
- for replica in range(0, 3):
- self.stopReplica(replica)
-
- def stopReplica(self, replica):
- if self.procs[replica]:
- self.runIceBoxAdmin(self.iceBoxAdminEndpoints[replica], "shutdown")
- self.procs[replica].waitTestSuccess()
- self.procs[replica] = None
-
- def reference(self, replica=-1):
- if replica == -1:
- return self.iceStormReference
- ep = self.isendpoints[replica]
- proxy = origIceStormProxy % (self.instanceName, self.isendpoints[replica])
- return origIceStormReference % proxy
-
-class NonReplicated(IceStormUtil):
- def __init__(self, toplevel, testdir, transient, \
- additional = None, dbDir = "db", instanceName=None, port = 12010):
- IceStormUtil.__init__(self, toplevel, testdir)
- iceBoxPort = port
- iceStormPort = port + 1
- publisherPort = port + 2
- self.iceBoxService = origIceBoxService.format(iceBoxPort)
- self.iceBoxEndpoints = origIceBoxEndpoints.format(iceBoxPort)
- self.iceStormService = origIceStormService % (iceStormPort)
- self.dbDir = dbDir
-
- if instanceName:
- self.iceStormService = self.iceStormService + ' --IceStorm.InstanceName=%s ' % instanceName
- else:
- instanceName = "IceStorm"
+ getParentProps = Server.getProps # Used by IceStormProcess to get the server properties
- if publisherPort:
- self.iceStormService = self.iceStormService + ' --IceStorm.Publish.Endpoints="default -p %d:udp -p %d"' % (
- publisherPort, publisherPort)
- else:
- self.iceStormService = self.iceStormService + ' --IceStorm.Publish.Endpoints="default:udp"'
- self.transient = transient
- if self.transient:
- self.iceStormService = self.iceStormService + " --IceStorm.Transient=1"
- if additional:
- self.iceStormService = self.iceStormService + " " + additional
- self.iceStormProxy = origIceStormProxy % (instanceName, iceStormPort)
- self.iceStormReference = origIceStormReference % self.iceStormProxy
+class Publisher(IceStormProcess, Client):
- self.dbHome = os.path.join(self.testdir, self.dbDir)
- TestUtil.cleanDbDir(self.dbHome)
+ processType = "publisher"
- self.iceStormDBEnv = ' --IceStorm.LMDB.MapSize=1 --IceStorm.LMDB.Path="%s"' % self.dbHome
+ def __init__(self, instanceName=None, instance=None, *args, **kargs):
+ Client.__init__(self, *args, **kargs)
+ IceStormProcess.__init__(self, instanceName, instance)
- def clean(self):
- TestUtil.cleanDbDir(self.dbHome)
+ getParentProps = Client.getProps # Used by IceStormProcess to get the client properties
- def start(self, echo = True, additionalOptions = ""):
- if echo:
- if self.transient:
- sys.stdout.write("starting transient icestorm service... ")
- else:
- sys.stdout.write("starting icestorm service... ")
- sys.stdout.flush()
-
- self.proc = TestUtil.startServer(self.iceBox,
- self.iceBoxService +
- self.iceStormService +
- self.iceStormDBEnv +
- additionalOptions, adapter = "IceStorm",
- echo = False)
- if echo:
- print("ok")
- return self.proc
-
- def stop(self):
- self.runIceBoxAdmin(self.iceBoxEndpoints, "shutdown")
- self.proc.waitTestSuccess()
-
-def init(toplevel, testdir, type, **args):
- if type == "replicated":
- return Replicated(toplevel, testdir, **args)
- if type == "transient":
- return NonReplicated(toplevel, testdir, True, **args)
- return NonReplicated(toplevel, testdir, False, **args)
+class IceStormTestCase(TestCase):
+
+ def __init__(self, name, icestorm, *args, **kargs):
+ TestCase.__init__(self, name, *args, **kargs)
+ self.icestorm = icestorm if isinstance(icestorm, list) else [icestorm]
+
+ def init(self, mapping, testsuite):
+ TestCase.init(self, mapping, testsuite)
+
+ #
+ # Add icestorm servers at the begining of the server list, IceStorm needs to be
+ # started first!
+ #
+ self.servers = self.icestorm + self.servers
+
+ def runWithDriver(self, current):
+ current.driver.runClientServerTestCase(current)
+
+ def startIceStorm(self, current):
+ for icestorm in self.icestorm:
+ icestorm.start(current)
+
+ def stopIceStorm(self, current):
+ self.shutdown(current)
+ for icestorm in self.icestorm:
+ icestorm.stop(current, True)
+
+ def restartIceStorm(self, current):
+ self.stopIceStorm(current)
+ self.startIceStorm(current)
+
+ def shutdown(self, current):
+ for icestorm in self.icestorm:
+ icestorm.shutdown(current)
+
+ def runadmin(self, current, cmd, instanceName=None, instance=None, exitstatus=0, quiet=False):
+ admin = IceStormAdmin(instanceName, instance, args=["-e", cmd], quiet=quiet)
+ admin.run(current, exitstatus=exitstatus)
+ return admin.getOutput()
+
+ def getTopicManager(self, current, instanceName=None):
+ if not instanceName:
+ # Return the topic manager proxy from the first IceStorm server
+ return self.icestorm[0].getReplicatedTopicManager(current)
+
+ #
+ # Otherwise, search for an IceStorm server with the given instance
+ # name and return its replicated topic manager proxy
+ #
+ for s in self.icestorm:
+ if s.getInstanceName() == instanceName:
+ return s.getReplicatedTopicManager(current)
+
+ def getInstanceNames(self):
+ # Return the different IceStorm instance names deployed with this
+ # test case
+ names = set()
+ for s in self.icestorm:
+ names.add(s.getInstanceName())
+ return list(names)