diff options
Diffstat (limited to 'python/test/Ice/ami/AllTests.py')
-rw-r--r-- | python/test/Ice/ami/AllTests.py | 170 |
1 files changed, 111 insertions, 59 deletions
diff --git a/python/test/Ice/ami/AllTests.py b/python/test/Ice/ami/AllTests.py index 549915ee12e..71d6237b61a 100644 --- a/python/test/Ice/ami/AllTests.py +++ b/python/test/Ice/ami/AllTests.py @@ -769,7 +769,7 @@ def allTests(communicator, collocated): test(p.opBatchCount() == 0) b1 = p.ice_batchOneway() b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() r = b1.begin_ice_flushBatchRequests(cb.exception, cb.sent) cb.check() @@ -783,7 +783,7 @@ def allTests(communicator, collocated): test(p.opBatchCount() == 0) b1 = p.ice_batchOneway() b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback(cookie) r = b1.begin_ice_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie), lambda ss: cb.sentWC(ss, cookie)) cb.check() @@ -830,7 +830,7 @@ def allTests(communicator, collocated): test(p.opBatchCount() == 0) b1 = Test.TestIntfPrx.uncheckedCast(p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()) b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushExCallback() r = b1.ice_getConnection().begin_flushBatchRequests(cb.exception, cb.sent) cb.check() @@ -844,7 +844,7 @@ def allTests(communicator, collocated): test(p.opBatchCount() == 0) b1 = Test.TestIntfPrx.uncheckedCast(p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()) b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushExCallback(cookie) r = b1.ice_getConnection().begin_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie), lambda ss: cb.sentWC(ss, cookie)) @@ -876,7 +876,7 @@ def allTests(communicator, collocated): test(p.opBatchCount() == 0) b1 = Test.TestIntfPrx.uncheckedCast(p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()) b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) cb.check() @@ -916,7 +916,7 @@ def allTests(communicator, collocated): b2.ice_getConnection() # Ensure connection is established. b1.opBatch() b2.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) cb.check() @@ -936,8 +936,8 @@ def allTests(communicator, collocated): b2.ice_getConnection() # Ensure connection is established. b1.opBatch() b2.opBatch() - b1.ice_getConnection().close(False) - b2.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) + b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) cb.check() @@ -1120,9 +1120,27 @@ def allTests(communicator, collocated): print("ok") if p.ice_getConnection(): - sys.stdout.write("testing close connection with sending queue... ") + sys.stdout.write("testing graceful close connection with wait... ") sys.stdout.flush() + # + # Local case: begin several requests, close the connection gracefully, and make sure it waits + # for the requests to complete. + # + results = [] + for i in range(0, 3): + results.append(p.begin_sleep(50)) + p.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) + for r in results: + r.waitForCompleted() + try: + r.throwLocalException() + except: + test(False) + + # + # Remote case. + # if sys.version_info[0] == 2: b = [chr(random.randint(0, 255)) for x in range(0, 10*1024)] seq = ''.join(b) @@ -1143,7 +1161,7 @@ def allTests(communicator, collocated): results = [] for i in range(0, maxQueue): results.append(p.begin_opWithPayload(seq)) - if not p.begin_close(False).isSent(): + if not p.begin_close(Test.CloseMode.CloseGracefullyAndWait).isSent(): for i in range(0, maxQueue): r = p.begin_opWithPayload(seq) results.append(r) @@ -1163,6 +1181,83 @@ def allTests(communicator, collocated): print("ok") + sys.stdout.write("testing graceful close connection without wait... ") + sys.stdout.flush() + + # + # Local case: start a lengthy operation and then close the connection gracefully on the client side + # without waiting for the pending invocation to complete. There will be no retry and we expect the + # invocation to fail with ConnectionManuallyClosedException. + # + # This test requires two threads in the server's thread pool: one will block in sleep() and the other + # will process the CloseConnection message. + # + p.ice_ping() + con = p.ice_getConnection() + r = p.begin_sleep(1000) + con.close(Ice.ConnectionClose.CloseGracefully) + r.waitForCompleted() + try: + r.throwLocalException() + test(False) + except Ice.ConnectionManuallyClosedException, ex: + test(ex.graceful) + + # + # Remote case: the server closes the connection gracefully. Our call to TestIntf::close() + # completes successfully and then the connection should be closed immediately afterward, + # despite the fact that there's a pending call to sleep(). The call to sleep() should be + # automatically retried and complete successfully. + # + p.ice_ping() + con = p.ice_getConnection() + cb = CallbackBase() + con.setCloseCallback(lambda c: cb.called()) + r = p.begin_sleep(250) + p.close(Test.CloseMode.CloseGracefully) + cb.check() + r.waitForCompleted() + try: + r.throwLocalException() + except: + test(false) + p.ice_ping() + test(p.ice_getConnection() != con) + + print("ok") + + sys.stdout.write("testing forceful close connection... ") + sys.stdout.flush() + + # + # Local case: start a lengthy operation and then close the connection forcefully on the client side. + # There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. + # + p.ice_ping() + con = p.ice_getConnection() + r = p.begin_sleep(100) + con.close(Ice.ConnectionClose.CloseForcefully) + r.waitForCompleted() + try: + r.throwLocalException() + test(False) + except Ice.ConnectionManuallyClosedException, ex: + test(not ex.graceful) + + # + # Remote case: the server closes the connection forcefully. This causes the request to fail + # with a ConnectionLostException. Since the close() operation is not idempotent, the client + # will not retry. + # + try: + p.close(Test.CloseMode.CloseForcefully) + test(False) + except Ice.ConnectionLostException: + # Expected. + pass + + print("ok") + def allTestsFuture(communicator, collocated): sref = "test:default -p 12010" obj = communicator.stringToProxy(sref) @@ -1404,7 +1499,7 @@ def allTestsFuture(communicator, collocated): test(p.opBatchCount() == 0) b1 = p.ice_batchOneway() b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() f = b1.ice_flushBatchRequestsAsync() f.add_sent_callback(cb.sent) @@ -1436,7 +1531,7 @@ def allTestsFuture(communicator, collocated): test(p.opBatchCount() == 0) b1 = Test.TestIntfPrx.uncheckedCast(p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()) b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushExCallback() f = b1.ice_getConnection().flushBatchRequestsAsync() f.add_done_callback(cb.exception) @@ -1473,7 +1568,7 @@ def allTestsFuture(communicator, collocated): test(p.opBatchCount() == 0) b1 = Test.TestIntfPrx.uncheckedCast(p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()) b1.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() f = communicator.flushBatchRequestsAsync() f.add_sent_callback(cb.sent) @@ -1517,7 +1612,7 @@ def allTestsFuture(communicator, collocated): b2.ice_getConnection() # Ensure connection is established. b1.opBatch() b2.opBatch() - b1.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() f = communicator.flushBatchRequestsAsync() f.add_sent_callback(cb.sent) @@ -1539,8 +1634,8 @@ def allTestsFuture(communicator, collocated): b2.ice_getConnection() # Ensure connection is established. b1.opBatch() b2.opBatch() - b1.ice_getConnection().close(False) - b2.ice_getConnection().close(False) + b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) + b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() f = communicator.flushBatchRequestsAsync() f.add_sent_callback(cb.sent) @@ -1723,47 +1818,4 @@ def allTestsFuture(communicator, collocated): print("ok") - if p.ice_getConnection(): - sys.stdout.write("testing close connection with sending queue... ") - sys.stdout.flush() - - if sys.version_info[0] == 2: - b = [chr(random.randint(0, 255)) for x in range(0, 10*1024)] - seq = ''.join(b) - else: - b = [random.randint(0, 255) for x in range(0, 10*1024)] - seq = bytes(b) - - # - # Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod. - # The goal is to make sure that none of the opWithPayload fail even if the server closes - # the connection gracefully in between. - # - maxQueue = 2 - done = False - while not done and maxQueue < 50: - done = True - p.ice_ping() - results = [] - for i in range(0, maxQueue): - results.append(p.opWithPayloadAsync(seq)) - if not p.closeAsync(False).is_sent(): - for i in range(0, maxQueue): - f = p.opWithPayloadAsync(seq) - results.append(f) - if f.is_sent(): - done = False - maxQueue = maxQueue * 2 - break - else: - maxQueue = maxQueue * 2 - done = False - for f in results: - try: - f.result() - except Ice.LocalException: - test(False) - - print("ok") - p.shutdown() |