diff options
-rw-r--r-- | python/python/Ice/Py3/IceFuture.py | 31 | ||||
-rw-r--r-- | python/python/Ice/__init__.py | 8 | ||||
-rw-r--r-- | python/test/Ice/asyncio/AllTests.py | 87 | ||||
-rwxr-xr-x | python/test/Ice/asyncio/Client.py | 20 | ||||
-rwxr-xr-x | python/test/Ice/asyncio/Server.py | 30 | ||||
-rw-r--r-- | python/test/Ice/asyncio/Test.ice | 26 | ||||
-rw-r--r-- | python/test/Ice/asyncio/TestI.py | 71 | ||||
-rw-r--r-- | python/test/Ice/asyncio/test.py | 10 |
8 files changed, 270 insertions, 13 deletions
diff --git a/python/python/Ice/Py3/IceFuture.py b/python/python/Ice/Py3/IceFuture.py index 9beb8bffb49..f9178bf58ee 100644 --- a/python/python/Ice/Py3/IceFuture.py +++ b/python/python/Ice/Py3/IceFuture.py @@ -8,6 +8,7 @@ import asyncio + # # This class defines an __await__ method so that coroutines can call 'await <future>'. # @@ -19,6 +20,7 @@ class FutureBase(object): yield self return self.result() + def wrap_future(future, *, loop=None): '''Wrap Ice.Future object into an asyncio.Future.''' if isinstance(future, asyncio.Future): @@ -26,18 +28,25 @@ def wrap_future(future, *, loop=None): assert isinstance(future, FutureBase), 'Ice.Future is expected, got {!r}'.format(future) + def forwardCompletion(sourceFuture, targetFuture): + if not targetFuture.done(): + if sourceFuture.cancelled(): + targetFuture.cancel() + elif sourceFuture.exception(): + targetFuture.set_exception(sourceFuture.exception()) + else: + targetFuture.set_result(sourceFuture.result()) + if loop is None: loop = asyncio.get_event_loop() + asyncioFuture = loop.create_future() - af = asyncio.Future() - - def callback(): - if future.cancelled(): - af.cancel() - elif future.exception(): - af.set_exception(future.exception()) - else: - af.set_result(future.result()) + if future.done(): + # As long as no don callbacks are registered, completing the asyncio future should be thread safe + # even if the future is constructed with a loop which isn't the current thread's loop. + forwardCompletion(future, asyncioFuture) + else: + asyncioFuture.add_done_callback(lambda f: forwardCompletion(asyncioFuture, future)) + future.add_done_callback(lambda f: loop.call_soon_threadsafe(forwardCompletion, future, asyncioFuture)) - future.add_done_callback(lambda f: loop.call_soon_threadsafe(callback)) - return af + return asyncioFuture diff --git a/python/python/Ice/__init__.py b/python/python/Ice/__init__.py index 1b64975c43b..05cd803f90b 100644 --- a/python/python/Ice/__init__.py +++ b/python/python/Ice/__init__.py @@ -427,8 +427,12 @@ Returns: else: result = coro.send(value) - # Calling 'await <future>' will return the future. Check if we've received a future. - if isinstance(result, Future) or callable(getattr(result, "add_done_callback", None)): + if result is None: + # The result can be None if the coroutine performs a bare yield (such as asyncio.sleep(0)) + cb.response(None) + elif isinstance(result, Future) or callable(getattr(result, "add_done_callback", None)): + # If we've received a future from the coroutine setup a done callback to continue the dispatching + # when the future completes. def handler(future): try: self._iceDispatchCoroutine(cb, coro, value=future.result()) diff --git a/python/test/Ice/asyncio/AllTests.py b/python/test/Ice/asyncio/AllTests.py new file mode 100644 index 00000000000..604d8b8aeb7 --- /dev/null +++ b/python/test/Ice/asyncio/AllTests.py @@ -0,0 +1,87 @@ +# +# Copyright (c) ZeroC, Inc. All rights reserved. +# + +import sys +import asyncio +import Ice +import Test + + +def test(b): + if not b: + raise RuntimeError('test assertion failed') + + +async def allTestsAsync(helper, communicator): + + sref = "test:{0}".format(helper.getTestEndpoint(num=0)) + obj = communicator.stringToProxy(sref) + test(obj) + + p = Test.TestIntfPrx.uncheckedCast(obj) + + sys.stdout.write("testing invocation... ") + sys.stdout.flush() + test(await Ice.wrap_future(p.opAsync()) == 5) + await Ice.wrap_future(p.sleepAsync(0)) + await Ice.wrap_future(p.sleepAsync(20)) + test(await Ice.wrap_future(p.callOpOnAsync(p)) == 5) + print("ok") + + sys.stdout.write("testing exceptions... ") + sys.stdout.flush() + try: + await Ice.wrap_future(p.throwUserException1()) + test(False) + except Test.TestException: + pass + try: + await Ice.wrap_future(p.throwUserException2()) + test(False) + except Test.TestException: + pass + try: + await Ice.wrap_future(p.throwUnhandledException1()) + test(False) + except Ice.UnknownException: + pass + try: + await Ice.wrap_future(p.throwUnhandledException2()) + test(False) + except Ice.UnknownException: + pass + print("ok") + + sys.stdout.write("testing cancellation... ") + sys.stdout.flush() + + future = p.sleepAsync(500) + asyncioFuture = Ice.wrap_future(future) + future.cancel() + try: + await asyncioFuture + test(False) + except asyncio.CancelledError: + test(future.cancelled() and asyncioFuture.cancelled()) + + future = p.sleepAsync(500) + asyncioFuture = Ice.wrap_future(future) + asyncioFuture.cancel() + try: + await asyncioFuture + test(False) + except asyncio.CancelledError: + # Wait a little to ensure the cancellation propagates to the Ice future + await asyncio.sleep(0.01) + test(future.cancelled() and asyncioFuture.cancelled()) + + # Try to cancel a done future + future = p.opAsync() + while not future.done(): + await asyncio.sleep(0.01) + Ice.wrap_future(future).cancel() + + print("ok") + + await Ice.wrap_future(p.shutdownAsync()) diff --git a/python/test/Ice/asyncio/Client.py b/python/test/Ice/asyncio/Client.py new file mode 100755 index 00000000000..b63e85eb41c --- /dev/null +++ b/python/test/Ice/asyncio/Client.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# +# Copyright (c) ZeroC, Inc. All rights reserved. +# + +import asyncio +from TestHelper import TestHelper +TestHelper.loadSlice("Test.ice") +import AllTests + + +class Client(TestHelper): + + def run(self, args): + + async def runAsync(): + with self.initialize(properties=self.createTestProperties(args)) as communicator: + await AllTests.allTestsAsync(self, communicator) + + asyncio.run(runAsync(), debug=True) diff --git a/python/test/Ice/asyncio/Server.py b/python/test/Ice/asyncio/Server.py new file mode 100755 index 00000000000..f6eff5e3e4b --- /dev/null +++ b/python/test/Ice/asyncio/Server.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# +# Copyright (c) ZeroC, Inc. All rights reserved. +# + +import Ice +import asyncio +from TestHelper import TestHelper +TestHelper.loadSlice("Test.ice") +import TestI + + +class Server(TestHelper): + + def run(self, args): + + async def runAsync(): + loop = asyncio.get_event_loop() + initData = Ice.InitializationData() + initData.properties = self.createTestProperties(args) + initData.properties.setProperty("Ice.Warn.Dispatch", "0") + initData.dispatcher = lambda method, connection: loop.call_soon_threadsafe(method) + with self.initialize(initData) as communicator: + communicator.getProperties().setProperty("TestAdapter.Endpoints", self.getTestEndpoint()) + adapter = communicator.createObjectAdapter("TestAdapter") + adapter.add(TestI.TestIntfI(), Ice.stringToIdentity("test")) + adapter.activate() + await loop.run_in_executor(None, communicator.waitForShutdown) + + asyncio.run(runAsync(), debug=True) diff --git a/python/test/Ice/asyncio/Test.ice b/python/test/Ice/asyncio/Test.ice new file mode 100644 index 00000000000..1a6f23ff21f --- /dev/null +++ b/python/test/Ice/asyncio/Test.ice @@ -0,0 +1,26 @@ +// +// Copyright (c) ZeroC, Inc. All rights reserved. +// + +#pragma once + +module Test +{ + +exception TestException +{ +} + +interface TestIntf +{ + int op(); + int callOpOn(TestIntf* proxy); + void throwUserException1() throws TestException; + void throwUserException2() throws TestException; + void throwUnhandledException1(); + void throwUnhandledException2(); + void sleep(int ms); + void shutdown(); +} + +} diff --git a/python/test/Ice/asyncio/TestI.py b/python/test/Ice/asyncio/TestI.py new file mode 100644 index 00000000000..4392b2216b1 --- /dev/null +++ b/python/test/Ice/asyncio/TestI.py @@ -0,0 +1,71 @@ +# +# Copyright (c) ZeroC, Inc. All rights reserved. +# + +import asyncio +import Ice +import Test + + +# The implementation of the coroutines below assume the use of an Ice dispatcher which +# dispatch the calls on an asyncio event loop + +class TestIntfI(Test.TestIntf): + + async def op(self, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that returning the result from the coroutine without await works + return 5 + + async def throwUserException1(self, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that raising the exception from the coroutine without await works + raise Test.TestException() + + async def throwUserException2(self, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that raising the exception after from the coroutine after the await works + await asyncio.sleep(0.01) + raise Test.TestException() + + async def throwUnhandledException1(self, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that raising an unhandled exception from the coroutine without await works + raise Exception("unexpected") + + async def throwUnhandledException2(self, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that raising an unhandled exception from the coroutine after the await works + await asyncio.sleep(0.01) + raise Exception("unexpected") + + async def sleep(self, ms, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that awaiting before returning the result works + await asyncio.sleep(ms / 1000.0) + + async def callOpOn(self, proxy, current): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + # ensure that awaiting proxy invocations works + await Ice.wrap_future(proxy.sleepAsync(10)) + return await Ice.wrap_future(proxy.opAsync()) + + def shutdown(self, current=None): + # make sure this is called from an asyncio event loop + asyncio.get_running_loop() + + current.adapter.getCommunicator().shutdown() diff --git a/python/test/Ice/asyncio/test.py b/python/test/Ice/asyncio/test.py new file mode 100644 index 00000000000..40ed163c546 --- /dev/null +++ b/python/test/Ice/asyncio/test.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) ZeroC, Inc. All rights reserved. +# + +# This test doesn't support running with IceSSL, the Router object in the client process uses +# the client certificate and fails with "unsupported certificate purpose" + +if sys.version_info >= (3, 5): + TestSuite(__name__) |