1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
#
# Copyright (c) ZeroC, Inc. All rights reserved.
#
import Ice, Test, Twoways, TwowaysFuture, TwowaysAMI, Oneways, OnewaysFuture, OnewaysAMI, BatchOneways, sys
import BatchOnewaysAMI, BatchOnewaysFuture
def test(b):
if not b:
raise RuntimeError('test assertion failed')
def allTests(helper, communicator):
ref = "test:{0}".format(helper.getTestEndpoint())
base = communicator.stringToProxy(ref)
cl = Test.MyClassPrx.checkedCast(base)
derived = Test.MyDerivedClassPrx.checkedCast(cl)
sys.stdout.write("testing twoway operations... ")
sys.stdout.flush()
Twoways.twoways(helper, cl)
Twoways.twoways(helper, derived)
derived.opDerived()
print("ok")
sys.stdout.write("testing oneway operations... ")
sys.stdout.flush()
Oneways.oneways(helper, cl)
print("ok")
sys.stdout.write("testing twoway operations with futures... ")
sys.stdout.flush()
TwowaysFuture.twowaysFuture(helper, cl)
print("ok")
sys.stdout.write("testing twoway operations with AMI... ")
sys.stdout.flush()
TwowaysAMI.twowaysAMI(helper, cl)
print("ok")
sys.stdout.write("testing oneway operations with futures... ")
sys.stdout.flush()
OnewaysFuture.onewaysFuture(helper, cl)
print("ok")
sys.stdout.write("testing oneway operations with AMI... ")
sys.stdout.flush()
OnewaysAMI.onewaysAMI(helper, cl)
print("ok")
sys.stdout.write("testing batch oneway operations... ")
sys.stdout.flush()
BatchOneways.batchOneways(cl)
BatchOneways.batchOneways(derived)
print("ok")
sys.stdout.write("testing batch oneway operations with futures... ")
sys.stdout.flush()
BatchOnewaysFuture.batchOneways(cl)
BatchOnewaysFuture.batchOneways(derived)
print("ok")
sys.stdout.write("testing batch oneway operations with AMI... ")
sys.stdout.flush()
BatchOnewaysAMI.batchOneways(cl)
BatchOnewaysAMI.batchOneways(derived)
print("ok")
sys.stdout.write("testing server shutdown... ")
sys.stdout.flush()
cl.shutdown()
try:
cl.ice_timeout(100).ice_ping() # Use timeout to speed up testing on Windows
test(False)
except Ice.LocalException:
print("ok")
|