summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2021-01-07 20:42:19 +0100
committerGitHub <noreply@github.com>2021-01-07 20:42:19 +0100
commiteba3ea4801b3b643d4cb590174f2984562efe76f (patch)
tree0fcb195b663a473333a3570974260802d8309875
parentUse readline only when stdin is a tty (#1230) (diff)
downloadice-eba3ea4801b3b643d4cb590174f2984562efe76f.tar.bz2
ice-eba3ea4801b3b643d4cb590174f2984562efe76f.tar.xz
ice-eba3ea4801b3b643d4cb590174f2984562efe76f.zip
Fixed Ice.wrap_future cancellation bug and added asyncio test (#1229)
-rw-r--r--python/python/Ice/Py3/IceFuture.py31
-rw-r--r--python/python/Ice/__init__.py8
-rw-r--r--python/test/Ice/asyncio/AllTests.py87
-rwxr-xr-xpython/test/Ice/asyncio/Client.py20
-rwxr-xr-xpython/test/Ice/asyncio/Server.py30
-rw-r--r--python/test/Ice/asyncio/Test.ice26
-rw-r--r--python/test/Ice/asyncio/TestI.py71
-rw-r--r--python/test/Ice/asyncio/test.py10
8 files changed, 270 insertions, 13 deletions
diff --git a/python/python/Ice/Py3/IceFuture.py b/python/python/Ice/Py3/IceFuture.py
index 9beb8bffb49..f9178bf58ee 100644
--- a/python/python/Ice/Py3/IceFuture.py
+++ b/python/python/Ice/Py3/IceFuture.py
@@ -8,6 +8,7 @@
import asyncio
+
#
# This class defines an __await__ method so that coroutines can call 'await <future>'.
#
@@ -19,6 +20,7 @@ class FutureBase(object):
yield self
return self.result()
+
def wrap_future(future, *, loop=None):
'''Wrap Ice.Future object into an asyncio.Future.'''
if isinstance(future, asyncio.Future):
@@ -26,18 +28,25 @@ def wrap_future(future, *, loop=None):
assert isinstance(future, FutureBase), 'Ice.Future is expected, got {!r}'.format(future)
+ def forwardCompletion(sourceFuture, targetFuture):
+ if not targetFuture.done():
+ if sourceFuture.cancelled():
+ targetFuture.cancel()
+ elif sourceFuture.exception():
+ targetFuture.set_exception(sourceFuture.exception())
+ else:
+ targetFuture.set_result(sourceFuture.result())
+
if loop is None:
loop = asyncio.get_event_loop()
+ asyncioFuture = loop.create_future()
- af = asyncio.Future()
-
- def callback():
- if future.cancelled():
- af.cancel()
- elif future.exception():
- af.set_exception(future.exception())
- else:
- af.set_result(future.result())
+ if future.done():
+ # As long as no don callbacks are registered, completing the asyncio future should be thread safe
+ # even if the future is constructed with a loop which isn't the current thread's loop.
+ forwardCompletion(future, asyncioFuture)
+ else:
+ asyncioFuture.add_done_callback(lambda f: forwardCompletion(asyncioFuture, future))
+ future.add_done_callback(lambda f: loop.call_soon_threadsafe(forwardCompletion, future, asyncioFuture))
- future.add_done_callback(lambda f: loop.call_soon_threadsafe(callback))
- return af
+ return asyncioFuture
diff --git a/python/python/Ice/__init__.py b/python/python/Ice/__init__.py
index 1b64975c43b..05cd803f90b 100644
--- a/python/python/Ice/__init__.py
+++ b/python/python/Ice/__init__.py
@@ -427,8 +427,12 @@ Returns:
else:
result = coro.send(value)
- # Calling 'await <future>' will return the future. Check if we've received a future.
- if isinstance(result, Future) or callable(getattr(result, "add_done_callback", None)):
+ if result is None:
+ # The result can be None if the coroutine performs a bare yield (such as asyncio.sleep(0))
+ cb.response(None)
+ elif isinstance(result, Future) or callable(getattr(result, "add_done_callback", None)):
+ # If we've received a future from the coroutine setup a done callback to continue the dispatching
+ # when the future completes.
def handler(future):
try:
self._iceDispatchCoroutine(cb, coro, value=future.result())
diff --git a/python/test/Ice/asyncio/AllTests.py b/python/test/Ice/asyncio/AllTests.py
new file mode 100644
index 00000000000..604d8b8aeb7
--- /dev/null
+++ b/python/test/Ice/asyncio/AllTests.py
@@ -0,0 +1,87 @@
+#
+# Copyright (c) ZeroC, Inc. All rights reserved.
+#
+
+import sys
+import asyncio
+import Ice
+import Test
+
+
+def test(b):
+ if not b:
+ raise RuntimeError('test assertion failed')
+
+
+async def allTestsAsync(helper, communicator):
+
+ sref = "test:{0}".format(helper.getTestEndpoint(num=0))
+ obj = communicator.stringToProxy(sref)
+ test(obj)
+
+ p = Test.TestIntfPrx.uncheckedCast(obj)
+
+ sys.stdout.write("testing invocation... ")
+ sys.stdout.flush()
+ test(await Ice.wrap_future(p.opAsync()) == 5)
+ await Ice.wrap_future(p.sleepAsync(0))
+ await Ice.wrap_future(p.sleepAsync(20))
+ test(await Ice.wrap_future(p.callOpOnAsync(p)) == 5)
+ print("ok")
+
+ sys.stdout.write("testing exceptions... ")
+ sys.stdout.flush()
+ try:
+ await Ice.wrap_future(p.throwUserException1())
+ test(False)
+ except Test.TestException:
+ pass
+ try:
+ await Ice.wrap_future(p.throwUserException2())
+ test(False)
+ except Test.TestException:
+ pass
+ try:
+ await Ice.wrap_future(p.throwUnhandledException1())
+ test(False)
+ except Ice.UnknownException:
+ pass
+ try:
+ await Ice.wrap_future(p.throwUnhandledException2())
+ test(False)
+ except Ice.UnknownException:
+ pass
+ print("ok")
+
+ sys.stdout.write("testing cancellation... ")
+ sys.stdout.flush()
+
+ future = p.sleepAsync(500)
+ asyncioFuture = Ice.wrap_future(future)
+ future.cancel()
+ try:
+ await asyncioFuture
+ test(False)
+ except asyncio.CancelledError:
+ test(future.cancelled() and asyncioFuture.cancelled())
+
+ future = p.sleepAsync(500)
+ asyncioFuture = Ice.wrap_future(future)
+ asyncioFuture.cancel()
+ try:
+ await asyncioFuture
+ test(False)
+ except asyncio.CancelledError:
+ # Wait a little to ensure the cancellation propagates to the Ice future
+ await asyncio.sleep(0.01)
+ test(future.cancelled() and asyncioFuture.cancelled())
+
+ # Try to cancel a done future
+ future = p.opAsync()
+ while not future.done():
+ await asyncio.sleep(0.01)
+ Ice.wrap_future(future).cancel()
+
+ print("ok")
+
+ await Ice.wrap_future(p.shutdownAsync())
diff --git a/python/test/Ice/asyncio/Client.py b/python/test/Ice/asyncio/Client.py
new file mode 100755
index 00000000000..b63e85eb41c
--- /dev/null
+++ b/python/test/Ice/asyncio/Client.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+#
+# Copyright (c) ZeroC, Inc. All rights reserved.
+#
+
+import asyncio
+from TestHelper import TestHelper
+TestHelper.loadSlice("Test.ice")
+import AllTests
+
+
+class Client(TestHelper):
+
+ def run(self, args):
+
+ async def runAsync():
+ with self.initialize(properties=self.createTestProperties(args)) as communicator:
+ await AllTests.allTestsAsync(self, communicator)
+
+ asyncio.run(runAsync(), debug=True)
diff --git a/python/test/Ice/asyncio/Server.py b/python/test/Ice/asyncio/Server.py
new file mode 100755
index 00000000000..f6eff5e3e4b
--- /dev/null
+++ b/python/test/Ice/asyncio/Server.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+#
+# Copyright (c) ZeroC, Inc. All rights reserved.
+#
+
+import Ice
+import asyncio
+from TestHelper import TestHelper
+TestHelper.loadSlice("Test.ice")
+import TestI
+
+
+class Server(TestHelper):
+
+ def run(self, args):
+
+ async def runAsync():
+ loop = asyncio.get_event_loop()
+ initData = Ice.InitializationData()
+ initData.properties = self.createTestProperties(args)
+ initData.properties.setProperty("Ice.Warn.Dispatch", "0")
+ initData.dispatcher = lambda method, connection: loop.call_soon_threadsafe(method)
+ with self.initialize(initData) as communicator:
+ communicator.getProperties().setProperty("TestAdapter.Endpoints", self.getTestEndpoint())
+ adapter = communicator.createObjectAdapter("TestAdapter")
+ adapter.add(TestI.TestIntfI(), Ice.stringToIdentity("test"))
+ adapter.activate()
+ await loop.run_in_executor(None, communicator.waitForShutdown)
+
+ asyncio.run(runAsync(), debug=True)
diff --git a/python/test/Ice/asyncio/Test.ice b/python/test/Ice/asyncio/Test.ice
new file mode 100644
index 00000000000..1a6f23ff21f
--- /dev/null
+++ b/python/test/Ice/asyncio/Test.ice
@@ -0,0 +1,26 @@
+//
+// Copyright (c) ZeroC, Inc. All rights reserved.
+//
+
+#pragma once
+
+module Test
+{
+
+exception TestException
+{
+}
+
+interface TestIntf
+{
+ int op();
+ int callOpOn(TestIntf* proxy);
+ void throwUserException1() throws TestException;
+ void throwUserException2() throws TestException;
+ void throwUnhandledException1();
+ void throwUnhandledException2();
+ void sleep(int ms);
+ void shutdown();
+}
+
+}
diff --git a/python/test/Ice/asyncio/TestI.py b/python/test/Ice/asyncio/TestI.py
new file mode 100644
index 00000000000..4392b2216b1
--- /dev/null
+++ b/python/test/Ice/asyncio/TestI.py
@@ -0,0 +1,71 @@
+#
+# Copyright (c) ZeroC, Inc. All rights reserved.
+#
+
+import asyncio
+import Ice
+import Test
+
+
+# The implementation of the coroutines below assume the use of an Ice dispatcher which
+# dispatch the calls on an asyncio event loop
+
+class TestIntfI(Test.TestIntf):
+
+ async def op(self, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that returning the result from the coroutine without await works
+ return 5
+
+ async def throwUserException1(self, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that raising the exception from the coroutine without await works
+ raise Test.TestException()
+
+ async def throwUserException2(self, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that raising the exception after from the coroutine after the await works
+ await asyncio.sleep(0.01)
+ raise Test.TestException()
+
+ async def throwUnhandledException1(self, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that raising an unhandled exception from the coroutine without await works
+ raise Exception("unexpected")
+
+ async def throwUnhandledException2(self, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that raising an unhandled exception from the coroutine after the await works
+ await asyncio.sleep(0.01)
+ raise Exception("unexpected")
+
+ async def sleep(self, ms, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that awaiting before returning the result works
+ await asyncio.sleep(ms / 1000.0)
+
+ async def callOpOn(self, proxy, current):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ # ensure that awaiting proxy invocations works
+ await Ice.wrap_future(proxy.sleepAsync(10))
+ return await Ice.wrap_future(proxy.opAsync())
+
+ def shutdown(self, current=None):
+ # make sure this is called from an asyncio event loop
+ asyncio.get_running_loop()
+
+ current.adapter.getCommunicator().shutdown()
diff --git a/python/test/Ice/asyncio/test.py b/python/test/Ice/asyncio/test.py
new file mode 100644
index 00000000000..40ed163c546
--- /dev/null
+++ b/python/test/Ice/asyncio/test.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) ZeroC, Inc. All rights reserved.
+#
+
+# This test doesn't support running with IceSSL, the Router object in the client process uses
+# the client certificate and fails with "unsupported certificate purpose"
+
+if sys.version_info >= (3, 5):
+ TestSuite(__name__)