diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-06-08 11:45:13 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-06-08 11:45:13 +0200 |
commit | 60effe77cfe0bfebc4cae2457ec1fe7463267b2c (patch) | |
tree | afbef2b5bd3192a2f83e8f40bf98a56369da6e65 /python/modules | |
parent | Fixed Util.py to account for new android location (diff) | |
download | ice-60effe77cfe0bfebc4cae2457ec1fe7463267b2c.tar.bz2 ice-60effe77cfe0bfebc4cae2457ec1fe7463267b2c.tar.xz ice-60effe77cfe0bfebc4cae2457ec1fe7463267b2c.zip |
Fixed ICE-7974 - Python AMI batch oneway requests now return a completed future object
Diffstat (limited to 'python/modules')
-rw-r--r-- | python/modules/IcePy/Communicator.cpp | 4 | ||||
-rw-r--r-- | python/modules/IcePy/Connection.cpp | 8 | ||||
-rw-r--r-- | python/modules/IcePy/Operation.cpp | 185 | ||||
-rw-r--r-- | python/modules/IcePy/Proxy.cpp | 8 |
4 files changed, 99 insertions, 106 deletions
diff --git a/python/modules/IcePy/Communicator.cpp b/python/modules/IcePy/Communicator.cpp index 89cb5f8030d..17f1ca92ab9 100644 --- a/python/modules/IcePy/Communicator.cpp +++ b/python/modules/IcePy/Communicator.cpp @@ -840,8 +840,6 @@ communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* args, Py try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->communicator)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) @@ -924,8 +922,6 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(callback) { result = (*self->communicator)->begin_flushBatchRequests(cb, callback); diff --git a/python/modules/IcePy/Connection.cpp b/python/modules/IcePy/Connection.cpp index 7375ff71106..af48ede2e53 100644 --- a/python/modules/IcePy/Connection.cpp +++ b/python/modules/IcePy/Connection.cpp @@ -529,8 +529,6 @@ connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* args, PyObje try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->connection)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) @@ -615,8 +613,6 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(callback) { result = (*self->connection)->begin_flushBatchRequests(cb, callback); @@ -798,8 +794,6 @@ connectionBeginHeartbeat(ConnectionObject* self, PyObject* args, PyObject* kwds) Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) { result = (*self->connection)->begin_heartbeat(cb); @@ -907,7 +901,6 @@ connectionSetACM(ConnectionObject* self, PyObject* args) try { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. (*self->connection)->setACM(timeout, close, heartbeat); } catch(const Ice::Exception& ex) @@ -935,7 +928,6 @@ connectionGetACM(ConnectionObject* self) try { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. acm = (*self->connection)->getACM(); } catch(const Ice::Exception& ex) diff --git a/python/modules/IcePy/Operation.cpp b/python/modules/IcePy/Operation.cpp index d3e7493d2ce..8804f18d28c 100644 --- a/python/modules/IcePy/Operation.cpp +++ b/python/modules/IcePy/Operation.cpp @@ -2454,7 +2454,6 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); @@ -2466,7 +2465,6 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) } else { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); @@ -2740,28 +2738,19 @@ IcePy::NewAsyncInvocation::invoke(PyObject* args, PyObject* kwds) assert(result); - PyObjectHandle communicatorObj = getCommunicatorWrapper(_communicator); - PyObjectHandle asyncResultObj; - - // - // Pass the AsyncResult object to the future. Note that this creates a circular reference. - // Don't do this for batch invocations because there is no opportunity to break the circular - // reference. - // - if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) - { - asyncResultObj = createAsyncResult(result, _pyProxy, 0, communicatorObj.get()); - if(!asyncResultObj.get()) - { - return 0; - } - } - // // NOTE: Any time we call into interpreted Python code there's a chance that another thread will be // allowed to run! // + PyObjectHandle communicatorObj = getCommunicatorWrapper(_communicator); + + PyObjectHandle asyncResultObj = createAsyncResult(result, _pyProxy, 0, communicatorObj.get()); + if(!asyncResultObj.get()) + { + return 0; + } + PyObjectHandle future = createFuture(_operation, asyncResultObj.get()); // Calls into Python code. if(!future.get()) { @@ -2771,54 +2760,64 @@ IcePy::NewAsyncInvocation::invoke(PyObject* args, PyObject* kwds) // // Check if any callbacks have been invoked already. // - - if(_sent) + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) { - PyObjectHandle tmp = callMethod(future.get(), "set_sent", _sentSynchronously ? getTrue() : getFalse()); - if(PyErr_Occurred()) - { - return 0; - } - - if(!_twoway) + if(_sent) { - // - // For a oneway/datagram invocation, we consider it complete when sent. - // - tmp = callMethod(future.get(), "set_result", Py_None); + PyObjectHandle tmp = callMethod(future.get(), "set_sent", _sentSynchronously ? getTrue() : getFalse()); if(PyErr_Occurred()) { return 0; } - } - } - if(_done) - { - if(_exception) - { - PyObjectHandle tmp = callMethod(future.get(), "set_exception", _exception); - if(PyErr_Occurred()) + if(!_twoway) { - return 0; + // + // For a oneway/datagram invocation, we consider it complete when sent. + // + tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) + { + return 0; + } } } - else + + if(_done) { - // - // Delegate to the subclass. - // - pair<const Ice::Byte*, const Ice::Byte*> p(&_results[0], &_results[0] + _results.size()); - handleResponse(future.get(), _ok, p); - if(PyErr_Occurred()) + if(_exception) { - return 0; + PyObjectHandle tmp = callMethod(future.get(), "set_exception", _exception); + if(PyErr_Occurred()) + { + return 0; + } + } + else + { + // + // Delegate to the subclass. + // + pair<const Ice::Byte*, const Ice::Byte*> p(&_results[0], &_results[0] + _results.size()); + handleResponse(future.get(), _ok, p); + if(PyErr_Occurred()) + { + return 0; + } } } + _future = future.release(); + return incRef(_future); + } + else + { + PyObjectHandle tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) + { + return 0; + } + return future.release(); } - - _future = future.release(); - return incRef(_future); } void @@ -2979,9 +2978,14 @@ IcePy::NewAsyncTypedInvocation::handleInvoke(PyObject* args, PyObject* /* kwds * checkTwowayOnly(_op, _prx); NewAsyncInvocationPtr self = this; - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, - &NewAsyncInvocation::sent); + Ice::Callback_Object_ice_invokePtr cb; + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) + { + cb = Ice::newCallback_Object_ice_invoke(self, + &NewAsyncInvocation::response, + &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); + } // // Invoke the operation asynchronously. @@ -3001,13 +3005,25 @@ IcePy::NewAsyncTypedInvocation::handleInvoke(PyObject* args, PyObject* /* kwds * return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); + if(cb) + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); + } + else + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx); + } } else { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + if(cb) + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + } + else + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params); + } } } @@ -3359,7 +3375,6 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) if(pyctx == Py_None) { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(operation, sendMode, in, cb); @@ -3377,7 +3392,6 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(operation, sendMode, in, context, cb); @@ -3648,18 +3662,25 @@ IcePy::NewAsyncBlobjectInvocation::handleInvoke(PyObject* args, PyObject* /* kwd #endif NewAsyncInvocationPtr self = this; - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, - &NewAsyncInvocation::sent); + Ice::Callback_Object_ice_invokePtr cb; + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) + { + cb = Ice::newCallback_Object_ice_invoke(self, + &NewAsyncInvocation::response, + &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); + } if(ctx == 0 || ctx == Py_None) { - // - // Don't release the GIL here. We want other threads to block until we've had a chance - // to create the future. - // - //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(operation, sendMode, in, cb); + if(cb) + { + return _prx->begin_ice_invoke(operation, sendMode, in, cb); + } + else + { + return _prx->begin_ice_invoke(operation, sendMode, in); + } } else { @@ -3669,12 +3690,14 @@ IcePy::NewAsyncBlobjectInvocation::handleInvoke(PyObject* args, PyObject* /* kwd return 0; } - // - // Don't release the GIL here. We want other threads to block until we've had a chance - // to create the future. - // - //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(operation, sendMode, in, context, cb); + if(cb) + { + return _prx->begin_ice_invoke(operation, sendMode, in, context, cb); + } + else + { + return _prx->begin_ice_invoke(operation, sendMode, in, context); + } } } @@ -3921,7 +3944,6 @@ IcePy::TypedUpcall::response(PyObject* result) if(PyObject_IsInstance(result, reinterpret_cast<PyObject*>(&MarshaledResultType))) { MarshaledResultObject* mro = reinterpret_cast<MarshaledResultObject*>(result); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(true, mro->out->finished()); } else @@ -3935,7 +3957,6 @@ IcePy::TypedUpcall::response(PyObject* result) os.endEncapsulation(); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(true, os.finished()); } catch(const AbortMarshaling&) @@ -3946,7 +3967,6 @@ IcePy::TypedUpcall::response(PyObject* result) } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } } @@ -3954,7 +3974,6 @@ IcePy::TypedUpcall::response(PyObject* result) } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } } @@ -3994,7 +4013,6 @@ IcePy::TypedUpcall::exception(PyException& ex) os.endEncapsulation(); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(false, os.finished()); } else @@ -4016,7 +4034,6 @@ IcePy::TypedUpcall::exception(PyException& ex) void IcePy::TypedUpcall::exception(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } @@ -4141,7 +4158,6 @@ IcePy::BlobjectUpcall::response(PyObject* result) const pair<const ::Ice::Byte*, const ::Ice::Byte*> r(mem, mem + sz); #endif - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(isTrue, r); } catch(const AbortMarshaling&) @@ -4185,7 +4201,6 @@ IcePy::BlobjectUpcall::exception(PyException& ex) void IcePy::BlobjectUpcall::exception(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } @@ -4690,7 +4705,6 @@ IcePy::TypedServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. cb->ice_exception(ex); } } @@ -4717,7 +4731,6 @@ IcePy::BlobjectServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invoke } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. cb->ice_exception(ex); } } diff --git a/python/modules/IcePy/Proxy.cpp b/python/modules/IcePy/Proxy.cpp index be082198227..86a4a828eb1 100644 --- a/python/modules/IcePy/Proxy.cpp +++ b/python/modules/IcePy/Proxy.cpp @@ -1821,8 +1821,6 @@ proxyIceGetConnectionAsync(ProxyObject* self, PyObject* /*args*/, PyObject* /*kw try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->proxy)->begin_ice_getConnection(cb); } catch(const Ice::Exception& ex) @@ -1897,8 +1895,6 @@ proxyBeginIceGetConnection(ProxyObject* self, PyObject* args, PyObject* kwds) Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) { result = (*self->proxy)->begin_ice_getConnection(cb); @@ -2026,8 +2022,6 @@ proxyIceFlushBatchRequestsAsync(ProxyObject* self, PyObject* /*args*/, PyObject* try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->proxy)->begin_ice_flushBatchRequests(cb); } catch(const Ice::Exception& ex) @@ -2100,8 +2094,6 @@ proxyBeginIceFlushBatchRequests(ProxyObject* self, PyObject* args, PyObject* kwd Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) { result = (*self->proxy)->begin_ice_flushBatchRequests(cb); |