diff options
Diffstat (limited to 'python/test')
-rw-r--r-- | python/test/Ice/info/AllTests.py | 99 | ||||
-rw-r--r-- | python/test/Ice/info/TestI.py | 25 |
2 files changed, 75 insertions, 49 deletions
diff --git a/python/test/Ice/info/AllTests.py b/python/test/Ice/info/AllTests.py index b80182b5c04..a03b69ecc4d 100644 --- a/python/test/Ice/info/AllTests.py +++ b/python/test/Ice/info/AllTests.py @@ -13,6 +13,18 @@ def test(b): if not b: raise RuntimeError('test assertion failed') +def getTCPEndpointInfo(info): + while(info): + if isinstance(info, Ice.TCPEndpointInfo): + return info + info = info.underlying + +def getTCPConnectionInfo(info): + while(info): + if isinstance(info, Ice.TCPConnectionInfo): + return info + info = info.underlying + def allTests(communicator): sys.stdout.write("testing proxy endpoint information... ") sys.stdout.flush() @@ -23,22 +35,23 @@ def allTests(communicator): endps = p1.ice_getEndpoints() - ipEndpoint = endps[0].getInfo() - test(isinstance(ipEndpoint, Ice.IPEndpointInfo)) - test(ipEndpoint.host == "tcphost") - test(ipEndpoint.port == 10000) - test(ipEndpoint.sourceAddress == "10.10.10.10") - test(ipEndpoint.timeout == 1200) - test(ipEndpoint.compress) - test(not ipEndpoint.datagram()) - test((ipEndpoint.type() == Ice.TCPEndpointType and not ipEndpoint.secure()) or - (ipEndpoint.type() == Ice.SSLEndpointType and ipEndpoint.secure()) or # SSL - (ipEndpoint.type() == Ice.WSEndpointType and not ipEndpoint.secure()) or # WS - (ipEndpoint.type() == Ice.WSSEndpointType and ipEndpoint.secure())) # WS - test((ipEndpoint.type() == Ice.TCPEndpointType and isinstance(ipEndpoint, Ice.TCPEndpointInfo)) or - (ipEndpoint.type() == Ice.SSLEndpointType and isinstance(ipEndpoint, Ice.SSLEndpointInfo)) or - (ipEndpoint.type() == Ice.WSEndpointType and isinstance(ipEndpoint, Ice.WSEndpointInfo)) or - (ipEndpoint.type() == Ice.WSSEndpointType and isinstance(ipEndpoint, Ice.WSSEndpointInfo))) + endpoint = endps[0].getInfo() + tcpEndpoint = getTCPEndpointInfo(endpoint) + test(isinstance(tcpEndpoint, Ice.TCPEndpointInfo)) + test(tcpEndpoint.host == "tcphost") + test(tcpEndpoint.port == 10000) + test(tcpEndpoint.sourceAddress == "10.10.10.10") + test(tcpEndpoint.timeout == 1200) + test(tcpEndpoint.compress) + test(not tcpEndpoint.datagram()) + test((tcpEndpoint.type() == Ice.TCPEndpointType and not tcpEndpoint.secure()) or + (tcpEndpoint.type() == Ice.SSLEndpointType and tcpEndpoint.secure()) or # SSL + (tcpEndpoint.type() == Ice.WSEndpointType and not tcpEndpoint.secure()) or # WS + (tcpEndpoint.type() == Ice.WSSEndpointType and tcpEndpoint.secure())) # WS + test((tcpEndpoint.type() == Ice.TCPEndpointType and isinstance(endpoint, Ice.TCPEndpointInfo)) or + (tcpEndpoint.type() == Ice.SSLEndpointType and isinstance(endpoint, Ice.SSLEndpointInfo)) or + (tcpEndpoint.type() == Ice.WSEndpointType and isinstance(endpoint, Ice.WSEndpointInfo)) or + (tcpEndpoint.type() == Ice.WSSEndpointType and isinstance(endpoint, Ice.WSEndpointInfo))) udpEndpoint = endps[1].getInfo() test(isinstance(udpEndpoint, Ice.UDPEndpointInfo)) @@ -71,12 +84,12 @@ def allTests(communicator): publishedEndpoints = adapter.getPublishedEndpoints() test(endpoints == publishedEndpoints) - ipEndpoint = endpoints[0].getInfo() - test(ipEndpoint.type() == Ice.TCPEndpointType or ipEndpoint.type() == 2 or ipEndpoint.type() == 4 or - ipEndpoint.type() == 5) - test(ipEndpoint.host == defaultHost) - test(ipEndpoint.port > 0) - test(ipEndpoint.timeout == 15000) + tcpEndpoint = getTCPEndpointInfo(endpoints[0].getInfo()) + test(tcpEndpoint.type() == Ice.TCPEndpointType or tcpEndpoint.type() == 2 or tcpEndpoint.type() == 4 or + tcpEndpoint.type() == 5) + test(tcpEndpoint.host == defaultHost) + test(tcpEndpoint.port > 0) + test(tcpEndpoint.timeout == 15000) udpEndpoint = endpoints[1].getInfo() test(udpEndpoint.host == defaultHost) @@ -95,12 +108,12 @@ def allTests(communicator): test(len(publishedEndpoints) == 1) for i in range(0, len(endpoints)): - ipEndpoint = endpoints[i].getInfo() - test(ipEndpoint.port == 12020) + tcpEndpoint = getTCPEndpointInfo(endpoints[i].getInfo()) + test(tcpEndpoint.port == 12020) - ipEndpoint = publishedEndpoints[0].getInfo() - test(ipEndpoint.host == "127.0.0.1") - test(ipEndpoint.port == 12020) + tcpEndpoint = getTCPEndpointInfo(publishedEndpoints[0].getInfo()) + test(tcpEndpoint.host == "127.0.0.1") + test(tcpEndpoint.port == 12020) adapter.destroy() @@ -112,13 +125,13 @@ def allTests(communicator): sys.stdout.write("test connection endpoint information... ") sys.stdout.flush() - ipinfo = base.ice_getConnection().getEndpoint().getInfo() - test(ipinfo.port == 12010) - test(not ipinfo.compress) - test(ipinfo.host == defaultHost) + tcpinfo = getTCPEndpointInfo(base.ice_getConnection().getEndpoint().getInfo()) + test(tcpinfo.port == 12010) + test(not tcpinfo.compress) + test(tcpinfo.host == defaultHost) ctx = testIntf.getEndpointInfoAsContext() - test(ctx["host"] == ipinfo.host) + test(ctx["host"] == tcpinfo.host) test(ctx["compress"] == "false") port = int(ctx["port"]) test(port > 0) @@ -136,26 +149,26 @@ def allTests(communicator): connection.setBufferSize(1024, 2048) info = connection.getInfo() + tcpinfo = getTCPConnectionInfo(info) test(not info.incoming) test(len(info.adapterName) == 0) - test(info.remotePort == 12010) + test(tcpinfo.remotePort == 12010) if defaultHost == '127.0.0.1': - test(info.remoteAddress == defaultHost) - test(info.localAddress == defaultHost) - test(info.rcvSize >= 1024) - test(info.sndSize >= 2048) + test(tcpinfo.remoteAddress == defaultHost) + test(tcpinfo.localAddress == defaultHost) + test(tcpinfo.rcvSize >= 1024) + test(tcpinfo.sndSize >= 2048) ctx = testIntf.getConnectionInfoAsContext() test(ctx["incoming"] == "true") test(ctx["adapterName"] == "TestAdapter") - test(ctx["remoteAddress"] == info.localAddress) - test(ctx["localAddress"] == info.remoteAddress) - test(ctx["remotePort"] == str(info.localPort)) - test(ctx["localPort"] == str(info.remotePort)) + test(ctx["remoteAddress"] == tcpinfo.localAddress) + test(ctx["localAddress"] == tcpinfo.remoteAddress) + test(ctx["remotePort"] == str(tcpinfo.localPort)) + test(ctx["localPort"] == str(tcpinfo.remotePort)) if(base.ice_getConnection().type() == "ws" or base.ice_getConnection().type() == "wss"): - test((base.ice_getConnection().type() == "ws" and isinstance(info, Ice.WSConnectionInfo)) or - (base.ice_getConnection().type() == "wss" and isinstance(info, Ice.WSSConnectionInfo))) + test(isinstance(info, Ice.WSConnectionInfo)) test(info.headers["Upgrade"] == "websocket") test(info.headers["Connection"] == "Upgrade") diff --git a/python/test/Ice/info/TestI.py b/python/test/Ice/info/TestI.py index ff98b4dd777..2325156db20 100644 --- a/python/test/Ice/info/TestI.py +++ b/python/test/Ice/info/TestI.py @@ -10,6 +10,18 @@ import Ice, Test import time +def getIPEndpointInfo(info): + while(info): + if isinstance(info, Ice.IPEndpointInfo): + return info + info = info.underlying + +def getIPConnectionInfo(info): + while(info): + if isinstance(info, Ice.IPConnectionInfo): + return info + info = info.underlying + class MyDerivedClassI(Test.TestIntf): def __init__(self): self.ctx = None @@ -19,7 +31,7 @@ class MyDerivedClassI(Test.TestIntf): def getEndpointInfoAsContext(self, current): ctx = {} - info = current.con.getEndpoint().getInfo() + info = getIPEndpointInfo(current.con.getEndpoint().getInfo()) ctx["timeout"] = str(info.timeout) if info.compress: ctx["compress"] = "true" @@ -51,18 +63,19 @@ class MyDerivedClassI(Test.TestIntf): def getConnectionInfoAsContext(self, current): ctx = {} info = current.con.getInfo() + ipinfo = getIPConnectionInfo(info) ctx["adapterName"] = info.adapterName if info.incoming: ctx["incoming"] = "true" else: ctx["incoming"] ="false" - ctx["localAddress"] = info.localAddress - ctx["localPort"] = str(info.localPort) - ctx["remoteAddress"] = info.remoteAddress - ctx["remotePort"] = str(info.remotePort) + ctx["localAddress"] = ipinfo.localAddress + ctx["localPort"] = str(ipinfo.localPort) + ctx["remoteAddress"] = ipinfo.remoteAddress + ctx["remotePort"] = str(ipinfo.remotePort) - if isinstance(info, Ice.WSConnectionInfo) or isinstance(info, Ice.WSSConnectionInfo): + if isinstance(info, Ice.WSConnectionInfo): for key, value in info.headers.items(): ctx["ws." + key] = value |