diff options
Diffstat (limited to 'cpp/test/IceStorm/federation2/test.py')
-rw-r--r-- | cpp/test/IceStorm/federation2/test.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/cpp/test/IceStorm/federation2/test.py b/cpp/test/IceStorm/federation2/test.py new file mode 100644 index 00000000000..5ac2f315555 --- /dev/null +++ b/cpp/test/IceStorm/federation2/test.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- +# ********************************************************************** +# +# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +# +# Publisher/subscriber test cases, publisher publishes on TestIceStorm1 instance(s) and +# the subscriber subscribes to the TestIceStorm2 instance(s) +# +pub1Sub2Oneway=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm2")) +pub1Sub2Batch=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm2", args=["-b"])) + +pub1Sub1Oneway=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm1")) + +class IceStormFederation2TestCase(IceStormTestCase): + + def runClientSide(self, current): + + current.write("setting up the topics... ") + self.runadmin(current, "create TestIceStorm1/fed1 TestIceStorm2/fed1; link TestIceStorm1/fed1 TestIceStorm2/fed1") + current.writeln("ok") + + # + # Test oneway subscribers. + # + current.write("testing federation with oneway subscribers... ") + pub1Sub2Oneway.run(current) + current.writeln("ok") + + # + # Test batch oneway subscribers. + # + current.write("testing federation with batch subscribers... ") + pub1Sub2Batch.run(current) + current.writeln("ok") + + # + # Test #2: + # + # Stop and restart the service and repeat the test. This ensures that + # the database is correct. + # + current.write("restarting services to ensure that the database content is preserved... ") + self.restartIceStorm(current) + current.writeln("ok") + + # + # Test oneway subscribers. + # + current.write("retesting federation with oneway subscribers... ") + pub1Sub2Oneway.run(current) + current.writeln("ok") + + # + # Test batch oneway subscribers. + # + current.write("retesting federation with batch subscribers... ") + pub1Sub2Batch.run(current) + current.writeln("ok") + + # + # Shutdown icestorm. + # + self.stopIceStorm(current) + + icestorm1 = [icestorm for icestorm in self.icestorm if icestorm.getInstanceName() == "TestIceStorm1"] + icestorm2 = [icestorm for icestorm in self.icestorm if icestorm.getInstanceName() == "TestIceStorm2"] + + # + # Restart the first server and publish some events. Attach a + # subscriber to the channel and make sure the events are received. + # + # Then re-start the linked downstream server and publish the events. + # Ensure they are received by the linked server. + # + if self.getName().find("replicated") == -1: + + current.write("restarting only one IceStorm server... ") + icestorm1[0].start(current) + current.writeln("ok") + + # + # Test oneway subscribers. + # + current.write("testing that the federation link reports an error... ") + pub1Sub1Oneway.run(current) + + # Give some time for the output to be sent. + time.sleep(2) + + icestorm1[0].expect("topic.fed1.*subscriber offline") + current.writeln("ok") + + current.write("starting downstream icestorm server... ") + icestorm2[0].start(current) + current.writeln("ok") + + # + # Need to sleep for at least the discard interval. + # + time.sleep(3) + + # + # Test oneway subscribers. + # + current.write("testing link is reestablished... ") + pub1Sub2Oneway.run(current) + current.writeln("ok") + + try: + icestorm1[0].expect("topic.fed1.*subscriber offline", timeout=1) + assert False + except Expect.TIMEOUT: + pass + + self.stopIceStorm(current) + + # + # Test #4: + # + # Trash the TestIceStorm2 database. Then restart the servers and + # verify that the link is removed. + # + current.write("destroying the downstream IceStorm service database... ") + for s in icestorm2: + s.teardown(current, True) + s.setup(current) + + current.writeln("ok") + + current.write("restarting IceStorm servers... ") + self.startIceStorm(current) + current.writeln("ok") + + current.write("checking link still exists... ") + line = self.runadmin(current, "links TestIceStorm1", quiet=True) + if not re.compile("fed1 with cost 0").search(line): + raise RuntimeError("unexpected output (`{0}')".format(line)) + current.writeln("ok") + + current.write("publishing some events... ") + # The publisher must be run twice because all the events can be + # sent out in one batch to the linked subscriber which means that + # the link is not reaped until the next batch is + # sent. Furthermore, with a replicated IceStorm both sets of + # events must be set to the same replica. + Publisher("TestIceStorm1", args=["--count", 2]).run(current) + current.writeln("ok") + + # Give the unsubscription time to propagate. + time.sleep(1) + + # Verify that the link has disappeared. + current.write("verifying that the link has been destroyed... ") + line = self.runadmin(current, "links TestIceStorm1") + nRetry = 5 + while len(line) > 0 and nRetry > 0: + line = self.runadmin(current, "links TestIceStorm1") + time.sleep(1) # Give more time for unsubscription to propagate. + nRetry -= 1 + if len(line) > 0: + raise RuntimeError("unexpected output (`{0}')".format(line)) + current.writeln("ok") + + # + # Destroy the remaining topic. + # + current.write("destroying topics... ") + self.runadmin(current, "destroy TestIceStorm1/fed1links TestIceStorm1", quiet=True) + current.writeln("ok") + + self.stopIceStorm(current) + +# Override ReplicatedPublishEndpoints property to empty for testing without replicated publisher +props = { 'IceStorm.Discard.Interval' : 2 } +nonRepProps = { 'IceStorm.Discard.Interval' : 2, 'IceStorm.ReplicatedPublishEndpoints' : '' } + +TestSuite(__file__, [ + + IceStormFederation2TestCase("persistent", icestorm= + [IceStorm("TestIceStorm1", quiet=True, props=props), + IceStorm("TestIceStorm2", quiet=True,portnum=20, props=props)]), + + IceStormFederation2TestCase("replicated with non-replicated publisher", icestorm= + [IceStorm("TestIceStorm1", i, 3, quiet=True, props=nonRepProps) for i in range(0,3)] + + [IceStorm("TestIceStorm2", i, 3, quiet=True, portnum=20, props=nonRepProps) for i in range(0,3)]), + + IceStormFederation2TestCase("replicated with replicated publisher", icestorm= + [IceStorm("TestIceStorm1", i, 3, quiet=True, props=props) for i in range(0,3)] + + [IceStorm("TestIceStorm2", i, 3, quiet=True, portnum=20, props=props) for i in range(0,3)]), +], multihost=False) |