summaryrefslogtreecommitdiff
path: root/python/test
diff options
context:
space:
mode:
Diffstat (limited to 'python/test')
-rw-r--r--python/test/Ice/info/AllTests.py99
-rw-r--r--python/test/Ice/info/TestI.py25
2 files changed, 75 insertions, 49 deletions
diff --git a/python/test/Ice/info/AllTests.py b/python/test/Ice/info/AllTests.py
index b80182b5c04..a03b69ecc4d 100644
--- a/python/test/Ice/info/AllTests.py
+++ b/python/test/Ice/info/AllTests.py
@@ -13,6 +13,18 @@ def test(b):
if not b:
raise RuntimeError('test assertion failed')
+def getTCPEndpointInfo(info):
+ while(info):
+ if isinstance(info, Ice.TCPEndpointInfo):
+ return info
+ info = info.underlying
+
+def getTCPConnectionInfo(info):
+ while(info):
+ if isinstance(info, Ice.TCPConnectionInfo):
+ return info
+ info = info.underlying
+
def allTests(communicator):
sys.stdout.write("testing proxy endpoint information... ")
sys.stdout.flush()
@@ -23,22 +35,23 @@ def allTests(communicator):
endps = p1.ice_getEndpoints()
- ipEndpoint = endps[0].getInfo()
- test(isinstance(ipEndpoint, Ice.IPEndpointInfo))
- test(ipEndpoint.host == "tcphost")
- test(ipEndpoint.port == 10000)
- test(ipEndpoint.sourceAddress == "10.10.10.10")
- test(ipEndpoint.timeout == 1200)
- test(ipEndpoint.compress)
- test(not ipEndpoint.datagram())
- test((ipEndpoint.type() == Ice.TCPEndpointType and not ipEndpoint.secure()) or
- (ipEndpoint.type() == Ice.SSLEndpointType and ipEndpoint.secure()) or # SSL
- (ipEndpoint.type() == Ice.WSEndpointType and not ipEndpoint.secure()) or # WS
- (ipEndpoint.type() == Ice.WSSEndpointType and ipEndpoint.secure())) # WS
- test((ipEndpoint.type() == Ice.TCPEndpointType and isinstance(ipEndpoint, Ice.TCPEndpointInfo)) or
- (ipEndpoint.type() == Ice.SSLEndpointType and isinstance(ipEndpoint, Ice.SSLEndpointInfo)) or
- (ipEndpoint.type() == Ice.WSEndpointType and isinstance(ipEndpoint, Ice.WSEndpointInfo)) or
- (ipEndpoint.type() == Ice.WSSEndpointType and isinstance(ipEndpoint, Ice.WSSEndpointInfo)))
+ endpoint = endps[0].getInfo()
+ tcpEndpoint = getTCPEndpointInfo(endpoint)
+ test(isinstance(tcpEndpoint, Ice.TCPEndpointInfo))
+ test(tcpEndpoint.host == "tcphost")
+ test(tcpEndpoint.port == 10000)
+ test(tcpEndpoint.sourceAddress == "10.10.10.10")
+ test(tcpEndpoint.timeout == 1200)
+ test(tcpEndpoint.compress)
+ test(not tcpEndpoint.datagram())
+ test((tcpEndpoint.type() == Ice.TCPEndpointType and not tcpEndpoint.secure()) or
+ (tcpEndpoint.type() == Ice.SSLEndpointType and tcpEndpoint.secure()) or # SSL
+ (tcpEndpoint.type() == Ice.WSEndpointType and not tcpEndpoint.secure()) or # WS
+ (tcpEndpoint.type() == Ice.WSSEndpointType and tcpEndpoint.secure())) # WS
+ test((tcpEndpoint.type() == Ice.TCPEndpointType and isinstance(endpoint, Ice.TCPEndpointInfo)) or
+ (tcpEndpoint.type() == Ice.SSLEndpointType and isinstance(endpoint, Ice.SSLEndpointInfo)) or
+ (tcpEndpoint.type() == Ice.WSEndpointType and isinstance(endpoint, Ice.WSEndpointInfo)) or
+ (tcpEndpoint.type() == Ice.WSSEndpointType and isinstance(endpoint, Ice.WSEndpointInfo)))
udpEndpoint = endps[1].getInfo()
test(isinstance(udpEndpoint, Ice.UDPEndpointInfo))
@@ -71,12 +84,12 @@ def allTests(communicator):
publishedEndpoints = adapter.getPublishedEndpoints()
test(endpoints == publishedEndpoints)
- ipEndpoint = endpoints[0].getInfo()
- test(ipEndpoint.type() == Ice.TCPEndpointType or ipEndpoint.type() == 2 or ipEndpoint.type() == 4 or
- ipEndpoint.type() == 5)
- test(ipEndpoint.host == defaultHost)
- test(ipEndpoint.port > 0)
- test(ipEndpoint.timeout == 15000)
+ tcpEndpoint = getTCPEndpointInfo(endpoints[0].getInfo())
+ test(tcpEndpoint.type() == Ice.TCPEndpointType or tcpEndpoint.type() == 2 or tcpEndpoint.type() == 4 or
+ tcpEndpoint.type() == 5)
+ test(tcpEndpoint.host == defaultHost)
+ test(tcpEndpoint.port > 0)
+ test(tcpEndpoint.timeout == 15000)
udpEndpoint = endpoints[1].getInfo()
test(udpEndpoint.host == defaultHost)
@@ -95,12 +108,12 @@ def allTests(communicator):
test(len(publishedEndpoints) == 1)
for i in range(0, len(endpoints)):
- ipEndpoint = endpoints[i].getInfo()
- test(ipEndpoint.port == 12020)
+ tcpEndpoint = getTCPEndpointInfo(endpoints[i].getInfo())
+ test(tcpEndpoint.port == 12020)
- ipEndpoint = publishedEndpoints[0].getInfo()
- test(ipEndpoint.host == "127.0.0.1")
- test(ipEndpoint.port == 12020)
+ tcpEndpoint = getTCPEndpointInfo(publishedEndpoints[0].getInfo())
+ test(tcpEndpoint.host == "127.0.0.1")
+ test(tcpEndpoint.port == 12020)
adapter.destroy()
@@ -112,13 +125,13 @@ def allTests(communicator):
sys.stdout.write("test connection endpoint information... ")
sys.stdout.flush()
- ipinfo = base.ice_getConnection().getEndpoint().getInfo()
- test(ipinfo.port == 12010)
- test(not ipinfo.compress)
- test(ipinfo.host == defaultHost)
+ tcpinfo = getTCPEndpointInfo(base.ice_getConnection().getEndpoint().getInfo())
+ test(tcpinfo.port == 12010)
+ test(not tcpinfo.compress)
+ test(tcpinfo.host == defaultHost)
ctx = testIntf.getEndpointInfoAsContext()
- test(ctx["host"] == ipinfo.host)
+ test(ctx["host"] == tcpinfo.host)
test(ctx["compress"] == "false")
port = int(ctx["port"])
test(port > 0)
@@ -136,26 +149,26 @@ def allTests(communicator):
connection.setBufferSize(1024, 2048)
info = connection.getInfo()
+ tcpinfo = getTCPConnectionInfo(info)
test(not info.incoming)
test(len(info.adapterName) == 0)
- test(info.remotePort == 12010)
+ test(tcpinfo.remotePort == 12010)
if defaultHost == '127.0.0.1':
- test(info.remoteAddress == defaultHost)
- test(info.localAddress == defaultHost)
- test(info.rcvSize >= 1024)
- test(info.sndSize >= 2048)
+ test(tcpinfo.remoteAddress == defaultHost)
+ test(tcpinfo.localAddress == defaultHost)
+ test(tcpinfo.rcvSize >= 1024)
+ test(tcpinfo.sndSize >= 2048)
ctx = testIntf.getConnectionInfoAsContext()
test(ctx["incoming"] == "true")
test(ctx["adapterName"] == "TestAdapter")
- test(ctx["remoteAddress"] == info.localAddress)
- test(ctx["localAddress"] == info.remoteAddress)
- test(ctx["remotePort"] == str(info.localPort))
- test(ctx["localPort"] == str(info.remotePort))
+ test(ctx["remoteAddress"] == tcpinfo.localAddress)
+ test(ctx["localAddress"] == tcpinfo.remoteAddress)
+ test(ctx["remotePort"] == str(tcpinfo.localPort))
+ test(ctx["localPort"] == str(tcpinfo.remotePort))
if(base.ice_getConnection().type() == "ws" or base.ice_getConnection().type() == "wss"):
- test((base.ice_getConnection().type() == "ws" and isinstance(info, Ice.WSConnectionInfo)) or
- (base.ice_getConnection().type() == "wss" and isinstance(info, Ice.WSSConnectionInfo)))
+ test(isinstance(info, Ice.WSConnectionInfo))
test(info.headers["Upgrade"] == "websocket")
test(info.headers["Connection"] == "Upgrade")
diff --git a/python/test/Ice/info/TestI.py b/python/test/Ice/info/TestI.py
index ff98b4dd777..2325156db20 100644
--- a/python/test/Ice/info/TestI.py
+++ b/python/test/Ice/info/TestI.py
@@ -10,6 +10,18 @@
import Ice, Test
import time
+def getIPEndpointInfo(info):
+ while(info):
+ if isinstance(info, Ice.IPEndpointInfo):
+ return info
+ info = info.underlying
+
+def getIPConnectionInfo(info):
+ while(info):
+ if isinstance(info, Ice.IPConnectionInfo):
+ return info
+ info = info.underlying
+
class MyDerivedClassI(Test.TestIntf):
def __init__(self):
self.ctx = None
@@ -19,7 +31,7 @@ class MyDerivedClassI(Test.TestIntf):
def getEndpointInfoAsContext(self, current):
ctx = {}
- info = current.con.getEndpoint().getInfo()
+ info = getIPEndpointInfo(current.con.getEndpoint().getInfo())
ctx["timeout"] = str(info.timeout)
if info.compress:
ctx["compress"] = "true"
@@ -51,18 +63,19 @@ class MyDerivedClassI(Test.TestIntf):
def getConnectionInfoAsContext(self, current):
ctx = {}
info = current.con.getInfo()
+ ipinfo = getIPConnectionInfo(info)
ctx["adapterName"] = info.adapterName
if info.incoming:
ctx["incoming"] = "true"
else:
ctx["incoming"] ="false"
- ctx["localAddress"] = info.localAddress
- ctx["localPort"] = str(info.localPort)
- ctx["remoteAddress"] = info.remoteAddress
- ctx["remotePort"] = str(info.remotePort)
+ ctx["localAddress"] = ipinfo.localAddress
+ ctx["localPort"] = str(ipinfo.localPort)
+ ctx["remoteAddress"] = ipinfo.remoteAddress
+ ctx["remotePort"] = str(ipinfo.remotePort)
- if isinstance(info, Ice.WSConnectionInfo) or isinstance(info, Ice.WSSConnectionInfo):
+ if isinstance(info, Ice.WSConnectionInfo):
for key, value in info.headers.items():
ctx["ws." + key] = value