summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/federation2/run.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceStorm/federation2/run.py')
-rwxr-xr-xcpp/test/IceStorm/federation2/run.py175
1 files changed, 35 insertions, 140 deletions
diff --git a/cpp/test/IceStorm/federation2/run.py b/cpp/test/IceStorm/federation2/run.py
index f8cc85ac932..55d931c2e65 100755
--- a/cpp/test/IceStorm/federation2/run.py
+++ b/cpp/test/IceStorm/federation2/run.py
@@ -10,63 +10,29 @@
import os, sys, time, threading, re
-for toplevel in [".", "..", "../..", "../../..", "../../../.."]:
- toplevel = os.path.normpath(toplevel)
- if os.path.exists(os.path.join(toplevel, "config", "TestUtil.py")):
- break
-else:
+path = [ ".", "..", "../..", "../../..", "../../../.." ]
+head = os.path.dirname(sys.argv[0])
+if len(head) > 0:
+ path = [os.path.join(head, p) for p in path]
+path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "scripts", "TestUtil.py")) ]
+if len(path) == 0:
raise "can't find toplevel directory!"
+sys.path.append(os.path.join(path[0]))
+from scripts import *
-sys.path.append(os.path.join(toplevel, "config"))
-import TestUtil
-TestUtil.processCmdLine()
-
-name = os.path.join("IceStorm", "federation2")
-testdir = os.path.dirname(os.path.abspath(__file__))
-
-import IceStormUtil
-
-iceBox = TestUtil.getIceBox(testdir)
-iceBoxAdmin = os.path.join(TestUtil.getCppBinDir(), "iceboxadmin")
iceStormAdmin = os.path.join(TestUtil.getCppBinDir(), "icestormadmin")
-publisher = os.path.join(testdir, "publisher")
-subscriber = os.path.join(testdir, "subscriber")
+publisher = os.path.join(os.getcwd(), "publisher")
+subscriber = os.path.join(os.getcwd(), "subscriber")
def admin(ref, command):
- pipe = TestUtil.startClient(iceStormAdmin, ref + r' -e "%s"' % command)
- all = ""
- while True:
- line = pipe.readline();
- if not line:
- break
- all = all + line
- status = TestUtil.closePipe(pipe)
- if status:
- TestUtil.killServers()
- sys.exit(1)
- return all
-
-def printOutput(pipe):
- try:
- while True:
- line = pipe.readline()
- if not line:
- break
- print line,
- sys.stdout.flush()
- except IOError:
- pass
+ proc = TestUtil.startClient(iceStormAdmin, ref + ' -e "%s"' % command, echo = False)
+ proc.waitTestSuccess()
+ return proc.buf
def runPublisher(icestorm1, opt = ""):
- publisherPipe = TestUtil.startClient(publisher, opt + icestorm1.reference())
-
- printOutput(publisherPipe)
-
- publisherStatus = TestUtil.closePipe(publisherPipe)
- if publisherStatus:
- TestUtil.killServers()
- sys.exit(1)
+ proc = TestUtil.startClient(publisher, opt + icestorm1.reference())
+ proc.waitTestSuccess()
def doTest(icestorm1, icestorm2, batch, subscriberRef = None):
if batch:
@@ -79,9 +45,7 @@ def doTest(icestorm1, icestorm2, batch, subscriberRef = None):
if subscriberRef == None:
subscriberRef = icestorm2.reference()
- subscriberPipe = TestUtil.startServer(subscriber, batchOptions + subscriberRef)
- TestUtil.getServerPid(subscriberPipe)
- TestUtil.getAdapterReady(subscriberPipe)
+ subscriberProc = TestUtil.startServer(subscriber, batchOptions + subscriberRef)
#
# Start the publisher. This should publish events which eventually
@@ -89,8 +53,7 @@ def doTest(icestorm1, icestorm2, batch, subscriberRef = None):
#
runPublisher(icestorm1)
- subscriberStatus = TestUtil.specificServerStatus(subscriberPipe, 30)
- return subscriberStatus
+ subscriberProc.waitTestSuccess(timeout=30)
#
# Test #1:
@@ -99,10 +62,10 @@ def doTest(icestorm1, icestorm2, batch, subscriberRef = None):
# published between them correctly.
#
def runtest(type, **args):
- icestorm1 = IceStormUtil.init(toplevel, testdir, type, additional = '--IceStorm.Discard.Interval=2',
+ icestorm1 = IceStormUtil.init(TestUtil.toplevel, os.getcwd(), type, additional = '--IceStorm.Discard.Interval=2',
dbDir = "db", instanceName = "TestIceStorm1", port = 12000, **args)
icestorm1.start()
- icestorm2 = IceStormUtil.init(toplevel, testdir, type, additional = '--IceStorm.Discard.Interval=2',
+ icestorm2 = IceStormUtil.init(TestUtil.toplevel, os.getcwd(), type, additional = '--IceStorm.Discard.Interval=2',
dbDir = "db2", instanceName = "TestIceStorm2", port = 12500, **args)
icestorm2.start()
@@ -119,7 +82,7 @@ def runtest(type, **args):
#
print "testing federation with oneway subscribers...",
sys.stdout.flush()
- onewayStatus = doTest(icestorm1, icestorm2, 0)
+ doTest(icestorm1, icestorm2, 0)
print "ok"
#
@@ -127,13 +90,9 @@ def runtest(type, **args):
#
print "testing federation with batch subscribers...",
sys.stdout.flush()
- batchStatus = doTest(icestorm1, icestorm2, 1)
+ doTest(icestorm1, icestorm2, 1)
print "ok"
- if onewayStatus or batchStatus:
- TestUtil.killServers()
- sys.exit(1)
-
#
# Test #2:
#
@@ -158,7 +117,7 @@ def runtest(type, **args):
#
print "retesting federation with oneway subscribers... ",
sys.stdout.flush()
- onewayStatus = doTest(icestorm1, icestorm2, 0)
+ doTest(icestorm1, icestorm2, 0)
print "ok"
#
@@ -166,13 +125,9 @@ def runtest(type, **args):
#
print "retesting federation with batch subscribers... ",
sys.stdout.flush()
- batchStatus = doTest(icestorm1, icestorm2, 1)
+ doTest(icestorm1, icestorm2, 1)
print "ok"
- if onewayStatus or batchStatus:
- TestUtil.killServers()
- sys.exit(1)
-
#
# Shutdown icestorm.
#
@@ -180,61 +135,6 @@ def runtest(type, **args):
icestorm2.stop()
#
- # This is used by the below test to confirm that the link warning is
- # emitted. This class conforms with the TestUtil.ReaderThread protocol.
- #
- class ExpectorThread(threading.Thread):
- def __init__(self, pipe):
- self.mutex = threading.Lock()
- self.pipe = pipe
- # Suppress "adapter ready" messages. Under windows the eol isn't \n.
- self.re = [ [ re.compile(" ready\r?\n$"), 0 ] ]
- threading.Thread.__init__(self)
-
- def run(self):
- try:
- while 1:
- line = self.pipe.readline()
- if not line: break
- found = False
- self.mutex.acquire()
- for item in self.re:
- if item[0].search(line):
- found = True
- item[1] = item[1] + 1
- break
- self.mutex.release()
- if not found:
- print line,
- except IOError:
- pass
-
- self.status = TestUtil.closePipe(self.pipe)
-
- # To comply with the ReaderThread protocol.
- def getPipe(self):
- return self.pipe
-
- # To comply with the ReaderThread protocol.
- def getStatus(self):
- return self.status
-
- def matches(self, index):
- self.mutex.acquire()
- m = self.re[index][1]
- self.mutex.release()
- return m
-
- def expect(self, r):
- self.mutex.acquire()
- self.re.append([r, 0])
- l = len(self.re)-1
- self.mutex.release()
- return l
-
- #
- # Test #3:
- #
# Restart the first server and publish some events. Attach a
# subscriber to the channel and make sure the events are received.
#
@@ -244,14 +144,10 @@ def runtest(type, **args):
if type != "replicated":
print "restarting only one IceStorm server...",
sys.stdout.flush()
- pipe = icestorm1.start(echo=False, createThread = False)
- expectorThread = ExpectorThread(pipe)
- expectorThread.start()
- global serverThreads
-
- TestUtil.serverThreads.append(expectorThread)
- index = expectorThread.expect(re.compile("topic.fed1.*subscriber offline"))
- expectorThread.expect(re.compile("connection refused"))
+ proc = icestorm1.start(echo=False)
+
+ #proc.expect("topic.fed1.*subscriber offline")
+ #proc.expect("connection refused")
print "ok"
#
@@ -259,14 +155,12 @@ def runtest(type, **args):
#
print "testing that the federation link reports an error...",
sys.stdout.flush()
- onewayStatus = doTest(icestorm1, icestorm2, 0, icestorm1.reference())
+ doTest(icestorm1, icestorm2, 0, icestorm1.reference())
# Give some time for the output to be sent.
time.sleep(2)
- if onewayStatus or expectorThread.matches(index) != 1:
- TestUtil.killServers()
- sys.exit(1)
+ proc.expect("topic.fed1.*subscriber offline")
print "ok"
print "starting downstream icestorm server...",
@@ -284,12 +178,14 @@ def runtest(type, **args):
#
print "testing link is reestablished...",
sys.stdout.flush()
- onewayStatus = doTest(icestorm1, icestorm2, 0)
+ doTest(icestorm1, icestorm2, 0)
print "ok"
- if onewayStatus or expectorThread.matches(index) != 1:
- TestUtil.killServers()
- sys.exit(1)
+ try:
+ proc.expect("topic.fed1.*subscriber offline")
+ assert False
+ except Expect.TIMEOUT:
+ pass
icestorm1.stop()
icestorm2.stop()
@@ -316,7 +212,6 @@ def runtest(type, **args):
line = admin(adminIceStormReference, "links TestIceStorm1")
if not re.compile("fed1 with cost 0").search(line):
print line
- TestUtil.killServers()
sys.exit(1)
print "ok"