#
# Copyright (c) ZeroC, Inc. All rights reserved.
#
import sys, os, time, threading
from Util import *
from Component import component
isPython2 = sys.version_info[0] == 2
#
# The Executor class runs testsuites on multiple worker threads.
#
class Executor:
def __init__(self, threadlocal, workers, continueOnFailure):
self.threadlocal = threadlocal
self.workers = workers - 1
self.queue = []
self.mainThreadQueue = []
self.queueLength = 0
self.failure = False
self.interrupted = False
self.continueOnFailure = continueOnFailure
self.lock = threading.Lock()
def submit(self, testsuite, crossMappings, driver):
mainThreadOnly = testsuite.isMainThreadOnly(driver) or self.workers == 0
#
# If the test supports workers and we are cross testing, ensure that all the cross
# testing mappings support workers as well.
#
if not mainThreadOnly:
for cross in crossMappings:
if cross:
t = cross.findTestSuite(testsuite)
if t and t.isMainThreadOnly(driver):
mainThreadOnly = True
break
if mainThreadOnly:
self.mainThreadQueue.append(testsuite)
else:
self.queue.append(testsuite)
self.queueLength += 1
def get(self, total, mainThread=False):
with self.lock:
if self.failure:
return None
queue = self.mainThreadQueue if mainThread else self.queue
if len(queue) == 0:
return None
self.queueLength -= 1
return (queue.pop(0), total - self.queueLength)
def isInterrupted(self):
with self.lock:
return self.interrupted
def setInterrupt(self, value):
with self.lock:
self.interrupted = value
def runTestSuites(self, driver, total, results, mainThread=False):
while True:
item = self.get(total, mainThread)
if not item:
results.put(None) # Notify the main thread that there are not more tests to run
break
(testsuite, index) = item
result = Result(testsuite, not driver.isWorkerThread())
current = LocalDriver.Current(driver, testsuite, result, index, total)
try:
testsuite.run(current)
except KeyboardInterrupt:
if mainThread:
raise
except:
pass
finally:
current.destroy()
results.put((result, mainThread))
if not result.isSuccess() and not self.continueOnFailure:
with self.lock: self.failure = True
def runUntilCompleted(self, driver, start):
if self.queueLength == 0:
return []
total = self.queueLength
if self.workers == 0 and start > 0:
for i in range(1, start):
if len(self.mainThreadQueue) == 0:
break
self.mainThreadQueue.pop(0)
self.queueLength -= 1
#
# Worker threads dequeue and run testsuites. They push results to the results
# queue. The thread stops when there are no more testsuite to dequeue.
#
resultList = []
results = queue.Queue()
def worker(num):
self.threadlocal.num = num
try:
self.runTestSuites(driver, total, results)
except Exception as ex:
print("unexpected exception raised from worker thread:\n" + str(ex))
results.put(None) # Notify the main thread that we're done
#
# Start the worker threads
#
threads=[]
for i in range(min(self.workers, total)):
t = threading.Thread(target=worker, args=[i])
t.start()
threads.append(t)
try:
#
# Run the main thread testsuites.
#
self.runTestSuites(driver, total, results, True)
#
# Dequeue results and print out the testuite output for each test.
#
count = len(threads) + 1
while count > 0:
try:
r = results.get(timeout=1)
if not r:
count -= 1
continue
except queue.Empty:
continue
(result, mainThread) = r
resultList.append(result)
if not mainThread:
sys.stdout.write(result.getOutput())
except KeyboardInterrupt:
with self.lock:
self.failure = True
self.interrupted = True
if threads:
print("Terminating (waiting for worker threads to terminate)...")
else:
print("")
raise
finally:
#
# Wait for worker threads to be done.
#
for t in threads:
t.join()
#
# Print out remaining testsuites.
#
try:
while True:
r = results.get_nowait()
if r:
(result, mainThread) = r
resultList.append(result)
if not mainThread:
sys.stdout.write(result.getOutput())
except queue.Empty:
pass
return resultList
#
# Runner to run the test cases locally.
#
class TestCaseRunner:
def getTestSuites(self, mapping, testSuiteIds):
return mapping.getTestSuites(testSuiteIds)
def filterOptions(self, options):
return options
def startServerSide(self, testcase, current):
return testcase._startServerSide(current)
def stopServerSide(self, testcase, current, success):
testcase._stopServerSide(current, success)
def runClientSide(self, testcase, current, host):
testcase._runClientSide(current, host)
#
# Runner to run the test cases remotely with the controller (requires IcePy)
#
class RemoteTestCaseRunner(TestCaseRunner):
def __init__(self, communicator, clientPrx, serverPrx):
import Test
if clientPrx:
self.clientController = communicator.stringToProxy(clientPrx)
self.clientController = Test.Common.ControllerPrx.checkedCast(self.clientController)
self.clientOptions = self.clientController.getOptionOverrides()
else:
self.clientController = None
self.clientOptions = {}
if serverPrx:
self.serverController = communicator.stringToProxy(serverPrx)
self.serverController = Test.Common.ControllerPrx.checkedCast(self.serverController)
self.serverOptions = self.serverController.getOptionOverrides()
else:
self.serverController = None
self.serverOptions = {}
def getTestSuites(self, mapping, testSuiteIds):
if self.clientController:
clientTestSuiteIds = self.clientController.getTestSuites(str(mapping))
if testSuiteIds:
testSuiteIds = [ts for ts in clientTestSuiteIds if ts in testSuiteIds]
else:
testSuiteIds = clientTestSuiteIds
if self.serverController:
serverTestSuiteIds = self.serverController.getTestSuites(str(mapping))
if testSuiteIds:
testSuiteIds = [ts for ts in serverTestSuiteIds if ts in testSuiteIds]
else:
testSuiteIds = serverTestSuiteIds
return mapping.getTestSuites(testSuiteIds)
def getHost(self, protocol, ipv6):
if self.clientController:
return self.clientController.getHost(protocol, ipv6)
else:
return self.serverController.getHost(protocol, ipv6)
def filterOptions(self, options):
if options is None:
return None
import Ice
options = options.copy()
for (key, values) in options.items():
for opts in [self.serverOptions, self.clientOptions]:
if hasattr(opts, key) and getattr(opts, key) is not Ice.Unset:
options[key] = [v for v in values if v in getattr(opts, key)]
return options
def startServerSide(self, testcase, current):
if not self.serverController:
return TestCaseRunner.startServerSide(self, testcase, current)
import Test
current.serverTestCase = self.serverController.runTestCase(str(testcase.getMapping()),
testcase.getTestSuite().getId(),
testcase.getName(),
str(current.driver.cross))
try:
try:
return current.serverTestCase.startServerSide(self.getConfig(current))
except Test.Common.TestCaseFailedException as ex:
current.result.writeln(ex.output)
raise RuntimeError("test failed:\n" + str(ex))
except:
current.serverTestCase.destroy()
current.serverTestCase = None
raise
def stopServerSide(self, testcase, current, success):
if not self.serverController:
TestCaseRunner.stopServerSide(self, testcase, current, success)
return
import Test
try:
current.result.write(current.serverTestCase.stopServerSide(success))
current.host = None
except Test.Common.TestCaseFailedException as ex:
current.result.writeln(ex.output)
raise RuntimeError("test failed")
finally:
current.serverTestCase = None
def runClientSide(self, testcase, current, host):
import Test
if not self.clientController:
TestCaseRunner.runClientSide(self, testcase, current, host)
return
clientTestCase = self.clientController.runTestCase(str(testcase.getMapping()),
testcase.getTestSuite().getId(),
testcase.getName(),
str(current.driver.cross))
try:
current.result.write(clientTestCase.runClientSide(host, self.getConfig(current)))
except Test.Common.TestCaseFailedException as ex:
current.result.writeln(ex.output)
raise RuntimeError("test failed")
finally:
clientTestCase.destroy()
def getConfig(self, current):
import Test
return Test.Common.Config(current.config.protocol,
current.config.mx,
current.config.serialize,
current.config.compress,
current.config.ipv6,
current.config.cprops,
current.config.sprops)
class XmlExporter:
def __init__(self, results, duration, failures):
self.results = results
self.duration = duration
self.failures = failures
def save(self, filename, hostname):
with open(filename, "w") if isPython2 else open(filename, "w", encoding="utf-8") as out:
out.write('\n')
out.write('\n'.format(len(self.results),
self.duration,
len(self.failures)))
for r in self.results:
r.writeAsXml(out, hostname)
out.write('\n')
class LocalDriver(Driver):
class Current(Driver.Current):
def __init__(self, driver, testsuite, result, index, total):
Driver.Current.__init__(self, driver, testsuite, result)
self.index = index
self.total = total
@classmethod
def getSupportedArgs(self):
return ("", ["cross=", "workers=", "continue", "loop", "start=", "all", "all-cross", "host=",
"client=", "server=", "show-durations", "export-xml="])
@classmethod
def usage(self):
print("")
print("Local driver options:")
print("--cross= Run with servers from given mapping.")
print("--workers= The number of worker threads to run the tests.")
print("--start= Start running the tests at the given index.")
print("--loop Run the tests in a loop.")
print("--continue Don't stop on failures.")
print("--all Run all sensible permutations of the tests.")
print("--all-cross Run all sensible permutations of cross language tests.")
print("--client= The endpoint of the controller to run the client side.")
print("--server= The endpoint of the controller to run the server side.")
print("--show-durations Print out the duration of each tests.")
print("--export-xml= Export JUnit XML test report.")
def __init__(self, options, *args, **kargs):
Driver.__init__(self, options, *args, **kargs)
self.cross = ""
self.allCross = False
self.workers = 1
self.continueOnFailure = False
self.loop = False
self.start = 0
self.all = False
self.showDurations = False
self.exportToXml = ""
self.clientCtlPrx = ""
self.serverCtlPrx = ""
parseOptions(self, options, { "continue" : "continueOnFailure",
"l" : "loop",
"all-cross" : "allCross",
"client" : "clientCtlPrx",
"server" : "serverCtlPrx",
"show-durations" : "showDurations",
"export-xml" : "exportToXml" })
if self.cross:
self.cross = Mapping.getByName(self.cross)
if not self.cross:
raise RuntimeError("unknown mapping `{0}' for --cross option".format(self.cross))
self.results = []
self.threadlocal = threading.local()
self.loopCount = 1
self.executor = Executor(self.threadlocal, self.workers, self.continueOnFailure)
def run(self, mappings, testSuiteIds):
if self.clientCtlPrx or self.serverCtlPrx:
self.initCommunicator()
self.runner = RemoteTestCaseRunner(self.communicator, self.clientCtlPrx, self.serverCtlPrx)
else:
self.runner = TestCaseRunner()
try:
while True:
for mapping in mappings:
testsuites = self.runner.getTestSuites(mapping, testSuiteIds)
#
# Sort the test suites to run tests in the following order.
#
runOrder = self.component.getRunOrder()
def testsuiteKey(testsuite):
for k in runOrder:
if testsuite.getId().startswith(k + '/'):
return testsuite.getId().replace(k, str(runOrder.index(k)))
return testsuite.getId()
testsuites = sorted(testsuites, key=testsuiteKey)
for testsuite in testsuites:
if mapping.filterTestSuite(testsuite.getId(), self.configs[mapping], self.filters, self.rfilters):
continue
if testsuite.getId() == "Ice/echo":
continue
elif (self.cross or self.allCross) and not self.component.isCross(testsuite.getId()):
continue
elif isinstance(self.runner, RemoteTestCaseRunner) and not testsuite.isMultiHost():
continue
self.executor.submit(testsuite, Mapping.getAll(self) if self.allCross else [self.cross], self)
#
# Run all the tests and wait for the executor to complete.
#
now = time.time()
results = self.executor.runUntilCompleted(self, self.start)
failures = [r for r in results if not r.isSuccess()]
duration = time.time() - now
if self.exportToXml:
XmlExporter(results, duration, failures).save(self.exportToXml, os.getenv("NODE_NAME", ""))
m, s = divmod(duration, 60)
print("")
if m > 0:
print("Ran {0} tests in {1} minutes {2:02.2f} seconds".format(len(results), m, s))
else:
print("Ran {0} tests in {1:02.2f} seconds".format(len(results), s))
if self.showDurations:
for r in sorted(results, key = lambda r : r.getDuration()):
print("- {0} took {1:02.2f} seconds".format(r.testsuite, r.getDuration()))
self.loopCount += 1
if len(failures) > 0:
print("{0} succeeded and {1} failed:".format(len(results) - len(failures), len(failures)))
for r in failures:
print("- {0}".format(r.testsuite))
for (c, ex) in r.getFailed().items():
lines = r.getOutput(c).strip().split('\n')
for i in range(0, min(4, len(lines))):
print(" " + lines[i])
if len(lines) > 4:
print(" [...]")
for i in range(max(4, len(lines) - 8), len(lines)):
print(" " + lines[i])
return 1
else:
print("{0} succeeded".format(len(results)))
if not self.loop:
return 0
finally:
Expect.cleanup() # Cleanup processes which might still be around
def runTestSuite(self, current):
if self.loop:
current.result.write("*** [{0}/{1} loop={2}] ".format(current.index, current.total, self.loopCount))
else:
current.result.write("*** [{0}/{1}] ".format(current.index, current.total))
current.result.writeln("Running {0}/{1} tests ***".format(current.testsuite.getMapping(), current.testsuite))
success = False
try:
try:
current.result.started("setup")
current.testsuite.setup(current)
current.result.succeeded("setup")
except Exception as ex:
current.result.failed("setup", traceback.format_exc())
raise
for testcase in current.testsuite.getTestCases():
config = current.config
try:
for conf in current.config.getAll(current, testcase) if self.all else [current.config]:
current.config = conf
testcase.run(current)
except:
if current.driver.debug:
current.result.writeln(traceback.format_exc())
raise
finally:
current.config = config
success = True
finally:
try:
current.result.started("teardown")
current.testsuite.teardown(current, success)
current.result.succeeded("teardown")
except Exception as ex:
current.result.failed("teardown", traceback.format_exc())
raise
def runClientServerTestCase(self, current):
if current.testcase.getParent():
success = False
host = current.testcase._startServerSide(current)
try:
current.testcase._runClientSide(current, host)
success = True
finally:
current.testcase._stopServerSide(current, success)
return
client = current.testcase.getClientTestCase()
for cross in (Mapping.getAll(self) if self.allCross else [self.cross]):
# Only run cross tests with allCross
if self.allCross and cross == current.testcase.getMapping():
continue
# Skip if the cross test server mapping is another mapping than the cross mapping
if cross and cross != cross.getServerMapping():
continue
# Skip if the mapping doesn't provide the test case
server = current.testcase.getServerTestCase(cross)
if not server:
continue
current.writeln("[ running {0} test - {1} ]".format(current.testcase, time.strftime("%x %X")))
if not self.all:
current.config = current.config.cloneRunnable(current)
confStr = str(current.config)
if confStr:
current.writeln("- Config: {0}".format(confStr))
current.desc = confStr
else:
current.desc = ""
if cross:
current.writeln("- Mappings: {0},{1}".format(client.getMapping(), server.getMapping()))
current.desc += (" " if current.desc else "") + "cross={0}".format(server.getMapping())
if not current.config.canRun(current.testsuite.getId(), current) or not current.testcase.canRun(current):
current.result.skipped(current, "not supported with this configuration")
return
success = False
host = self.runner.startServerSide(server, current)
try:
self.runner.runClientSide(client, current, host)
success = True
finally:
#
# We start a thread to stop the servers, this ensures that stopServerSide doesn't get
# interrupted by potential KeyboardInterrupt exceptions which could leave some servers
# behind.
#
failure = []
sem = threading.Semaphore(0)
def stopServerSide():
try:
self.runner.stopServerSide(server, current, success)
except Exception as ex:
failure.append(ex)
sem.release()
t=threading.Thread(target = stopServerSide)
t.start()
while True:
try:
#
# NOTE: we can't just use join() here because of https://bugs.python.org/issue21822
# We use a semaphore to wait for the servers to be stopped and return.
#
sem.acquire()
if failure:
raise failure[0]
t.join()
break
except KeyboardInterrupt:
pass # Ignore keyboard interrupts
def runTestCase(self, current):
if self.cross or self.allCross:
#current.result.skipped(current, "only client/server tests are ran with cross tests")
return
if not current.testcase.getParent():
current.writeln("[ running {0} test - {1} ]".format(current.testcase, time.strftime("%x %X")))
if not self.all:
current.config = current.config.cloneRunnable(current)
confStr = str(current.config)
if confStr:
current.writeln("- Config: {0}".format(confStr))
current.desc = confStr
if not current.config.canRun(current.testsuite.getId(), current) or not current.testcase.canRun(current):
current.result.skipped(current, "not supported with this configuration")
return
current.testcase._runClientSide(current)
def getHost(self, protocol, ipv6):
if isinstance(self.runner, RemoteTestCaseRunner):
return self.runner.getHost(protocol, ipv6)
else:
return Driver.getHost(self, protocol, ipv6)
def isWorkerThread(self):
return hasattr(self.threadlocal, "num")
def isInterrupted(self):
return self.executor.isInterrupted()
def setInterrupt(self, value):
self.executor.setInterrupt(value)
def getTestPort(self, portnum):
# Return a port number in the range 14100-14199 for the first thread, 14200-14299 for the
# second thread, etc.
assert(portnum < 100)
baseport = 14000 + self.threadlocal.num * 100 if hasattr(self.threadlocal, "num") else 12010
return baseport + portnum
def getProps(self, process, current):
props = Driver.getProps(self, process, current)
if isinstance(process, IceProcess):
if current.host:
props["Ice.Default.Host"] = current.host
# Ice process from the bin directory don't support Test.BasePort
if not process.isFromBinDir() and hasattr(self.threadlocal, "num"):
props["Test.BasePort"] = 14000 + self.threadlocal.num * 100
return props
def getMappings(self):
return Mapping.getAll(self) if self.allCross else [self.cross] if self.cross else []
def filterOptions(self, options):
return self.runner.filterOptions(options)
Driver.add("local", LocalDriver)