diff options
Diffstat (limited to 'cpp/test/IceStorm/federation2/run.py')
-rwxr-xr-x | cpp/test/IceStorm/federation2/run.py | 642 |
1 files changed, 287 insertions, 355 deletions
diff --git a/cpp/test/IceStorm/federation2/run.py b/cpp/test/IceStorm/federation2/run.py index 411533e1ff0..a66c664d724 100755 --- a/cpp/test/IceStorm/federation2/run.py +++ b/cpp/test/IceStorm/federation2/run.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # ********************************************************************** # -# Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +# Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. # # This copy of Ice is licensed to you under the terms described in the # ICE_LICENSE file included in this distribution. @@ -24,45 +24,51 @@ TestUtil.processCmdLine() name = os.path.join("IceStorm", "federation2") testdir = os.path.dirname(os.path.abspath(__file__)) -exedir = testdir +import IceStormUtil -iceBox = TestUtil.getIceBox(exedir) -iceBoxAdmin = os.path.join(TestUtil.getBinDir(__file__), "iceboxadmin") -iceStormAdmin = os.path.join(TestUtil.getBinDir(__file__), "icestormadmin") +iceBox = TestUtil.getIceBox(testdir) +iceBoxAdmin = os.path.join(TestUtil.getBinDir(testdir), "iceboxadmin") +iceStormAdmin = os.path.join(TestUtil.getBinDir(testdir), "icestormadmin") -iceBoxEndpoints = ' --IceBox.ServiceManager.Endpoints="default -p 12010" --Ice.Default.Locator=' - -iceStormService = " --IceBox.Service.IceStorm=IceStormService," + TestUtil.getIceSoVersion() + ":createIceStorm" + \ - ' --IceStorm.TopicManager.Endpoints="default -p 12011"' + \ - ' --IceStorm.Publish.Endpoints="default -p 12012"' + \ - ' --IceStorm.InstanceName=TestIceStorm1 ' + \ - ' --IceStorm.Discard.Interval=2' + \ - ' --IceBox.PrintServicesReady=IceStorm' + \ - " --IceBox.InheritProperties=1" -iceStormReference = ' --IceStorm.TopicManager.Proxy="TestIceStorm1/TopicManager: default -p 12011"' - -iceBoxEndpoints2 = ' --IceBox.ServiceManager.Endpoints="default -p 12020" --Ice.Default.Locator=' +publisher = os.path.join(testdir, "publisher") +subscriber = os.path.join(testdir, "subscriber") + +def admin(ref, command): + pipe = TestUtil.startClient(iceStormAdmin, ref + r' -e "%s"' % command) + all = "" + while True: + line = pipe.readline(); + if not line: + break + all = all + line + status = TestUtil.closePipe(pipe) + if status: + TestUtil.killServers() + sys.exit(1) + return all -iceStormService2 = " --IceBox.Service.IceStorm=IceStormService," + TestUtil.getIceSoVersion() + ":createIceStorm" + \ - ' --IceStorm.TopicManager.Endpoints="default -p 12021"' + \ - ' --IceStorm.Publish.Endpoints="default -p 12022"' + \ - ' --IceStorm.InstanceName=TestIceStorm2 ' + \ - ' --IceStorm.Discard.Interval=2' + \ - ' --IceBox.PrintServicesReady=IceStorm' + \ - " --IceBox.InheritProperties=1" -iceStormReference2 = ' --IceStorm.TopicManager.Proxy="TestIceStorm2/TopicManager: default -p 12021"' +def printOutput(pipe): + try: + while True: + line = pipe.readline() + if not line: + break + print line, + sys.stdout.flush() + except IOError: + pass -adminIceStormReference = ' --IceStormAdmin.TopicManager.Proxy="TestIceStorm1/TopicManager: default -p 12011" ' + \ - '--IceStormAdmin.TopicManager.Proxy2="TestIceStorm2/TopicManager: default -p 12021"' +def runPublisher(icestorm1, opt = ""): + publisherPipe = TestUtil.startClient(publisher, opt + icestorm1.reference()) -def doTest(batch, subscriberRef = None): - global testdir - global iceStormReference - global iceStormReference2 + printOutput(publisherPipe) - publisher = os.path.join(testdir, "publisher") - subscriber = os.path.join(testdir, "subscriber") + publisherStatus = TestUtil.closePipe(publisherPipe) + if publisherStatus: + TestUtil.killServers() + sys.exit(1) +def doTest(icestorm1, icestorm2, batch, subscriberRef = None): if batch: name = "batch subscriber" batchOptions = " -b" @@ -71,10 +77,9 @@ def doTest(batch, subscriberRef = None): batchOptions = "" if subscriberRef == None: - subscriberRef = iceStormReference2 + subscriberRef = icestorm2.reference() - - subscriberPipe = TestUtil.startServer(subscriber, batchOptions + subscriberRef + " 2>&1") + subscriberPipe = TestUtil.startServer(subscriber, batchOptions + subscriberRef) TestUtil.getServerPid(subscriberPipe) TestUtil.getAdapterReady(subscriberPipe) @@ -82,56 +87,10 @@ def doTest(batch, subscriberRef = None): # Start the publisher. This should publish events which eventually # causes subscriber to terminate. # - publisherPipe = TestUtil.startClient(publisher, iceStormReference + " 2>&1") - - TestUtil.printOutputFromPipe(publisherPipe) + runPublisher(icestorm1) subscriberStatus = TestUtil.specificServerStatus(subscriberPipe, 30) - publisherStatus = TestUtil.closePipe(publisherPipe) - - return subscriberStatus or publisherStatus - -def startServers(): - global iceBox - global iceBoxEndpoints - global iceBoxEndpoints2 - global iceStormService - global iceStormService2 - global iceStormDBEnv - global iceStormDBEnv2 - iceBoxPipe = TestUtil.startServer(iceBox, iceBoxEndpoints + iceStormService + iceStormDBEnv) - TestUtil.getServerPid(iceBoxPipe) - TestUtil.waitServiceReady(iceBoxPipe, "IceStorm") - iceBoxPipe2 = TestUtil.startServer(iceBox, iceBoxEndpoints2 + iceStormService2 + iceStormDBEnv2) - TestUtil.getServerPid(iceBoxPipe2) - TestUtil.waitServiceReady(iceBoxPipe2, "IceStorm") - - return iceBoxPipe, iceBoxPipe2 - -def stopServers(p1, p2 = None): - global iceBox - global iceBoxAdmin - global iceBoxEndpoints - global iceBoxEndpoints2 - pipe = TestUtil.startClient(iceBoxAdmin, iceBoxEndpoints + r' shutdown' + " 2>&1") - status = TestUtil.closePipe(pipe) - if status or TestUtil.specificServerStatus(p1): - TestUtil.killServers() - sys.exit(1) - if p2: - pipe = TestUtil.startClient(iceBoxAdmin, iceBoxEndpoints2 + r' shutdown' + " 2>&1") - status = TestUtil.closePipe(pipe) - if status or TestUtil.specificServerStatus(p2): - TestUtil.killServers() - sys.exit(1) - -dbHome = os.path.join(testdir, "db") -TestUtil.cleanDbDir(dbHome) -iceStormDBEnv=" --Freeze.DbEnv.IceStorm.DbHome=" + dbHome - -dbHome2 = os.path.join(testdir, "db2") -TestUtil.cleanDbDir(dbHome2) -iceStormDBEnv2=" --Freeze.DbEnv.IceStorm.DbHome=" + dbHome2 + return subscriberStatus # # Test #1: @@ -139,296 +98,269 @@ iceStormDBEnv2=" --Freeze.DbEnv.IceStorm.DbHome=" + dbHome2 # create a cross service link fed1->fed2 and ensure the events are # published between them correctly. # -print "starting IceStorm services...", -sys.stdout.flush() -iceBoxPipe1, iceBoxPipe2 = startServers() -print "ok" - -print "setting up the topics...", -sys.stdout.flush() -command = r' -e "create TestIceStorm1/fed1 TestIceStorm2/fed1; link TestIceStorm1/fed1 TestIceStorm2/fed1"' -iceStormAdminPipe = TestUtil.startClient(iceStormAdmin, adminIceStormReference + command + " 2>&1") -iceStormAdminStatus = TestUtil.closePipe(iceStormAdminPipe) -if iceStormAdminStatus: - TestUtil.killServers() - sys.exit(1) -print "ok" +def runtest(type, **args): + icestorm1 = IceStormUtil.init(toplevel, testdir, type, additional = '--IceStorm.Discard.Interval=2', + dbDir = "db", instanceName = "TestIceStorm1", port = 12000, **args) + icestorm1.start() + icestorm2 = IceStormUtil.init(toplevel, testdir, type, additional = '--IceStorm.Discard.Interval=2', + dbDir = "db2", instanceName = "TestIceStorm2", port = 12500, **args) + icestorm2.start() -# -# Test oneway subscribers. -# -print "testing federation with oneway subscribers...", -sys.stdout.flush() -onewayStatus = doTest(0) -print "ok" - -# -# Test batch oneway subscribers. -# -print "testing federation with batch subscribers...", -sys.stdout.flush() -batchStatus = doTest(1) -print "ok" + adminIceStormReference = ' --IceStormAdmin.TopicManager.Proxy="%s" --IceStormAdmin.TopicManager.Proxy2="%s"' % ( + icestorm1.proxy(), icestorm2.proxy()) -if onewayStatus or batchStatus: - TestUtil.killServers() - sys.exit(1) + print "setting up the topics...", + sys.stdout.flush() + admin(adminIceStormReference, "create TestIceStorm1/fed1 TestIceStorm2/fed1; link TestIceStorm1/fed1 TestIceStorm2/fed1") + print "ok" -# -# Test #2: -# -# Stop and restart the service and repeat the test. This ensures that -# the database is correct. -# -print "restarting services to ensure that the database content is preserved...", -sys.stdout.flush() - -# -# Shutdown icestorm. -# -stopServers(iceBoxPipe1, iceBoxPipe2) - -iceBoxPipe1, iceBoxPipe2 = startServers() -print "ok" - -# -# Test oneway subscribers. -# -print "retesting federation with oneway subscribers... ", -sys.stdout.flush() -onewayStatus = doTest(0) -print "ok" - -# -# Test batch oneway subscribers. -# -print "retesting federation with batch subscribers... ", -sys.stdout.flush() -batchStatus = doTest(1) -print "ok" - -if onewayStatus or batchStatus: - TestUtil.killServers() - sys.exit(1) - -# -# Shutdown icestorm. -# -stopServers(iceBoxPipe1, iceBoxPipe2) + # + # Test oneway subscribers. + # + print "testing federation with oneway subscribers...", + sys.stdout.flush() + onewayStatus = doTest(icestorm1, icestorm2, 0) + print "ok" -# -# This is used by the below test to confirm that the link warning is -# emitted. This class conforms with the TestUtil.ReaderThread protocol. -# -class ExpectorThread(threading.Thread): - def __init__(self, pipe): - self.mutex = threading.Lock() - self.pipe = pipe - # Suppress "adapter ready" messages. Under windows the eol isn't \n. - self.re = [ [ re.compile(" ready\r?\n$"), 0 ] ] - threading.Thread.__init__(self) - - def run(self): - try: - while 1: - line = self.pipe.readline() - if not line: break - found = False; - self.mutex.acquire() - for item in self.re: - if item[0].search(line): - found = True - item[1] = item[1] + 1 - break - self.mutex.release() - if not found: - print line, - except IOError: - pass - - self.status = TestUtil.closePipe(self.pipe) - - # To comply with the ReaderThread protocol. - def getPipe(self): - return self.pipe - - # To comply with the ReaderThread protocol. - def getStatus(self): - return self.status - - def matches(self, index): - self.mutex.acquire() - m = self.re[index][1] - self.mutex.release() - return m - - def expect(self, r): - self.mutex.acquire() - self.re.append([r, 0]) - l = len(self.re)-1 - self.mutex.release() - return l + # + # Test batch oneway subscribers. + # + print "testing federation with batch subscribers...", + sys.stdout.flush() + batchStatus = doTest(icestorm1, icestorm2, 1) + print "ok" -# -# Test #3: -# -# Restart the first server and publish some events. Attach a -# subscriber to the channel and make sure the events are received. -# -# Then re-start the linked downstream server and publish the events. -# Ensure they are received by the linked server. -# -print "restarting only one IceStorm server...", -sys.stdout.flush() -iceBoxPipe1 = TestUtil.startServer(iceBox, iceBoxEndpoints + iceStormService + iceStormDBEnv + " 2>&1") -TestUtil.getServerPid(iceBoxPipe1) -TestUtil.waitServiceReady(iceBoxPipe1, "IceStorm", False) -expectorThread = ExpectorThread(iceBoxPipe1) -expectorThread.start() -global serverThreads -TestUtil.serverThreads.append(expectorThread) -index = expectorThread.expect(re.compile("fed1.link.*link offline")) -expectorThread.expect(re.compile("connection refused")) -print "ok" + if onewayStatus or batchStatus: + TestUtil.killServers() + sys.exit(1) -# -# Test oneway subscribers. -# -print "testing that the federation link reports an error...", -sys.stdout.flush() -onewayStatus = doTest(0, iceStormReference) -print "ok" + # + # Test #2: + # + # Stop and restart the service and repeat the test. This ensures that + # the database is correct. + # + print "restarting services to ensure that the database content is preserved...", + sys.stdout.flush() -# -# Give some time for the error to be reported -# -time.sleep(2) + # + # Shutdown icestorm. + # + icestorm1.stop() + icestorm2.stop() -if onewayStatus or expectorThread.matches(index) != 1: - TestUtil.killServers() - sys.exit(1) + icestorm1.start(echo=False) + icestorm2.start(echo=False) + print "ok" -print "starting downstream icestorm server...", -sys.stdout.flush() -iceBoxPipe2 = TestUtil.startServer(iceBox, iceBoxEndpoints2 + iceStormService2 + iceStormDBEnv2 + " 2>&1") -TestUtil.getServerPid(iceBoxPipe2) -TestUtil.waitServiceReady(iceBoxPipe2, "IceStorm") -print "ok" + # + # Test oneway subscribers. + # + print "retesting federation with oneway subscribers... ", + sys.stdout.flush() + onewayStatus = doTest(icestorm1, icestorm2, 0) + print "ok" -# -# Need to sleep for at least the discard interval. -# -time.sleep(3) + # + # Test batch oneway subscribers. + # + print "retesting federation with batch subscribers... ", + sys.stdout.flush() + batchStatus = doTest(icestorm1, icestorm2, 1) + print "ok" -# -# Test oneway subscribers. -# -print "testing link is reestablished...", -sys.stdout.flush() -onewayStatus = doTest(0) -print "ok" + if onewayStatus or batchStatus: + TestUtil.killServers() + sys.exit(1) -if onewayStatus or expectorThread.matches(index) != 1: - TestUtil.killServers() - sys.exit(1) + # + # Shutdown icestorm. + # + icestorm1.stop() + icestorm2.stop() -# -# Test #4: -# -# Trash the TestIceStorm2 database. Then restart the servers and -# verify that the link is removed. -# -print "destroying the downstream IceStorm service database...", -sys.stdout.flush() -stopServers(iceBoxPipe1, iceBoxPipe2) - -TestUtil.cleanDbDir(dbHome2) -print "ok" - -print "restarting IceStorm servers...", -sys.stdout.flush() -iceBoxPipe1, iceBoxPipe2 = startServers() -print "ok" - -print "checking link still exists...", -iceStormAdminPipe = TestUtil.startClient(iceStormAdmin, adminIceStormReference + r' -e "links TestIceStorm1"') -line = iceStormAdminPipe.readline() -if not re.compile("fed1 with cost 0").search(line): - print line - TestUtil.killServers() - sys.exit(1) -iceStormAdminStatus = TestUtil.closePipe(iceStormAdminPipe) -if iceStormAdminStatus: - TestUtil.killServers() - sys.exit(1) -print "ok" - -print "publishing some events...", -sys.stdout.flush() -publisher = os.path.join(testdir, "publisher") -publisherPipe = TestUtil.startClient(publisher, iceStormReference + " 2>&1") + # + # This is used by the below test to confirm that the link warning is + # emitted. This class conforms with the TestUtil.ReaderThread protocol. + # + class ExpectorThread(threading.Thread): + def __init__(self, pipe): + self.mutex = threading.Lock() + self.pipe = pipe + # Suppress "adapter ready" messages. Under windows the eol isn't \n. + self.re = [ [ re.compile(" ready\r?\n$"), 0 ] ] + threading.Thread.__init__(self) + + def run(self): + try: + while 1: + line = self.pipe.readline() + if not line: break + found = False + self.mutex.acquire() + for item in self.re: + if item[0].search(line): + found = True + item[1] = item[1] + 1 + break + self.mutex.release() + if not found: + print line, + except IOError: + pass + + self.status = TestUtil.closePipe(self.pipe) + + # To comply with the ReaderThread protocol. + def getPipe(self): + return self.pipe + + # To comply with the ReaderThread protocol. + def getStatus(self): + return self.status + + def matches(self, index): + self.mutex.acquire() + m = self.re[index][1] + self.mutex.release() + return m + + def expect(self, r): + self.mutex.acquire() + self.re.append([r, 0]) + l = len(self.re)-1 + self.mutex.release() + return l -TestUtil.printOutputFromPipe(publisherPipe) + # + # Test #3: + # + # Restart the first server and publish some events. Attach a + # subscriber to the channel and make sure the events are received. + # + # Then re-start the linked downstream server and publish the events. + # Ensure they are received by the linked server. + # + if type != "replicated": + print "restarting only one IceStorm server...", + sys.stdout.flush() + pipe = icestorm1.start(echo=False, createThread = False) + expectorThread = ExpectorThread(pipe) + expectorThread.start() + global serverThreads + + TestUtil.serverThreads.append(expectorThread) + index = expectorThread.expect(re.compile("topic.fed1.*subscriber offline")) + expectorThread.expect(re.compile("connection refused")) + print "ok" + + # + # Test oneway subscribers. + # + print "testing that the federation link reports an error...", + sys.stdout.flush() + onewayStatus = doTest(icestorm1, icestorm2, 0, icestorm1.reference()) + + # Give some time for the output to be sent. + time.sleep(2) + + if onewayStatus or expectorThread.matches(index) != 1: + TestUtil.killServers() + sys.exit(1) + print "ok" + + print "starting downstream icestorm server...", + sys.stdout.flush() + icestorm2.start(echo=False) + print "ok" + + # + # Need to sleep for at least the discard interval. + # + time.sleep(3) + + # + # Test oneway subscribers. + # + print "testing link is reestablished...", + sys.stdout.flush() + onewayStatus = doTest(icestorm1, icestorm2, 0) + print "ok" + + if onewayStatus or expectorThread.matches(index) != 1: + TestUtil.killServers() + sys.exit(1) -publisherStatus = TestUtil.closePipe(publisherPipe) -if publisherStatus: - TestUtil.killServers() - sys.exit(1) + icestorm1.stop() + icestorm2.stop() -# The publisher must be run twice because all the events can be sent -# out in one batch to the linked subscriber which means that the link -# is not reaped until the next batch is sent. -time.sleep(1) -publisher = os.path.join(testdir, "publisher") -publisherPipe = TestUtil.startClient(publisher, iceStormReference + iceStormReference + " 2>&1") -print "ok" - -TestUtil.printOutputFromPipe(publisherPipe) - -publisherStatus = TestUtil.closePipe(publisherPipe) -if publisherStatus: - TestUtil.killServers() - sys.exit(1) - -# Verify that the link has disappeared. -print "verifying that the link has been destroyed...", -sys.stdout.flush() -iceStormAdminPipe = TestUtil.startClient(iceStormAdmin, adminIceStormReference + r' -e "links TestIceStorm1"' + " 2>&1") -line = iceStormAdminPipe.readline() -try: - if line and len(line) > 0: + # + # Test #4: + # + # Trash the TestIceStorm2 database. Then restart the servers and + # verify that the link is removed. + # + print "destroying the downstream IceStorm service database...", + sys.stdout.flush() + icestorm2.clean() + + print "ok" + + print "restarting IceStorm servers...", + sys.stdout.flush() + icestorm1.start(echo = False) + icestorm2.start(echo = False) + print "ok" + + print "checking link still exists...", + line = admin(adminIceStormReference, "links TestIceStorm1") + if not re.compile("fed1 with cost 0").search(line): + print line + TestUtil.killServers() + sys.exit(1) + print "ok" + + print "publishing some events...", + sys.stdout.flush() + # The publisher must be run twice because all the events can be + # sent out in one batch to the linked subscriber which means that + # the link is not reaped until the next batch is + # sent. Furthermore, with a replicated IceStorm both sets of + # events must be set to the same replica. + runPublisher(icestorm1, opt = " --count 2") + print "ok" + + # Give the unsubscription time to propagate. + time.sleep(1) + + # Verify that the link has disappeared. + print "verifying that the link has been destroyed...", + sys.stdout.flush() + line = admin(adminIceStormReference, "links TestIceStorm1") + if len(line) > 0: + print line TestUtil.killServers() sys.exit(1) -except IOError: - pass -iceStormAdminStatus = TestUtil.closePipe(iceStormAdminPipe) -if iceStormAdminStatus: - print iceStormAdminStatus - TestUtil.killServers() - sys.exit(1) -print "ok" + print "ok" -# -# Destroy the remaining topic. -# -print "destroying topics...", -command = r' -e "destroy TestIceStorm1/fed1"' + " 2>&1" -iceStormAdminPipe = TestUtil.startClient(iceStormAdmin, adminIceStormReference + command + " 2>&1") -iceStormAdminStatus = TestUtil.closePipe(iceStormAdminPipe) -if iceStormAdminStatus: - TestUtil.killServers() - sys.exit(1) -print "ok" + # + # Destroy the remaining topic. + # + print "destroying topics...", + admin(adminIceStormReference, "destroy TestIceStorm1/fed1") + print "ok" -# -# Shutdown icestorm. -# -print "shutting down icestorm services...", -sys.stdout.flush() -stopServers(iceBoxPipe1, iceBoxPipe2) -if TestUtil.serverStatus(): - TestUtil.killServers() - sys.exit(1) -print "ok" + # + # Shutdown icestorm. + # + print "shutting down icestorm services...", + sys.stdout.flush() + icestorm1.stop() + icestorm2.stop() + print "ok" + +runtest("persistent") +runtest("replicated", replicatedPublisher = False) +runtest("replicated", replicatedPublisher = True) sys.exit(0) |