summaryrefslogtreecommitdiff
path: root/python/test/Slice/escape/Client.py
blob: 882e9739329ab28cae3670ae2760ad6acf837232 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python
# **********************************************************************
#
# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
#
# This copy of Ice is licensed to you under the terms described in the
# ICE_LICENSE file included in this distribution.
#
# **********************************************************************

import os, sys, traceback

# Probe the current directory and up to four parent levels for a directory
# that contains python/Ice.py. When one is found the loop stops and its
# normalized path stays bound to `toplevel`; if none of the candidates
# matches, the for/else branch aborts the script with a RuntimeError.
for toplevel in map(os.path.normpath, (".", "..", "../..", "../../..", "../../../..")):
    if os.path.exists(os.path.join(toplevel, "python", "Ice.py")):
        break
else:
    raise RuntimeError("can't find toplevel directory!")

import Ice

# Load the two Slice files this test relies on. Key.ice is loaded first,
# then Clash.ice; the order is preserved as written.
# NOTE(review): the `_and` module imported right after these calls is
# presumably produced by loading Key.ice — confirm against the Slice file.
Ice.loadSlice('Key.ice')
Ice.loadSlice('Clash.ice')

import _and

class delI(_and._delDisp):
    """Minimal subclass of the generated ``_and._delDisp`` base.

    ``testtypes`` instantiates this class; its single method is a no-op.
    """

    def _elifAsync(self, _else, current=None):
        """Accept the call, ignore all arguments and return ``None``."""
        return None

class execI(_and._execDisp):
    """Subclass of the generated ``_and._execDisp`` base.

    ``run`` adds an instance of this class to the object adapter under the
    identity "test" and then invokes ``_finally`` through a proxy.
    """

    def _finally(self, current=None):
        """Assert that ``current.operation`` equals the string "finally".

        The check is a plain ``assert``: it raises ``AssertionError`` on a
        mismatch and is skipped entirely when Python runs with ``-O``.
        ``current`` must provide an ``operation`` attribute for the check to
        be evaluated (the ``None`` default does not).
        """
        assert current.operation == "finally"

class forI(_and._forDisp):
    """Minimal subclass of the generated ``_and._forDisp`` base.

    ``testtypes`` instantiates this class; its single method is a no-op.
    """

    def foo(self, _from, current=None):
        """Accept the call, ignore all arguments and return ``None``."""
        return None

class ifI(_and._ifDisp):
    """Minimal subclass of the generated ``_and._ifDisp`` base.

    Provides the same three method names that ``delI``, ``execI`` and
    ``forI`` define individually; here every one of them is a no-op.
    """

    def _elifAsync(self, _else, current=None):
        """Accept the call, ignore all arguments and return ``None``."""
        return None

    def _finally(self, current=None):
        """Accept the call and return ``None`` without any check."""
        return None

    def foo(self, _from, current=None):
        """Accept the call, ignore all arguments and return ``None``."""
        return None

class printI(_and._print):
    """Minimal subclass of the generated ``_and._print`` type.

    ``testtypes`` instantiates this class; its single method is a no-op.
    """

    def _raise(self, _else, _return, _try, _while, _yield, _lambda, _or, _global):
        """Accept eight positional arguments, ignore them and return ``None``.

        Unlike the methods of the ``*Disp`` subclasses in this file, this
        signature has no trailing ``current`` parameter.
        """
        return None

def testtypes():
    """Touch the names exposed by the ``_and`` module to check they resolve.

    Every statement looks up, calls or assigns through a name in ``_and``
    (or instantiates one of the local ``*I`` classes). Most results are
    bound to locals that are never read again: the lookups themselves are
    the test, and any missing name raises at that statement. Progress is
    reported on stdout and "ok" is printed once every statement succeeded.
    """
    sys.stdout.write("Testing generated type names... ")
    sys.stdout.flush()

    enum_member = _and._assert._break
    continue_type = _and._continue
    # `_and._continue` is not called above, so this attribute is set on the
    # object bound to that name rather than on a fresh instance.
    continue_type._def = 0

    del_proxy = _and.delPrx.uncheckedCast(None)
    assert "_elif" in dir(_and.delPrx)
    del_servant = delI()

    exec_proxy = _and.execPrx.uncheckedCast(None)
    assert "_finally" in dir(_and.execPrx)
    exec_servant = execI()

    for_servant = forI()

    if_proxy = _and.ifPrx.uncheckedCast(None)
    assert "_finally" in dir(_and.ifPrx)
    assert "_elif" in dir(_and.ifPrx)
    if_servant = ifI()

    is_value = _and._is()
    # NOTE(review): `_lamba` (here and below) looks like a typo for
    # `_lambda` — confirm against Key.ice before changing it.
    is_value._lamba = 0

    not_value = _and._not()
    not_value._lamba = 0
    not_value._or = 1
    not_value._pass = 2

    print_value = printI()
    lambda_value = _and._lambda
    none_member = _and.EnumNone._None

    print("ok")

def run(args, communicator):
    """Host an ``execI`` servant, call it through a proxy, then run ``testtypes``.

    Parameters:
        args: not used by this function.
        communicator: object providing ``getProperties()`` and
            ``createObjectAdapter()``; the script passes the communicator
            returned by ``Ice.initialize``.

    Returns:
        ``True`` when every step completed. Failures surface as exceptions
        raised by the calls below.
    """
    properties = communicator.getProperties()
    properties.setProperty("TestAdapter.Endpoints", "default -p 12010:udp")

    adapter = communicator.createObjectAdapter("TestAdapter")
    servant = execI()
    adapter.add(servant, Ice.stringToIdentity("test"))
    adapter.activate()

    sys.stdout.write("Testing operation name... ")
    sys.stdout.flush()
    # Build a proxy for the servant registered above and invoke `_finally`
    # on it; execI._finally asserts on the operation name it receives.
    target = adapter.createProxy(Ice.stringToIdentity("test"))
    exec_proxy = _and.execPrx.uncheckedCast(target)
    exec_proxy._finally()
    print("ok")

    testtypes()

    return True

# Script entry: build the initialization data from the command line, create
# a communicator for the duration of the `with` block and execute run().
try:
    initData = Ice.InitializationData()
    initData.properties = Ice.createProperties(sys.argv)
    #
    # It's possible to have batch oneway requests dispatched after the
    # adapter is deactivated due to thread scheduling, so we suppress
    # this warning.
    #
    initData.properties.setProperty("Ice.Warn.Dispatch", "0")
    with Ice.initialize(sys.argv, initData) as communicator:
        status = run(sys.argv, communicator)
except:
    # Bare except: anything raised above (SystemExit and KeyboardInterrupt
    # included) is printed as a traceback and counted as a failed run.
    traceback.print_exc()
    status = False

# Process exit status: 0 when run() reported success, 1 otherwise.
sys.exit(0 if status else 1)