diff options
-rw-r--r-- | csharp/test/Ice/ami/AllTests.cs | 4 | ||||
-rw-r--r-- | python/modules/IcePy/Communicator.cpp | 4 | ||||
-rw-r--r-- | python/modules/IcePy/Connection.cpp | 8 | ||||
-rw-r--r-- | python/modules/IcePy/Operation.cpp | 185 | ||||
-rw-r--r-- | python/modules/IcePy/Proxy.cpp | 8 | ||||
-rw-r--r-- | python/python/Ice/__init__.py | 21 | ||||
-rw-r--r-- | python/test/Ice/operations/BatchOnewaysFuture.py | 5 |
7 files changed, 112 insertions, 123 deletions
diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs index 40422445f5d..9b6463939db 100644 --- a/csharp/test/Ice/ami/AllTests.cs +++ b/csharp/test/Ice/ami/AllTests.cs @@ -2102,7 +2102,9 @@ public class AllTests : TestCommon.AllTests // test(p.opBatchCount() == 0); TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - b1.opBatch(); + bf = b1.opBatchAsync(); + test(bf.done()) + test(!bf.is_sent()) b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests(cb.completedAsync, cookie); diff --git a/python/modules/IcePy/Communicator.cpp b/python/modules/IcePy/Communicator.cpp index 89cb5f8030d..17f1ca92ab9 100644 --- a/python/modules/IcePy/Communicator.cpp +++ b/python/modules/IcePy/Communicator.cpp @@ -840,8 +840,6 @@ communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* args, Py try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->communicator)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) @@ -924,8 +922,6 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(callback) { result = (*self->communicator)->begin_flushBatchRequests(cb, callback); diff --git a/python/modules/IcePy/Connection.cpp b/python/modules/IcePy/Connection.cpp index 7375ff71106..af48ede2e53 100644 --- a/python/modules/IcePy/Connection.cpp +++ b/python/modules/IcePy/Connection.cpp @@ -529,8 +529,6 @@ connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* args, PyObje try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. 
- result = (*self->connection)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) @@ -615,8 +613,6 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(callback) { result = (*self->connection)->begin_flushBatchRequests(cb, callback); @@ -798,8 +794,6 @@ connectionBeginHeartbeat(ConnectionObject* self, PyObject* args, PyObject* kwds) Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) { result = (*self->connection)->begin_heartbeat(cb); @@ -907,7 +901,6 @@ connectionSetACM(ConnectionObject* self, PyObject* args) try { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. (*self->connection)->setACM(timeout, close, heartbeat); } catch(const Ice::Exception& ex) @@ -935,7 +928,6 @@ connectionGetACM(ConnectionObject* self) try { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. acm = (*self->connection)->getACM(); } catch(const Ice::Exception& ex) diff --git a/python/modules/IcePy/Operation.cpp b/python/modules/IcePy/Operation.cpp index d3e7493d2ce..8804f18d28c 100644 --- a/python/modules/IcePy/Operation.cpp +++ b/python/modules/IcePy/Operation.cpp @@ -2454,7 +2454,6 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); @@ -2466,7 +2465,6 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) } else { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. 
if(cb) { result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); @@ -2740,28 +2738,19 @@ IcePy::NewAsyncInvocation::invoke(PyObject* args, PyObject* kwds) assert(result); - PyObjectHandle communicatorObj = getCommunicatorWrapper(_communicator); - PyObjectHandle asyncResultObj; - - // - // Pass the AsyncResult object to the future. Note that this creates a circular reference. - // Don't do this for batch invocations because there is no opportunity to break the circular - // reference. - // - if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) - { - asyncResultObj = createAsyncResult(result, _pyProxy, 0, communicatorObj.get()); - if(!asyncResultObj.get()) - { - return 0; - } - } - // // NOTE: Any time we call into interpreted Python code there's a chance that another thread will be // allowed to run! // + PyObjectHandle communicatorObj = getCommunicatorWrapper(_communicator); + + PyObjectHandle asyncResultObj = createAsyncResult(result, _pyProxy, 0, communicatorObj.get()); + if(!asyncResultObj.get()) + { + return 0; + } + PyObjectHandle future = createFuture(_operation, asyncResultObj.get()); // Calls into Python code. if(!future.get()) { @@ -2771,54 +2760,64 @@ IcePy::NewAsyncInvocation::invoke(PyObject* args, PyObject* kwds) // // Check if any callbacks have been invoked already. // - - if(_sent) + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) { - PyObjectHandle tmp = callMethod(future.get(), "set_sent", _sentSynchronously ? getTrue() : getFalse()); - if(PyErr_Occurred()) - { - return 0; - } - - if(!_twoway) + if(_sent) { - // - // For a oneway/datagram invocation, we consider it complete when sent. - // - tmp = callMethod(future.get(), "set_result", Py_None); + PyObjectHandle tmp = callMethod(future.get(), "set_sent", _sentSynchronously ? 
getTrue() : getFalse()); if(PyErr_Occurred()) { return 0; } - } - } - if(_done) - { - if(_exception) - { - PyObjectHandle tmp = callMethod(future.get(), "set_exception", _exception); - if(PyErr_Occurred()) + if(!_twoway) { - return 0; + // + // For a oneway/datagram invocation, we consider it complete when sent. + // + tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) + { + return 0; + } } } - else + + if(_done) { - // - // Delegate to the subclass. - // - pair<const Ice::Byte*, const Ice::Byte*> p(&_results[0], &_results[0] + _results.size()); - handleResponse(future.get(), _ok, p); - if(PyErr_Occurred()) + if(_exception) { - return 0; + PyObjectHandle tmp = callMethod(future.get(), "set_exception", _exception); + if(PyErr_Occurred()) + { + return 0; + } + } + else + { + // + // Delegate to the subclass. + // + pair<const Ice::Byte*, const Ice::Byte*> p(&_results[0], &_results[0] + _results.size()); + handleResponse(future.get(), _ok, p); + if(PyErr_Occurred()) + { + return 0; + } } } + _future = future.release(); + return incRef(_future); + } + else + { + PyObjectHandle tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) + { + return 0; + } + return future.release(); } - - _future = future.release(); - return incRef(_future); } void @@ -2979,9 +2978,14 @@ IcePy::NewAsyncTypedInvocation::handleInvoke(PyObject* args, PyObject* /* kwds * checkTwowayOnly(_op, _prx); NewAsyncInvocationPtr self = this; - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, - &NewAsyncInvocation::sent); + Ice::Callback_Object_ice_invokePtr cb; + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) + { + cb = Ice::newCallback_Object_ice_invoke(self, + &NewAsyncInvocation::response, + &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); + } // // Invoke the operation asynchronously. 
@@ -3001,13 +3005,25 @@ IcePy::NewAsyncTypedInvocation::handleInvoke(PyObject* args, PyObject* /* kwds * return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); + if(cb) + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); + } + else + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx); + } } else { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + if(cb) + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + } + else + { + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params); + } } } @@ -3359,7 +3375,6 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) if(pyctx == Py_None) { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. if(cb) { result = _prx->begin_ice_invoke(operation, sendMode, in, cb); @@ -3377,7 +3392,6 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) return 0; } - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. 
if(cb) { result = _prx->begin_ice_invoke(operation, sendMode, in, context, cb); @@ -3648,18 +3662,25 @@ IcePy::NewAsyncBlobjectInvocation::handleInvoke(PyObject* args, PyObject* /* kwd #endif NewAsyncInvocationPtr self = this; - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, - &NewAsyncInvocation::sent); + Ice::Callback_Object_ice_invokePtr cb; + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) + { + cb = Ice::newCallback_Object_ice_invoke(self, + &NewAsyncInvocation::response, + &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); + } if(ctx == 0 || ctx == Py_None) { - // - // Don't release the GIL here. We want other threads to block until we've had a chance - // to create the future. - // - //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - return _prx->begin_ice_invoke(operation, sendMode, in, cb); + if(cb) + { + return _prx->begin_ice_invoke(operation, sendMode, in, cb); + } + else + { + return _prx->begin_ice_invoke(operation, sendMode, in); + } } else { @@ -3669,12 +3690,14 @@ IcePy::NewAsyncBlobjectInvocation::handleInvoke(PyObject* args, PyObject* /* kwd return 0; } - // - // Don't release the GIL here. We want other threads to block until we've had a chance - // to create the future. - // - //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. 
- return _prx->begin_ice_invoke(operation, sendMode, in, context, cb); + if(cb) + { + return _prx->begin_ice_invoke(operation, sendMode, in, context, cb); + } + else + { + return _prx->begin_ice_invoke(operation, sendMode, in, context); + } } } @@ -3921,7 +3944,6 @@ IcePy::TypedUpcall::response(PyObject* result) if(PyObject_IsInstance(result, reinterpret_cast<PyObject*>(&MarshaledResultType))) { MarshaledResultObject* mro = reinterpret_cast<MarshaledResultObject*>(result); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(true, mro->out->finished()); } else @@ -3935,7 +3957,6 @@ IcePy::TypedUpcall::response(PyObject* result) os.endEncapsulation(); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(true, os.finished()); } catch(const AbortMarshaling&) @@ -3946,7 +3967,6 @@ IcePy::TypedUpcall::response(PyObject* result) } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } } @@ -3954,7 +3974,6 @@ IcePy::TypedUpcall::response(PyObject* result) } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } } @@ -3994,7 +4013,6 @@ IcePy::TypedUpcall::exception(PyException& ex) os.endEncapsulation(); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(false, os.finished()); } else @@ -4016,7 +4034,6 @@ IcePy::TypedUpcall::exception(PyException& ex) void IcePy::TypedUpcall::exception(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. 
_callback->ice_exception(ex); } @@ -4141,7 +4158,6 @@ IcePy::BlobjectUpcall::response(PyObject* result) const pair<const ::Ice::Byte*, const ::Ice::Byte*> r(mem, mem + sz); #endif - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_response(isTrue, r); } catch(const AbortMarshaling&) @@ -4185,7 +4201,6 @@ IcePy::BlobjectUpcall::exception(PyException& ex) void IcePy::BlobjectUpcall::exception(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. _callback->ice_exception(ex); } @@ -4690,7 +4705,6 @@ IcePy::TypedServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. cb->ice_exception(ex); } } @@ -4717,7 +4731,6 @@ IcePy::BlobjectServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invoke } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. cb->ice_exception(ex); } } diff --git a/python/modules/IcePy/Proxy.cpp b/python/modules/IcePy/Proxy.cpp index be082198227..86a4a828eb1 100644 --- a/python/modules/IcePy/Proxy.cpp +++ b/python/modules/IcePy/Proxy.cpp @@ -1821,8 +1821,6 @@ proxyIceGetConnectionAsync(ProxyObject* self, PyObject* /*args*/, PyObject* /*kw try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->proxy)->begin_ice_getConnection(cb); } catch(const Ice::Exception& ex) @@ -1897,8 +1895,6 @@ proxyBeginIceGetConnection(ProxyObject* self, PyObject* args, PyObject* kwds) Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. 
- if(cb) { result = (*self->proxy)->begin_ice_getConnection(cb); @@ -2026,8 +2022,6 @@ proxyIceFlushBatchRequestsAsync(ProxyObject* self, PyObject* /*args*/, PyObject* try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->proxy)->begin_ice_flushBatchRequests(cb); } catch(const Ice::Exception& ex) @@ -2100,8 +2094,6 @@ proxyBeginIceFlushBatchRequests(ProxyObject* self, PyObject* args, PyObject* kwd Ice::AsyncResultPtr result; try { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) { result = (*self->proxy)->begin_ice_flushBatchRequests(cb); diff --git a/python/python/Ice/__init__.py b/python/python/Ice/__init__.py index 500d2e5aa27..333cede8aa5 100644 --- a/python/python/Ice/__init__.py +++ b/python/python/Ice/__init__.py @@ -211,6 +211,7 @@ class Future(FutureBase): class InvocationFuture(Future): def __init__(self, operation, asyncResult): Future.__init__(self) + assert(asyncResult) self._operation = operation self._asyncResult = asyncResult # May be None for a batch invocation. 
self._sent = False @@ -218,8 +219,7 @@ class InvocationFuture(Future): self._sentCallbacks = [] def cancel(self): - if self._asyncResult: - self._asyncResult.cancel() + self._asyncResult.cancel() return Future.cancel(self) def add_done_callback_async(self, fn): @@ -233,10 +233,7 @@ class InvocationFuture(Future): if self._state == Future.StateRunning: self._doneCallbacks.append(fn) return - if self._asyncResult: - self._asyncResult.callLater(callback) - else: - fn(self) + self._asyncResult.callLater(callback) def is_sent(self): with self._condition: @@ -264,11 +261,7 @@ class InvocationFuture(Future): if not self._sent: self._sentCallbacks.append(fn) return - - if self._asyncResult: - self._asyncResult.callLater(callback) - else: - fn(self, self._sentSynchronously) + self._asyncResult.callLater(callback) def sent(self, timeout=None): with self._condition: @@ -303,13 +296,13 @@ class InvocationFuture(Future): return self._operation def proxy(self): - return None if not self._asyncResult else self._asyncResult.getProxy() + return self._asyncResult.getProxy() def connection(self): - return None if not self._asyncResult else self._asyncResult.getConnection() + return self._asyncResult.getConnection() def communicator(self): - return None if not self._asyncResult else self._asyncResult.getCommunicator() + return self._asyncResult.getCommunicator() # # This value is used as the default value for struct types in the constructors diff --git a/python/test/Ice/operations/BatchOnewaysFuture.py b/python/test/Ice/operations/BatchOnewaysFuture.py index ebb05f76d41..dee7456f7eb 100644 --- a/python/test/Ice/operations/BatchOnewaysFuture.py +++ b/python/test/Ice/operations/BatchOnewaysFuture.py @@ -70,10 +70,11 @@ def batchOneways(p): batch1.ice_getConnection() batch2.ice_getConnection() + batch1.ice_pingAsync() batch1.ice_getConnection().close(Ice.ConnectionClose.GracefullyWithWait) - test(not batch1.ice_pingAsync().done()) - test(not batch2.ice_pingAsync().done()) + 
test(batch1.ice_pingAsync().done() and not batch1.ice_pingAsync().exception()) + test(batch2.ice_pingAsync().done() and not batch2.ice_pingAsync().exception()) identity = Ice.Identity() identity.name = "invalid"; |