#!/usr/bin/env python # ********************************************************************** # # Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. # # This copy of Ice is licensed to you under the terms described in the # ICE_LICENSE file included in this distribution. # # ********************************************************************** import os, sys import time path = [ ".", "..", "../..", "../../..", "../../../.." ] head = os.path.dirname(sys.argv[0]) if len(head) > 0: path = [os.path.join(head, p) for p in path] path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "scripts", "TestUtil.py")) ] if len(path) == 0: raise "can't find toplevel directory!" sys.path.append(os.path.join(path[0])) from scripts import * def dotest(type): icestorm = IceStormUtil.init(TestUtil.toplevel, os.getcwd(), type) icestorm.start() print "creating topic...", sys.stdout.flush() icestorm.admin("create single") print "ok" publisher = os.path.join(os.getcwd(), "publisher") subscriber = os.path.join(os.getcwd(), "subscriber") print "starting subscriber...", sys.stdout.flush() subscriberProc = TestUtil.startServer(subscriber, icestorm.reference(), count = 5) print "ok" # # Start the publisher. This should publish 10 events which eventually # causes subscriber to terminate. # print "starting publisher...", sys.stdout.flush() publisherProc = TestUtil.startClient(publisher, icestorm.reference()) print "ok" subscriberProc.waitTestSuccess() publisherProc.waitTestSuccess() # # Destroy the topic. # print "destroy topic...", sys.stdout.flush() icestorm.admin("destroy single") print "ok" # # Shutdown icestorm. # icestorm.stop() dotest("persistent") dotest("transient") dotest("replicated") sys.exit(0)