1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#!/usr/bin/env python
# **********************************************************************
#
# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
#
# This copy of Ice is licensed to you under the terms described in the
# ICE_LICENSE file included in this distribution.
#
# **********************************************************************
import os, sys, traceback
# Locate the top-level directory: the first of the current directory and its
# four nearest ancestors (as relative paths) that contains python/Ice.py.
toplevel = next(
    (path
     for path in map(os.path.normpath,
                     [".", "..", "../..", "../../..", "../../../.."])
     if os.path.exists(os.path.join(path, "python", "Ice.py"))),
    None)
if toplevel is None:
    raise RuntimeError("can't find toplevel directory!")
# Key.ice is loaded at runtime before `_and` is imported; presumably the
# `_and` module is generated from that Slice file, so the order of these
# three statements matters -- TODO confirm against the Ice documentation.
import Ice
Ice.loadSlice('Key.ice')
import _and
class delI(_and._del):
    """Subclass of ``_and._del`` whose single operation does nothing."""

    def _elif_async(self, _cb, _else, current=None):
        """Return ``None`` without touching ``_cb``, ``_else`` or ``current``."""
        return None
class execI(_and._exec):
    """Subclass of ``_and._exec`` that checks the dispatched operation name."""

    def _finally(self, current=None):
        """Assert that ``current.operation`` is the string ``"finally"``.

        ``current`` must expose an ``operation`` attribute; the default of
        ``None`` fails on that attribute access.
        """
        operation_name = current.operation
        assert operation_name == "finally"
class forI(_and._for):
    """Subclass of ``_and._for`` with a no-op ``foo`` operation."""

    def foo(self, _from, current=None):
        """Ignore ``_from`` and ``current`` and return ``None``."""
        return None
class ifI(_and._if):
    """Subclass of ``_and._if`` whose three operations are all no-ops."""

    def _elif_async(self, _cb, _else, current=None):
        """Return ``None`` without touching ``_cb``, ``_else`` or ``current``."""
        return None

    def _finally(self, current=None):
        """Return ``None``; ``current`` is not inspected."""
        return None

    def foo(self, _from, current=None):
        """Ignore ``_from`` and ``current`` and return ``None``."""
        return None
class printI(_and._print):
    """Subclass of ``_and._print`` with a no-op ``_raise`` method."""

    def _raise(self, _else, _return, _try, _while, _yield, _lambda, _or, _global):
        """Ignore all eight arguments and return ``None``.

        Unlike the methods of the other classes in this file, this one takes
        no trailing ``current`` parameter.
        """
        return None
def testtypes():
    """Touch the underscore-prefixed names of ``_and`` and the local classes.

    Every lookup, call and assertion runs in a fixed order; the first one
    that fails raises and aborts the function before "ok" is printed. Most
    locals exist only to force the lookup on their right-hand side.
    """
    sys.stdout.write("Testing generated type names... ")
    sys.stdout.flush()

    assert_break = _and._assert._break

    # The attribute is set on `_and._continue` itself; it is not called here.
    continue_type = _and._continue
    continue_type._def = 0

    del_proxy = _and.delPrx.uncheckedCast(None)
    assert "_elif" in dir(_and.delPrx)
    del_servant = delI()

    exec_proxy = _and.execPrx.uncheckedCast(None)
    assert "_finally" in dir(_and.execPrx)
    exec_servant = execI()

    for_servant = forI()

    if_proxy = _and.ifPrx.uncheckedCast(None)
    assert "_finally" in dir(_and.ifPrx)
    assert "_elif" in dir(_and.ifPrx)
    if_servant = ifI()

    # NOTE(review): `_lamba` (no "d") is reproduced exactly as found; it may
    # be a typo for `_lambda` -- confirm against Key.ice before changing it.
    is_value = _and._is()
    is_value._lamba = 0

    not_value = _and._not()
    not_value._lamba = 0
    not_value._or = 1
    not_value._pass = 2

    print_impl = printI()
    lambda_const = _and._lambda
    enum_none = _and.EnumNone._None

    print("ok")
def run(args, communicator):
    """Register an ``execI`` object, invoke ``_finally`` on it, then run testtypes.

    ``args`` is accepted but not used. ``communicator`` supplies the
    properties and the "TestAdapter" object adapter. Returns ``True`` when
    every step completes; any failure propagates as an exception.
    """
    properties = communicator.getProperties()
    properties.setProperty("TestAdapter.Endpoints", "default -p 12010:udp")

    adapter = communicator.createObjectAdapter("TestAdapter")
    adapter.add(execI(), Ice.stringToIdentity("test"))
    adapter.activate()

    sys.stdout.write("Testing operation name... ")
    sys.stdout.flush()
    # Call back into the object registered above through a proxy created by
    # the same adapter; execI._finally asserts on the operation name.
    base_proxy = adapter.createProxy(Ice.stringToIdentity("test"))
    exec_proxy = _and.execPrx.uncheckedCast(base_proxy)
    exec_proxy._finally()
    print("ok")

    testtypes()
    return True
# Script entry point: initialize Ice, run the tests, destroy the communicator
# and exit with status 0 on success or 1 on any failure.

# Bound before the try block so the cleanup below can test it even when
# initialization fails before a communicator exists (previously that case
# raised NameError and hid the original error's exit status).
communicator = None
try:
    initData = Ice.InitializationData()
    initData.properties = Ice.createProperties(sys.argv)

    #
    # It's possible to have batch oneway requests dispatched after the
    # adapter is deactivated due to thread scheduling, so we suppress
    # this warning.
    #
    initData.properties.setProperty("Ice.Warn.Dispatch", "0")
    communicator = Ice.initialize(sys.argv, initData)
    status = run(sys.argv, communicator)
except:
    # Bare except kept deliberately: any failure at all must be reported
    # and turned into a failing exit status.
    traceback.print_exc()
    status = False

if communicator:
    try:
        communicator.destroy()
    except:
        # A failed shutdown also counts as a test failure.
        traceback.print_exc()
        status = False

# sys.exit(False) exits with 0 and sys.exit(True) with 1, hence the negation.
sys.exit(not status)
|