1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#
# Copyright (c) ZeroC, Inc. All rights reserved.
#
import Ice, Test, Dispatcher, sys, threading, random
def test(b):
if not b:
raise RuntimeError('test assertion failed')
class Callback:
def __init__(self):
self._called = False
self._cond = threading.Condition()
self._mainThread = threading.current_thread()
def check(self):
with self._cond:
while not self._called:
self._cond.wait()
self._called = False
def called(self):
with self._cond:
self._called = True
self._cond.notify()
def response(self, f):
test(f.exception() is None)
test(Dispatcher.Dispatcher.isDispatcherThread())
self.called()
def exception(self, f):
test(isinstance(f.exception(), Ice.NoEndpointException))
test(Dispatcher.Dispatcher.isDispatcherThread())
self.called()
def exceptionEx(self, f):
test(isinstance(f.exception(), Ice.InvocationTimeoutException))
test(Dispatcher.Dispatcher.isDispatcherThread())
self.called()
def payload(self, f):
if f.exception():
test(isinstance(f.exception(), Ice.CommunicatorDestroyedException))
else:
test(Dispatcher.Dispatcher.isDispatcherThread())
def allTests(helper, communicator):
sref = "test:{0}".format(helper.getTestEndpoint())
obj = communicator.stringToProxy(sref)
test(obj)
p = Test.TestIntfPrx.uncheckedCast(obj)
sref = "testController:{0}".format(helper.getTestEndpoint(num=1))
obj = communicator.stringToProxy(sref)
test(obj)
testController = Test.TestIntfControllerPrx.uncheckedCast(obj)
sys.stdout.write("testing dispatcher... ")
sys.stdout.flush()
p.op()
cb = Callback()
p.opAsync().add_done_callback_async(cb.response)
cb.check()
#
# Expect NoEndpointException.
#
i = p.ice_adapterId("dummy")
i.opAsync().add_done_callback_async(cb.exception)
cb.check()
#
# Expect InvocationTimeoutException.
#
to = p.ice_invocationTimeout(10);
to.sleepAsync(500).add_done_callback_async(cb.exceptionEx)
cb.check()
#
# Hold adapter to make sure invocations don't _complete_ synchronously
#
testController.holdAdapter()
if sys.version_info[0] == 2:
b = [chr(random.randint(0, 255)) for x in range(0, 1024)]
seq = ''.join(b)
else:
b = [random.randint(0, 255) for x in range(0, 1024)]
seq = bytes(b)
f = None
while True:
f = p.opWithPayloadAsync(seq)
f.add_done_callback(cb.payload)
if not f.is_sent_synchronously():
break
testController.resumeAdapter()
f.result()
print("ok")
p.shutdown()
|