summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/federation2/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceStorm/federation2/test.py')
-rw-r--r--cpp/test/IceStorm/federation2/test.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/cpp/test/IceStorm/federation2/test.py b/cpp/test/IceStorm/federation2/test.py
new file mode 100644
index 00000000000..5ac2f315555
--- /dev/null
+++ b/cpp/test/IceStorm/federation2/test.py
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+# **********************************************************************
+#
+# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+#
+# Publisher/subscriber test cases, publisher publishes on TestIceStorm1 instance(s) and
+# the subscriber subscribes to the TestIceStorm2 instance(s)
+#
+pub1Sub2Oneway=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm2"))
+pub1Sub2Batch=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm2", args=["-b"]))
+
+pub1Sub1Oneway=ClientServerTestCase(client=Publisher("TestIceStorm1"), server=Subscriber("TestIceStorm1"))
+
+class IceStormFederation2TestCase(IceStormTestCase):
+
+ def runClientSide(self, current):
+
+ current.write("setting up the topics... ")
+ self.runadmin(current, "create TestIceStorm1/fed1 TestIceStorm2/fed1; link TestIceStorm1/fed1 TestIceStorm2/fed1")
+ current.writeln("ok")
+
+ #
+ # Test oneway subscribers.
+ #
+ current.write("testing federation with oneway subscribers... ")
+ pub1Sub2Oneway.run(current)
+ current.writeln("ok")
+
+ #
+ # Test batch oneway subscribers.
+ #
+ current.write("testing federation with batch subscribers... ")
+ pub1Sub2Batch.run(current)
+ current.writeln("ok")
+
+ #
+ # Test #2:
+ #
+ # Stop and restart the service and repeat the test. This ensures that
+ # the database is correct.
+ #
+ current.write("restarting services to ensure that the database content is preserved... ")
+ self.restartIceStorm(current)
+ current.writeln("ok")
+
+ #
+ # Test oneway subscribers.
+ #
+ current.write("retesting federation with oneway subscribers... ")
+ pub1Sub2Oneway.run(current)
+ current.writeln("ok")
+
+ #
+ # Test batch oneway subscribers.
+ #
+ current.write("retesting federation with batch subscribers... ")
+ pub1Sub2Batch.run(current)
+ current.writeln("ok")
+
+ #
+ # Shutdown icestorm.
+ #
+ self.stopIceStorm(current)
+
+ icestorm1 = [icestorm for icestorm in self.icestorm if icestorm.getInstanceName() == "TestIceStorm1"]
+ icestorm2 = [icestorm for icestorm in self.icestorm if icestorm.getInstanceName() == "TestIceStorm2"]
+
+ #
+ # Restart the first server and publish some events. Attach a
+ # subscriber to the channel and make sure the events are received.
+ #
+ # Then re-start the linked downstream server and publish the events.
+ # Ensure they are received by the linked server.
+ #
+ if self.getName().find("replicated") == -1:
+
+ current.write("restarting only one IceStorm server... ")
+ icestorm1[0].start(current)
+ current.writeln("ok")
+
+ #
+ # Test oneway subscribers.
+ #
+ current.write("testing that the federation link reports an error... ")
+ pub1Sub1Oneway.run(current)
+
+ # Give some time for the output to be sent.
+ time.sleep(2)
+
+ icestorm1[0].expect("topic.fed1.*subscriber offline")
+ current.writeln("ok")
+
+ current.write("starting downstream icestorm server... ")
+ icestorm2[0].start(current)
+ current.writeln("ok")
+
+ #
+ # Need to sleep for at least the discard interval.
+ #
+ time.sleep(3)
+
+ #
+ # Test oneway subscribers.
+ #
+ current.write("testing link is reestablished... ")
+ pub1Sub2Oneway.run(current)
+ current.writeln("ok")
+
+ try:
+ icestorm1[0].expect("topic.fed1.*subscriber offline", timeout=1)
+ assert False
+ except Expect.TIMEOUT:
+ pass
+
+ self.stopIceStorm(current)
+
+ #
+ # Test #4:
+ #
+ # Trash the TestIceStorm2 database. Then restart the servers and
+ # verify that the link is removed.
+ #
+ current.write("destroying the downstream IceStorm service database... ")
+ for s in icestorm2:
+ s.teardown(current, True)
+ s.setup(current)
+
+ current.writeln("ok")
+
+ current.write("restarting IceStorm servers... ")
+ self.startIceStorm(current)
+ current.writeln("ok")
+
+ current.write("checking link still exists... ")
+ line = self.runadmin(current, "links TestIceStorm1", quiet=True)
+ if not re.compile("fed1 with cost 0").search(line):
+ raise RuntimeError("unexpected output (`{0}')".format(line))
+ current.writeln("ok")
+
+ current.write("publishing some events... ")
+ # The publisher must be run twice because all the events can be
+ # sent out in one batch to the linked subscriber which means that
+ # the link is not reaped until the next batch is
+ # sent. Furthermore, with a replicated IceStorm both sets of
+ # events must be set to the same replica.
+ Publisher("TestIceStorm1", args=["--count", 2]).run(current)
+ current.writeln("ok")
+
+ # Give the unsubscription time to propagate.
+ time.sleep(1)
+
+ # Verify that the link has disappeared.
+ current.write("verifying that the link has been destroyed... ")
+ line = self.runadmin(current, "links TestIceStorm1")
+ nRetry = 5
+ while len(line) > 0 and nRetry > 0:
+ line = self.runadmin(current, "links TestIceStorm1")
+ time.sleep(1) # Give more time for unsubscription to propagate.
+ nRetry -= 1
+ if len(line) > 0:
+ raise RuntimeError("unexpected output (`{0}')".format(line))
+ current.writeln("ok")
+
+ #
+ # Destroy the remaining topic.
+ #
+ current.write("destroying topics... ")
+ self.runadmin(current, "destroy TestIceStorm1/fed1links TestIceStorm1", quiet=True)
+ current.writeln("ok")
+
+ self.stopIceStorm(current)
+
+# Override ReplicatedPublishEndpoints property to empty for testing without replicated publisher
+props = { 'IceStorm.Discard.Interval' : 2 }
+nonRepProps = { 'IceStorm.Discard.Interval' : 2, 'IceStorm.ReplicatedPublishEndpoints' : '' }
+
+TestSuite(__file__, [
+
+ IceStormFederation2TestCase("persistent", icestorm=
+ [IceStorm("TestIceStorm1", quiet=True, props=props),
+ IceStorm("TestIceStorm2", quiet=True,portnum=20, props=props)]),
+
+ IceStormFederation2TestCase("replicated with non-replicated publisher", icestorm=
+ [IceStorm("TestIceStorm1", i, 3, quiet=True, props=nonRepProps) for i in range(0,3)] +
+ [IceStorm("TestIceStorm2", i, 3, quiet=True, portnum=20, props=nonRepProps) for i in range(0,3)]),
+
+ IceStormFederation2TestCase("replicated with replicated publisher", icestorm=
+ [IceStorm("TestIceStorm1", i, 3, quiet=True, props=props) for i in range(0,3)] +
+ [IceStorm("TestIceStorm2", i, 3, quiet=True, portnum=20, props=props) for i in range(0,3)]),
+], multihost=False)