diff options
Diffstat (limited to 'python/modules/IcePy/Operation.cpp')
-rw-r--r-- | python/modules/IcePy/Operation.cpp | 2306 |
1 files changed, 1385 insertions, 921 deletions
diff --git a/python/modules/IcePy/Operation.cpp b/python/modules/IcePy/Operation.cpp index 0eef27793d0..5db4ab9e41c 100644 --- a/python/modules/IcePy/Operation.cpp +++ b/python/modules/IcePy/Operation.cpp @@ -27,6 +27,7 @@ #include <Ice/AsyncResult.h> #include <Ice/Properties.h> #include <Ice/Proxy.h> +#include <IceUtil/Time.h> #include <Slice/PythonUtil.h> using namespace std; @@ -104,50 +105,44 @@ public: protected: - Ice::ObjectPrx _prx; -}; -typedef IceUtil::Handle<Invocation> InvocationPtr; - -// -// TypedInvocation uses the information in the given Operation to validate, marshal, and unmarshal -// parameters and exceptions. -// -class TypedInvocation : virtual public Invocation -{ -public: + // + // Helpers for typed invocations. + // - TypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); + enum MappingType { SyncMapping, AsyncMapping, NewAsyncMapping }; -protected: + bool prepareRequest(const OperationPtr&, PyObject*, MappingType, Ice::OutputStream*, + pair<const Ice::Byte*, const Ice::Byte*>&); + PyObject* unmarshalResults(const OperationPtr&, const pair<const Ice::Byte*, const Ice::Byte*>&); + PyObject* unmarshalException(const OperationPtr&, const pair<const Ice::Byte*, const Ice::Byte*>&); + bool validateException(const OperationPtr&, PyObject*) const; + void checkTwowayOnly(const OperationPtr&, const Ice::ObjectPrx&) const; - OperationPtr _op; + Ice::ObjectPrx _prx; Ice::CommunicatorPtr _communicator; - - enum MappingType { SyncMapping, AsyncMapping, OldAsyncMapping }; - - bool prepareRequest(PyObject*, MappingType, Ice::OutputStreamPtr&, pair<const Ice::Byte*, const Ice::Byte*>&); - PyObject* unmarshalResults(const pair<const Ice::Byte*, const Ice::Byte*>&); - PyObject* unmarshalException(const pair<const Ice::Byte*, const Ice::Byte*>&); - bool validateException(PyObject*) const; - void checkTwowayOnly(const Ice::ObjectPrx&) const; }; +typedef IceUtil::Handle<Invocation> InvocationPtr; // // Synchronous typed invocation. // -class SyncTypedInvocation : virtual public TypedInvocation +class SyncTypedInvocation : public Invocation { public: SyncTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); virtual PyObject* invoke(PyObject*, PyObject* = 0); + +private: + + OperationPtr _op; }; // // Asynchronous typed invocation. // -class AsyncTypedInvocation : virtual public TypedInvocation +class AsyncTypedInvocation : public Invocation { public: @@ -163,10 +158,11 @@ public: void exception(const Ice::Exception&); void sent(bool); -protected: +private: void checkAsyncTwowayOnly(const Ice::ObjectPrx&) const; + OperationPtr _op; PyObject* _pyProxy; PyObject* _response; PyObject* _ex; @@ -175,14 +171,14 @@ protected: typedef IceUtil::Handle<AsyncTypedInvocation> AsyncTypedInvocationPtr; // -// Old-style asynchronous typed invocation. +// Asynchronous invocation with futures. // -class OldAsyncTypedInvocation : virtual public TypedInvocation +class NewAsyncInvocation : public Invocation { public: - OldAsyncTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); - ~OldAsyncTypedInvocation(); + NewAsyncInvocation(const Ice::ObjectPrx&, PyObject*, const string&); + ~NewAsyncInvocation(); virtual PyObject* invoke(PyObject*, PyObject* = 0); @@ -192,13 +188,47 @@ public: protected: - PyObject* _callback; + virtual Ice::AsyncResultPtr handleInvoke(PyObject*, PyObject*) = 0; + virtual void handleResponse(PyObject*, bool, const pair<const Ice::Byte*, const Ice::Byte*>&) = 0; + + PyObject* _pyProxy; + string _operation; + bool _twoway; + bool _sent; + bool _sentSynchronously; + bool _done; + PyObject* _future; + bool _ok; + vector<Ice::Byte> _results; + PyObject* _exception; +}; +typedef IceUtil::Handle<NewAsyncInvocation> NewAsyncInvocationPtr; + +// +// New-style asynchronous typed invocation. +// +class NewAsyncTypedInvocation : public NewAsyncInvocation +{ +public: + + NewAsyncTypedInvocation(const Ice::ObjectPrx&, PyObject*, const OperationPtr&); + +protected: + + virtual Ice::AsyncResultPtr handleInvoke(PyObject*, PyObject*); + virtual void handleResponse(PyObject*, bool, const pair<const Ice::Byte*, const Ice::Byte*>&); + +private: + + void checkAsyncTwowayOnly(const Ice::ObjectPrx&) const; + + OperationPtr _op; }; // // Synchronous blobject invocation. // -class SyncBlobjectInvocation : virtual public Invocation +class SyncBlobjectInvocation : public Invocation { public: @@ -210,7 +240,7 @@ public: // // Asynchronous blobject invocation. // -class AsyncBlobjectInvocation : virtual public Invocation +class AsyncBlobjectInvocation : public Invocation { public: @@ -235,82 +265,83 @@ protected: typedef IceUtil::Handle<AsyncBlobjectInvocation> AsyncBlobjectInvocationPtr; // -// Old-style asynchronous blobject invocation. +// New-style asynchronous blobject invocation. // -class OldAsyncBlobjectInvocation : virtual public Invocation +class NewAsyncBlobjectInvocation : public NewAsyncInvocation { public: - OldAsyncBlobjectInvocation(const Ice::ObjectPrx&); - ~OldAsyncBlobjectInvocation(); - - virtual PyObject* invoke(PyObject*, PyObject* = 0); - - void response(bool, const pair<const Ice::Byte*, const Ice::Byte*>&); - void exception(const Ice::Exception&); - void sent(bool); + NewAsyncBlobjectInvocation(const Ice::ObjectPrx&, PyObject*); protected: + virtual Ice::AsyncResultPtr handleInvoke(PyObject*, PyObject*); + virtual void handleResponse(PyObject*, bool, const pair<const Ice::Byte*, const Ice::Byte*>&); + string _op; - PyObject* _callback; }; // // The base class for server-side upcalls. // -class Upcall : virtual public IceUtil::Shared +class Upcall : public IceUtil::Shared { public: virtual void dispatch(PyObject*, const pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&) = 0; - virtual void response(PyObject*, const Ice::EncodingVersion&) = 0; - virtual void exception(PyException&, const Ice::EncodingVersion&) = 0; + virtual void response(PyObject*) = 0; + virtual void exception(PyException&) = 0; + virtual void exception(const Ice::Exception&) = 0; + +protected: + + void dispatchImpl(PyObject*, const string&, PyObject*, const Ice::Current&); }; typedef IceUtil::Handle<Upcall> UpcallPtr; // -// TypedInvocation uses the information in the given Operation to validate, marshal, and unmarshal +// TypedUpcall uses the information in the given Operation to validate, marshal, and unmarshal // parameters and exceptions. // -class TypedUpcall : virtual public Upcall +class TypedUpcall; +typedef IceUtil::Handle<TypedUpcall> TypedUpcallPtr; + +class TypedUpcall : public Upcall { public: TypedUpcall(const OperationPtr&, const Ice::AMD_Object_ice_invokePtr&, const Ice::CommunicatorPtr&); virtual void dispatch(PyObject*, const pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&); - virtual void response(PyObject*, const Ice::EncodingVersion&); - virtual void exception(PyException&, const Ice::EncodingVersion&); + virtual void response(PyObject*); + virtual void exception(PyException&); + virtual void exception(const Ice::Exception&); private: - bool validateException(PyObject*) const; - OperationPtr _op; Ice::AMD_Object_ice_invokePtr _callback; Ice::CommunicatorPtr _communicator; - bool _finished; + Ice::EncodingVersion _encoding; }; // // Upcall for blobject servants. // -class BlobjectUpcall : virtual public Upcall +class BlobjectUpcall : public Upcall { public: - BlobjectUpcall(bool, const Ice::AMD_Object_ice_invokePtr&); + BlobjectUpcall(const Ice::AMD_Object_ice_invokePtr&); virtual void dispatch(PyObject*, const pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&); - virtual void response(PyObject*, const Ice::EncodingVersion&); - virtual void exception(PyException&, const Ice::EncodingVersion&); + virtual void response(PyObject*); + virtual void exception(PyException&); + virtual void exception(const Ice::Exception&); private: - bool _amd; Ice::AMD_Object_ice_invokePtr _callback; - bool _finished; }; // @@ -341,15 +372,11 @@ class BlobjectServantWrapper : public ServantWrapper { public: - BlobjectServantWrapper(PyObject*, bool); + BlobjectServantWrapper(PyObject*); virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&); - -private: - - bool _amd; }; struct OperationObject @@ -358,11 +385,19 @@ struct OperationObject OperationPtr* op; }; -struct AMDCallbackObject +struct DoneCallbackObject +{ + PyObject_HEAD + UpcallPtr* upcall; +#if PY_VERSION_HEX >= 0x03050000 + PyObject* coroutine; +#endif +}; + +struct DispatchCallbackObject { PyObject_HEAD UpcallPtr* upcall; - Ice::EncodingVersion encoding; }; struct AsyncResultObject @@ -376,29 +411,19 @@ struct AsyncResultObject }; extern PyTypeObject OperationType; -extern PyTypeObject AMDCallbackType; -class UserExceptionReaderFactoryI : public Ice::UserExceptionReaderFactory +class UserExceptionFactory : public Ice::UserExceptionFactory { public: - UserExceptionReaderFactoryI(const Ice::CommunicatorPtr& communicator) : - _communicator(communicator) - { - } - - virtual void createAndThrow(const string& id) const + virtual void createAndThrow(const string& id) { ExceptionInfoPtr info = lookupExceptionInfo(id); if(info) { - throw ExceptionReader(_communicator, info); + throw ExceptionReader(info); } } - -private: - - const Ice::CommunicatorPtr _communicator; }; } @@ -435,8 +460,7 @@ handleException() void callException(PyObject* method, PyObject* ex) { - PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), ex); - PyObjectHandle tmp = PyObject_Call(method, args.get(), 0); + PyObjectHandle tmp = callMethod(method, ex); if(PyErr_Occurred()) { handleException(); // Callback raised an exception. @@ -452,61 +476,20 @@ callException(PyObject* method, const Ice::Exception& ex) } void -callException(PyObject* callback, const string& op, const string& method, PyObject* ex) -{ - if(!PyObject_HasAttrString(callback, STRCAST(method.c_str()))) - { - ostringstream ostr; - ostr << "AMI callback object for operation `" << op << "' does not define " << method << "()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - } - else - { - PyObjectHandle m = PyObject_GetAttrString(callback, STRCAST(method.c_str())); - assert(m.get()); - callException(m.get(), ex); - } -} - -void -callException(PyObject* callback, const string& op, const string& method, const Ice::Exception& ex) -{ - PyObjectHandle exh = convertException(ex); - assert(exh.get()); - callException(callback, op, method, exh.get()); -} - -void callSent(PyObject* method, bool sentSynchronously, bool passArg) { - PyObjectHandle args; + PyObject* arg = 0; if(passArg) { - args = Py_BuildValue(STRCAST("(O)"), sentSynchronously ? getTrue() : getFalse()); + arg = sentSynchronously ? getTrue() : getFalse(); } - else - { - args = PyTuple_New(0); - } - PyObjectHandle tmp = PyObject_Call(method, args.get(), 0); + PyObjectHandle tmp = callMethod(method, arg); if(PyErr_Occurred()) { handleException(); // Callback raised an exception. } } -void -callSent(PyObject* callback, const string& method, bool sentSynchronously, bool passArg) -{ - if(PyObject_HasAttrString(callback, STRCAST(method.c_str()))) - { - PyObjectHandle m = PyObject_GetAttrString(callback, STRCAST(method.c_str())); - assert(m.get()); - callSent(m.get(), sentSynchronously, passArg); - } -} - } #ifdef WIN32 @@ -592,17 +575,15 @@ extern "C" static PyObject* operationInvokeAsync(OperationObject* self, PyObject* args) { - PyObject* pyProxy; + PyObject* proxy; PyObject* opArgs; - if(!PyArg_ParseTuple(args, STRCAST("O!O!"), &ProxyType, &pyProxy, &PyTuple_Type, &opArgs)) + if(!PyArg_ParseTuple(args, STRCAST("O!O!"), &ProxyType, &proxy, &PyTuple_Type, &opArgs)) { return 0; } - Ice::ObjectPrx prx = getProxy(pyProxy); - assert(self->op); - - InvocationPtr i = new OldAsyncTypedInvocation(prx, *self->op); + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new NewAsyncTypedInvocation(p, proxy, *self->op); return i->invoke(opArgs); } @@ -665,26 +646,28 @@ operationDeprecate(OperationObject* self, PyObject* args) assert(self->op); (*self->op)->deprecate(msg); - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } // -// AMDCallback operations +// DoneCallback operations // #ifdef WIN32 extern "C" #endif -static AMDCallbackObject* -amdCallbackNew(PyTypeObject* type, PyObject* /*args*/, PyObject* /*kwds*/) +static DoneCallbackObject* +doneCallbackNew(PyTypeObject* type, PyObject* /*args*/, PyObject* /*kwds*/) { - AMDCallbackObject* self = reinterpret_cast<AMDCallbackObject*>(type->tp_alloc(type, 0)); + DoneCallbackObject* self = reinterpret_cast<DoneCallbackObject*>(type->tp_alloc(type, 0)); if(!self) { return 0; } self->upcall = 0; +#if PY_VERSION_HEX >= 0x03050000 + self->coroutine = 0; +#endif return self; } @@ -692,9 +675,12 @@ amdCallbackNew(PyTypeObject* type, PyObject* /*args*/, PyObject* /*kwds*/) extern "C" #endif static void -amdCallbackDealloc(AMDCallbackObject* self) +doneCallbackDealloc(DoneCallbackObject* self) { delete self->upcall; +#if PY_VERSION_HEX >= 0x03050000 + Py_XDECREF(self->coroutine); +#endif Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self)); } @@ -702,12 +688,32 @@ amdCallbackDealloc(AMDCallbackObject* self) extern "C" #endif static PyObject* -amdCallbackIceResponse(AMDCallbackObject* self, PyObject* args) +doneCallbackInvoke(DoneCallbackObject* self, PyObject* args) { + PyObject* future = 0; + if(!PyArg_ParseTuple(args, STRCAST("O"), &future)) + { + return 0; + } + try { assert(self->upcall); - (*self->upcall)->response(args, self->encoding); + + PyObjectHandle resultMethod = PyObject_GetAttrString(future, STRCAST("result")); + assert(resultMethod.get()); + PyObjectHandle empty = PyTuple_New(0); + PyObjectHandle result = PyObject_Call(resultMethod.get(), empty.get(), 0); + + if(PyErr_Occurred()) + { + PyException ex; + (*self->upcall)->exception(ex); + } + else + { + (*self->upcall)->response(result.get()); + } } catch(...) { @@ -717,33 +723,83 @@ amdCallbackIceResponse(AMDCallbackObject* self, PyObject* args) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); +} + +// +// DispatchCallbackObject operations +// + +#ifdef WIN32 +extern "C" +#endif +static DispatchCallbackObject* +dispatchCallbackNew(PyTypeObject* type, PyObject* /*args*/, PyObject* /*kwds*/) +{ + DispatchCallbackObject* self = reinterpret_cast<DispatchCallbackObject*>(type->tp_alloc(type, 0)); + if(!self) + { + return 0; + } + self->upcall = 0; + return self; +} + +#ifdef WIN32 +extern "C" +#endif +static void +dispatchCallbackDealloc(DispatchCallbackObject* self) +{ + delete self->upcall; + Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self)); } #ifdef WIN32 extern "C" #endif static PyObject* -amdCallbackIceException(AMDCallbackObject* self, PyObject* args) +dispatchCallbackResponse(DispatchCallbackObject* self, PyObject* args) { - PyObject* ex; - if(!PyArg_ParseTuple(args, STRCAST("O"), &ex)) + PyObject* result = 0; + if(!PyArg_ParseTuple(args, STRCAST("O"), &result)) { return 0; } - if(!PyObject_IsInstance(ex, PyExc_Exception)) + try + { + assert(self->upcall); + (*self->upcall)->response(result); + } + catch(...) + { + // + // No exceptions should propagate to Python. + // + assert(false); + } + + return incRef(Py_None); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +dispatchCallbackException(DispatchCallbackObject* self, PyObject* args) +{ + PyObject* ex = 0; + if(!PyArg_ParseTuple(args, STRCAST("O"), &ex)) { - PyErr_Format(PyExc_TypeError, "ice_exception argument is not an exception"); return 0; } try { assert(self->upcall); - PyException pye(ex); // No traceback information available. - (*self->upcall)->exception(pye, self->encoding); + PyException pyex(ex); + (*self->upcall)->exception(pyex); } catch(...) { @@ -753,8 +809,7 @@ amdCallbackIceException(AMDCallbackObject* self, PyObject* args) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } // @@ -802,12 +857,10 @@ asyncResultGetCommunicator(AsyncResultObject* self) { if(self->communicator) { - Py_INCREF(self->communicator); - return self->communicator; + return incRef(self->communicator); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -825,8 +878,7 @@ asyncResultCancel(AsyncResultObject* self) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -837,12 +889,10 @@ asyncResultGetConnection(AsyncResultObject* self) { if(self->connection) { - Py_INCREF(self->connection); - return self->connection; + return incRef(self->connection); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -853,12 +903,10 @@ asyncResultGetProxy(AsyncResultObject* self) { if(self->proxy) { - Py_INCREF(self->proxy); - return self->proxy; + return incRef(self->proxy); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -899,8 +947,7 @@ asyncResultWaitForCompleted(AsyncResultObject* self) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -941,8 +988,7 @@ asyncResultWaitForSent(AsyncResultObject* self) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -966,8 +1012,7 @@ asyncResultThrowLocalException(AsyncResultObject* self) assert(false); } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } #ifdef WIN32 @@ -1028,6 +1073,72 @@ asyncResultGetOperation(AsyncResultObject* self) return createString(op); } +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultCallLater(AsyncResultObject* self, PyObject* args) +{ + PyObject* callback; + if(!PyArg_ParseTuple(args, STRCAST("O"), &callback)) + { + return 0; + } + + if(!PyCallable_Check(callback)) + { + PyErr_Format(PyExc_ValueError, STRCAST("invalid argument passed to callLater")); + return 0; + } + + class CallbackI : public Ice::AsyncResult::Callback + { + public: + + CallbackI(PyObject* callback) : + _callback(incRef(callback)) + { + } + + ~CallbackI() + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_DECREF(_callback); + } + + virtual void run() + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + PyObjectHandle args = PyTuple_New(0); + assert(args.get()); + PyObjectHandle tmp = PyObject_Call(_callback, args.get(), 0); + PyErr_Clear(); + } + + private: + + PyObject* _callback; + }; + + try + { + (*self->result)->scheduleCallback(new CallbackI(callback)); + } + catch(const Ice::CommunicatorDestroyedException& ex) + { + setPythonException(ex); + return 0; + } + catch(...) + { + assert(false); + } + + return incRef(Py_None); +} + // // ParamInfo implementation. // @@ -1036,8 +1147,7 @@ IcePy::ParamInfo::unmarshaled(PyObject* val, PyObject* target, void* closure) { assert(PyTuple_Check(target)); Py_ssize_t i = reinterpret_cast<Py_ssize_t>(closure); - PyTuple_SET_ITEM(target, i, val); - Py_INCREF(val); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(target, i, incRef(val)); // PyTuple_SET_ITEM steals a reference. } // @@ -1066,14 +1176,7 @@ IcePy::Operation::Operation(const char* n, PyObject* m, PyObject* sm, int amdFla // amd // amd = amdFlag ? true : false; - if(amd) - { - dispatchName = name + "_async"; - } - else - { - dispatchName = fixIdent(name); - } + dispatchName = fixIdent(name); // Use the same dispatch name regardless of AMD. // // format @@ -1263,11 +1366,18 @@ static PyMethodDef OperationMethods[] = { 0, 0 } /* sentinel */ }; -static PyMethodDef AMDCallbackMethods[] = +static PyMethodDef DoneCallbackMethods[] = { - { STRCAST("ice_response"), reinterpret_cast<PyCFunction>(amdCallbackIceResponse), METH_VARARGS, + { STRCAST("invoke"), reinterpret_cast<PyCFunction>(doneCallbackInvoke), METH_VARARGS, PyDoc_STR(STRCAST("internal function")) }, - { STRCAST("ice_exception"), reinterpret_cast<PyCFunction>(amdCallbackIceException), METH_VARARGS, + { 0, 0 } /* sentinel */ +}; + +static PyMethodDef DispatchCallbackMethods[] = +{ + { STRCAST("response"), reinterpret_cast<PyCFunction>(dispatchCallbackResponse), METH_VARARGS, + PyDoc_STR(STRCAST("internal function")) }, + { STRCAST("exception"), reinterpret_cast<PyCFunction>(dispatchCallbackException), METH_VARARGS, PyDoc_STR(STRCAST("internal function")) }, { 0, 0 } /* sentinel */ }; @@ -1296,6 +1406,8 @@ static PyMethodDef AsyncResultMethods[] = PyDoc_STR(STRCAST("returns true if the request was sent synchronously")) }, { STRCAST("getOperation"), reinterpret_cast<PyCFunction>(asyncResultGetOperation), METH_NOARGS, PyDoc_STR(STRCAST("returns the name of the operation")) }, + { STRCAST("callLater"), reinterpret_cast<PyCFunction>(asyncResultCallLater), METH_VARARGS, + PyDoc_STR(STRCAST("internal function")) }, { 0, 0 } /* sentinel */ }; @@ -1349,16 +1461,63 @@ PyTypeObject OperationType = 0, /* tp_is_gc */ }; -PyTypeObject AMDCallbackType = +static PyTypeObject DoneCallbackType = +{ + /* The ob_type field must be initialized in the module init function + * to be portable to Windows without using C++. */ + PyVarObject_HEAD_INIT(0, 0) + STRCAST("IcePy.DoneCallback"), /* tp_name */ + sizeof(DoneCallbackObject), /* tp_basicsize */ + 0, /* tp_itemsize */ + /* methods */ + reinterpret_cast<destructor>(doneCallbackDealloc), /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_reserved */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + 0, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + DoneCallbackMethods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + reinterpret_cast<newfunc>(doneCallbackNew), /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ +}; + +static PyTypeObject DispatchCallbackType = { /* The ob_type field must be initialized in the module init function * to be portable to Windows without using C++. */ PyVarObject_HEAD_INIT(0, 0) - STRCAST("IcePy.AMDCallback"), /* tp_name */ - sizeof(AMDCallbackObject), /* tp_basicsize */ + STRCAST("IcePy.Dispatch"), /* tp_name */ + sizeof(DispatchCallbackObject), /* tp_basicsize */ 0, /* tp_itemsize */ /* methods */ - reinterpret_cast<destructor>(amdCallbackDealloc), /* tp_dealloc */ + reinterpret_cast<destructor>(dispatchCallbackDealloc), /* tp_dealloc */ 0, /* tp_print */ 0, /* tp_getattr */ 0, /* tp_setattr */ @@ -1381,7 +1540,7 @@ PyTypeObject AMDCallbackType = 0, /* tp_weaklistoffset */ 0, /* tp_iter */ 0, /* tp_iternext */ - AMDCallbackMethods, /* tp_methods */ + DispatchCallbackMethods, /* tp_methods */ 0, /* tp_members */ 0, /* tp_getset */ 0, /* tp_base */ @@ -1391,7 +1550,7 @@ PyTypeObject AMDCallbackType = 0, /* tp_dictoffset */ 0, /* tp_init */ 0, /* tp_alloc */ - reinterpret_cast<newfunc>(amdCallbackNew), /* tp_new */ + reinterpret_cast<newfunc>(dispatchCallbackNew), /* tp_new */ 0, /* tp_free */ 0, /* tp_is_gc */ }; @@ -1458,12 +1617,22 @@ IcePy::initOperation(PyObject* module) return false; } - if(PyType_Ready(&AMDCallbackType) < 0) + if(PyType_Ready(&DoneCallbackType) < 0) { return false; } - PyTypeObject* cbType = &AMDCallbackType; // Necessary to prevent GCC's strict-alias warnings. - if(PyModule_AddObject(module, STRCAST("AMDCallback"), reinterpret_cast<PyObject*>(cbType)) < 0) + PyTypeObject* cbType = &DoneCallbackType; // Necessary to prevent GCC's strict-alias warnings. + if(PyModule_AddObject(module, STRCAST("DoneCallback"), reinterpret_cast<PyObject*>(cbType)) < 0) + { + return false; + } + + if(PyType_Ready(&DispatchCallbackType) < 0) + { + return false; + } + PyTypeObject* dispatchType = &DispatchCallbackType; // Necessary to prevent GCC's strict-alias warnings. + if(PyModule_AddObject(module, STRCAST("DispatchCallback"), reinterpret_cast<PyObject*>(dispatchType)) < 0) { return false; } @@ -1485,21 +1654,13 @@ IcePy::initOperation(PyObject* module) // Invocation // IcePy::Invocation::Invocation(const Ice::ObjectPrx& prx) : - _prx(prx) -{ -} - -// -// TypedInvocation -// -IcePy::TypedInvocation::TypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) : - Invocation(prx), _op(op), _communicator(prx->ice_getCommunicator()) + _prx(prx), _communicator(prx->ice_getCommunicator()) { } bool -IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice::OutputStreamPtr& os, - pair<const Ice::Byte*, const Ice::Byte*>& params) +IcePy::Invocation::prepareRequest(const OperationPtr& op, PyObject* args, MappingType mapping, Ice::OutputStream* os, + pair<const Ice::Byte*, const Ice::Byte*>& params) { assert(PyTuple_Check(args)); params.first = params.second = static_cast<const Ice::Byte*>(0); @@ -1508,36 +1669,35 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: // Validate the number of arguments. // Py_ssize_t argc = PyTuple_GET_SIZE(args); - Py_ssize_t paramCount = static_cast<Py_ssize_t>(_op->inParams.size()); + Py_ssize_t paramCount = static_cast<Py_ssize_t>(op->inParams.size()); if(argc != paramCount) { string opName; - if(mapping == OldAsyncMapping) + if(mapping == NewAsyncMapping) { - opName = _op->name + "_async"; + opName = op->name + "Async"; } else if(mapping == AsyncMapping) { - opName = "begin_" + _op->name; + opName = "begin_" + op->name; } else { - opName = fixIdent(_op->name); + opName = fixIdent(op->name); } PyErr_Format(PyExc_RuntimeError, STRCAST("%s expects %d in parameters"), opName.c_str(), static_cast<int>(paramCount)); return false; } - if(!_op->inParams.empty()) + if(!op->inParams.empty()) { try { // // Marshal the in parameters. // - os = Ice::createOutputStream(_communicator); - os->startEncapsulation(_prx->ice_getEncodingVersion(), _op->format); + os->startEncapsulation(_prx->ice_getEncodingVersion(), op->format); ObjectMap objectMap; ParamInfoList::iterator p; @@ -1545,28 +1705,27 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: // // Validate the supplied arguments. // - for(p = _op->inParams.begin(); p != _op->inParams.end(); ++p) + for(p = op->inParams.begin(); p != op->inParams.end(); ++p) { ParamInfoPtr info = *p; PyObject* arg = PyTuple_GET_ITEM(args, info->pos); if((!info->optional || arg != Unset) && !info->type->validate(arg)) { string name; - if(mapping == OldAsyncMapping) + if(mapping == NewAsyncMapping) { - name = _op->name + "_async"; + name = op->name + "Async"; } else if(mapping == AsyncMapping) { - name = "begin_" + _op->name; + name = "begin_" + op->name; } else { - name = fixIdent(_op->name); + name = fixIdent(op->name); } PyErr_Format(PyExc_ValueError, STRCAST("invalid value for argument %d in operation `%s'"), - mapping == OldAsyncMapping ? info->pos + 2 : info->pos + 1, - const_cast<char*>(name.c_str())); + info->pos + 1, const_cast<char*>(name.c_str())); return false; } } @@ -1574,7 +1733,7 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: // // Marshal the required parameters. // - for(p = _op->inParams.begin(); p != _op->inParams.end(); ++p) + for(p = op->inParams.begin(); p != op->inParams.end(); ++p) { ParamInfoPtr info = *p; if(!info->optional) @@ -1587,7 +1746,7 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: // // Marshal the optional parameters. // - for(p = _op->optionalInParams.begin(); p != _op->optionalInParams.end(); ++p) + for(p = op->optionalInParams.begin(); p != op->optionalInParams.end(); ++p) { ParamInfoPtr info = *p; PyObject* arg = PyTuple_GET_ITEM(args, info->pos); @@ -1597,9 +1756,9 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: } } - if(_op->sendsClasses) + if(op->sendsClasses) { - os->writePendingObjects(); + os->writePendingValues(); } os->endEncapsulation(); @@ -1621,10 +1780,10 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, Ice: } PyObject* -IcePy::TypedInvocation::unmarshalResults(const pair<const Ice::Byte*, const Ice::Byte*>& bytes) +IcePy::Invocation::unmarshalResults(const OperationPtr& op, const pair<const Ice::Byte*, const Ice::Byte*>& bytes) { - Py_ssize_t numResults = static_cast<Py_ssize_t>(_op->outParams.size()); - if(_op->returnType) + Py_ssize_t numResults = static_cast<Py_ssize_t>(op->outParams.size()); + if(op->returnType) { numResults++; } @@ -1632,112 +1791,110 @@ IcePy::TypedInvocation::unmarshalResults(const pair<const Ice::Byte*, const Ice: PyObjectHandle results = PyTuple_New(numResults); if(results.get() && numResults > 0) { - Ice::InputStreamPtr is = Ice::wrapInputStream(_communicator, bytes); + Ice::InputStream is(_communicator, bytes); // - // Store a pointer to a local SlicedDataUtil object as the stream's closure. + // Store a pointer to a local StreamUtil object as the stream's closure. // This is necessary to support object unmarshaling (see ObjectReader). // - SlicedDataUtil util; - assert(!is->closure()); - is->closure(&util); + StreamUtil util; + assert(!is.getClosure()); + is.setClosure(&util); - is->startEncapsulation(); + is.startEncapsulation(); ParamInfoList::iterator p; // // Unmarshal the required out parameters. // - for(p = _op->outParams.begin(); p != _op->outParams.end(); ++p) + for(p = op->outParams.begin(); p != op->outParams.end(); ++p) { ParamInfoPtr info = *p; if(!info->optional) { void* closure = reinterpret_cast<void*>(static_cast<Py_ssize_t>(info->pos)); - info->type->unmarshal(is, info, results.get(), closure, false, &info->metaData); + info->type->unmarshal(&is, info, results.get(), closure, false, &info->metaData); } } // // Unmarshal the required return value, if any. // - if(_op->returnType && !_op->returnType->optional) + if(op->returnType && !op->returnType->optional) { - assert(_op->returnType->pos == 0); - void* closure = reinterpret_cast<void*>(static_cast<Py_ssize_t>(_op->returnType->pos)); - _op->returnType->type->unmarshal(is, _op->returnType, results.get(), closure, false, &_op->metaData); + assert(op->returnType->pos == 0); + void* closure = reinterpret_cast<void*>(static_cast<Py_ssize_t>(op->returnType->pos)); + op->returnType->type->unmarshal(&is, op->returnType, results.get(), closure, false, &op->metaData); } // // Unmarshal the optional results. This includes an optional return value. // - for(p = _op->optionalOutParams.begin(); p != _op->optionalOutParams.end(); ++p) + for(p = op->optionalOutParams.begin(); p != op->optionalOutParams.end(); ++p) { ParamInfoPtr info = *p; - if(is->readOptional(info->tag, info->type->optionalFormat())) + if(is.readOptional(info->tag, info->type->optionalFormat())) { void* closure = reinterpret_cast<void*>(static_cast<Py_ssize_t>(info->pos)); - info->type->unmarshal(is, info, results.get(), closure, true, &info->metaData); + info->type->unmarshal(&is, info, results.get(), closure, true, &info->metaData); } else { - PyTuple_SET_ITEM(results.get(), info->pos, Unset); - Py_INCREF(Unset); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(results.get(), info->pos, incRef(Unset)); // PyTuple_SET_ITEM steals a reference. } } - if(_op->returnsClasses) + if(op->returnsClasses) { - is->readPendingObjects(); + is.readPendingValues(); } - is->endEncapsulation(); + is.endEncapsulation(); - util.update(); + util.updateSlicedData(); } return results.release(); } PyObject* -IcePy::TypedInvocation::unmarshalException(const pair<const Ice::Byte*, const Ice::Byte*>& bytes) +IcePy::Invocation::unmarshalException(const OperationPtr& op, const pair<const Ice::Byte*, const Ice::Byte*>& bytes) { - Ice::InputStreamPtr is = Ice::wrapInputStream(_communicator, bytes); + Ice::InputStream is(_communicator, bytes); // - // Store a pointer to a local SlicedDataUtil object as the stream's closure. + // Store a pointer to a local StreamUtil object as the stream's closure. // This is necessary to support object unmarshaling (see ObjectReader). // - SlicedDataUtil util; - assert(!is->closure()); - is->closure(&util); + StreamUtil util; + assert(!is.getClosure()); + is.setClosure(&util); - is->startEncapsulation(); + is.startEncapsulation(); try { - Ice::UserExceptionReaderFactoryPtr factory = new UserExceptionReaderFactoryI(_communicator); - is->throwException(factory); + Ice::UserExceptionFactoryPtr factory = new UserExceptionFactory; + is.throwException(factory); } catch(const ExceptionReader& r) { - is->endEncapsulation(); + is.endEncapsulation(); PyObject* ex = r.getException(); - if(validateException(ex)) + if(validateException(op, ex)) { - util.update(); + util.updateSlicedData(); Ice::SlicedDataPtr slicedData = r.getSlicedData(); if(slicedData) { - SlicedDataUtil::setMember(ex, slicedData); + StreamUtil::setSlicedDataMember(ex, slicedData); } - Py_INCREF(ex); - return ex; + return incRef(ex); } else { @@ -1767,9 +1924,9 @@ IcePy::TypedInvocation::unmarshalException(const pair<const Ice::Byte*, const Ic } bool -IcePy::TypedInvocation::validateException(PyObject* ex) const +IcePy::Invocation::validateException(const OperationPtr& op, PyObject* ex) const { - for(ExceptionInfoList::const_iterator p = _op->exceptions.begin(); p != _op->exceptions.end(); ++p) + for(ExceptionInfoList::const_iterator p = op->exceptions.begin(); p != op->exceptions.end(); ++p) { if(PyObject_IsInstance(ex, (*p)->pythonType.get())) { @@ -1781,12 +1938,12 @@ IcePy::TypedInvocation::validateException(PyObject* ex) const } void -IcePy::TypedInvocation::checkTwowayOnly(const Ice::ObjectPrx& proxy) const +IcePy::Invocation::checkTwowayOnly(const OperationPtr& op, const Ice::ObjectPrx& proxy) const { - if((_op->returnType != 0 || !_op->outParams.empty() || !_op->exceptions.empty()) && !proxy->ice_isTwoway()) + if((op->returnType != 0 || !op->outParams.empty() || !op->exceptions.empty()) && !proxy->ice_isTwoway()) { Ice::TwowayOnlyException ex(__FILE__, __LINE__); - ex.operation = _op->name; + ex.operation = op->name; throw ex; } } @@ -1795,7 +1952,7 @@ IcePy::TypedInvocation::checkTwowayOnly(const Ice::ObjectPrx& proxy) const // SyncTypedInvocation // IcePy::SyncTypedInvocation::SyncTypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) : - Invocation(prx), TypedInvocation(prx, op) + Invocation(prx), _op(op) { } @@ -1811,16 +1968,16 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) // // Marshal the input parameters to a byte sequence. // - Ice::OutputStreamPtr os; + Ice::OutputStream os(_communicator); pair<const Ice::Byte*, const Ice::Byte*> params; - if(!prepareRequest(pyparams, SyncMapping, os, params)) + if(!prepareRequest(_op, pyparams, SyncMapping, &os, params)) { return 0; } try { - checkTwowayOnly(_prx); + checkTwowayOnly(_op, _prx); // // Invoke the operation. @@ -1870,7 +2027,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) rb.first = &result[0]; rb.second = &result[0] + result.size(); } - PyObjectHandle ex = unmarshalException(rb); + PyObjectHandle ex = unmarshalException(_op, rb); // // Set the Python exception. @@ -1891,7 +2048,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) rb.first = &result[0]; rb.second = &result[0] + result.size(); } - PyObjectHandle results = unmarshalResults(rb); + PyObjectHandle results = unmarshalResults(_op, rb); if(!results.get()) { return 0; @@ -1910,8 +2067,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) } else { - Py_INCREF(ret); - return ret; + return incRef(ret); } } } @@ -1928,8 +2084,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) return 0; } - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } // @@ -1937,7 +2092,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) // IcePy::AsyncTypedInvocation::AsyncTypedInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy, const OperationPtr& op) : - Invocation(prx), TypedInvocation(prx, op), _pyProxy(pyProxy), _response(0), _ex(0), _sent(0) + Invocation(prx), _op(op), _pyProxy(pyProxy), _response(0), _ex(0), _sent(0) { Py_INCREF(_pyProxy); } @@ -1965,8 +2120,7 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) callable = PyTuple_GET_ITEM(args, 1); if(PyCallable_Check(callable)) { - _response = callable; - Py_INCREF(_response); + _response = incRef(callable); } else if(callable != Py_None) { @@ -1977,8 +2131,7 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) callable = PyTuple_GET_ITEM(args, 2); if(PyCallable_Check(callable)) { - _ex = callable; - Py_INCREF(_ex); + _ex = incRef(callable); } else if(callable != Py_None) { @@ -1989,8 +2142,7 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) callable = PyTuple_GET_ITEM(args, 3); if(PyCallable_Check(callable)) { - _sent = callable; - Py_INCREF(_sent); + _sent = incRef(callable); } else if(callable != Py_None) { @@ -2015,9 +2167,9 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) // // Marshal the input parameters to a byte sequence. // - Ice::OutputStreamPtr os; + Ice::OutputStream os(_communicator); pair<const Ice::Byte*, const Ice::Byte*> params; - if(!prepareRequest(pyparams, AsyncMapping, os, params)) + if(!prepareRequest(_op, pyparams, AsyncMapping, &os, params)) { return 0; } @@ -2101,8 +2253,7 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) } obj->result = new Ice::AsyncResultPtr(result); obj->invocation = new InvocationPtr(this); - obj->proxy = _pyProxy; - Py_INCREF(obj->proxy); + obj->proxy = incRef(_pyProxy); obj->communicator = getCommunicatorWrapper(_communicator); return reinterpret_cast<PyObject*>(obj); } @@ -2123,7 +2274,7 @@ IcePy::AsyncTypedInvocation::end(const Ice::ObjectPrx& proxy, const OperationPtr { AllowThreads allowThreads; // Release Python's global interpreter lock during blocking operations. - ok = proxy->___end_ice_invoke(results, r); + ok = proxy->_iceI_end_ice_invoke(results, r); } if(ok) @@ -2131,7 +2282,7 @@ IcePy::AsyncTypedInvocation::end(const Ice::ObjectPrx& proxy, const OperationPtr // // Unmarshal the results. // - PyObjectHandle args = unmarshalResults(results); + PyObjectHandle args = unmarshalResults(_op, results); if(args.get()) { // @@ -2141,14 +2292,12 @@ IcePy::AsyncTypedInvocation::end(const Ice::ObjectPrx& proxy, const OperationPtr assert(PyTuple_Check(args.get())); if(PyTuple_GET_SIZE(args.get()) == 0) { - Py_INCREF(Py_None); - return Py_None; + return incRef(Py_None); } else if(PyTuple_GET_SIZE(args.get()) == 1) { PyObject* res = PyTuple_GET_ITEM(args.get(), 0); - Py_INCREF(res); - return res; + return incRef(res); } else { @@ -2158,7 +2307,7 @@ IcePy::AsyncTypedInvocation::end(const Ice::ObjectPrx& proxy, const OperationPtr } else { - PyObjectHandle ex = unmarshalException(results); + PyObjectHandle ex = unmarshalException(_op, results); setPythonException(ex.get()); } } @@ -2202,7 +2351,7 @@ IcePy::AsyncTypedInvocation::response(bool ok, const pair<const Ice::Byte*, cons PyObjectHandle args; try { - args = unmarshalResults(results); + args = unmarshalResults(_op, results); if(!args.get()) { assert(PyErr_Occurred()); @@ -2227,7 +2376,7 @@ IcePy::AsyncTypedInvocation::response(bool ok, const pair<const Ice::Byte*, cons else { assert(_ex); - PyObjectHandle ex = unmarshalException(results); + PyObjectHandle ex = unmarshalException(_op, results); callException(_ex, ex.get()); } } @@ -2272,114 +2421,344 @@ IcePy::AsyncTypedInvocation::checkAsyncTwowayOnly(const Ice::ObjectPrx& proxy) c } // -// OldAsyncTypedInvocation +// NewAsyncInvocation // -IcePy::OldAsyncTypedInvocation::OldAsyncTypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) - : Invocation(prx), TypedInvocation(prx, op), _callback(0) +IcePy::NewAsyncInvocation::NewAsyncInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy, const string& operation) + : Invocation(prx), _pyProxy(pyProxy), _operation(operation), _twoway(prx->ice_isTwoway()), _sent(false), + _sentSynchronously(false), _done(false), _future(0), _ok(false), _exception(0) { + Py_INCREF(_pyProxy); } -IcePy::OldAsyncTypedInvocation::~OldAsyncTypedInvocation() +IcePy::NewAsyncInvocation::~NewAsyncInvocation() { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - Py_XDECREF(_callback); + Py_DECREF(_pyProxy); + Py_XDECREF(_future); + Py_XDECREF(_exception); } PyObject* -IcePy::OldAsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) +IcePy::NewAsyncInvocation::invoke(PyObject* args, PyObject* kwds) { - assert(PyTuple_Check(args)); - assert(PyTuple_GET_SIZE(args) == 3); // Format is (callback, (params...), context|None) - _callback = PyTuple_GET_ITEM(args, 0); - Py_INCREF(_callback); - PyObject* pyparams = PyTuple_GET_ITEM(args, 1); - assert(PyTuple_Check(pyparams)); - PyObject* pyctx = PyTuple_GET_ITEM(args, 2); - // - // Marshal the input parameters to a byte sequence. + // Called from Python code, so the GIL is already acquired. // - Ice::OutputStreamPtr os; - pair<const Ice::Byte*, const Ice::Byte*> params; - if(!prepareRequest(pyparams, OldAsyncMapping, os, params)) + + Ice::AsyncResultPtr result; + + try + { + result = handleInvoke(args, kwds); + } + catch(const Ice::CommunicatorDestroyedException& ex) + { + // + // CommunicatorDestroyedException can propagate directly. + // + setPythonException(ex); + return 0; + } + catch(const IceUtil::IllegalArgumentException& ex) + { + // + // IllegalArgumentException can propagate directly. + // (Raised by checkAsyncTwowayOnly) + // + PyErr_Format(PyExc_RuntimeError, "%s", STRCAST(ex.reason().c_str())); + return 0; + } + catch(const Ice::Exception&) + { + // + // No other exceptions should be raised by begin_ice_invoke. + // + assert(false); + } + + if(PyErr_Occurred()) { return 0; } - bool sentSynchronously = false; - try + assert(result); + + PyObjectHandle communicatorObj = getCommunicatorWrapper(_communicator); + PyObjectHandle asyncResultObj; + + // + // Pass the AsyncResult object to the future. Note that this creates a circular reference. + // Don't do this for batch invocations because there is no opportunity to break the circular + // reference. + // + if(!_prx->ice_isBatchOneway() && !_prx->ice_isBatchDatagram()) { - checkTwowayOnly(_prx); + asyncResultObj = createAsyncResult(result, _pyProxy, 0, communicatorObj.get()); + if(!asyncResultObj.get()) + { + return 0; + } + } - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(this, &OldAsyncTypedInvocation::response, - &OldAsyncTypedInvocation::exception, &OldAsyncTypedInvocation::sent); + // + // NOTE: Any time we call into interpreted Python code there's a chance that another thread will be + // allowed to run! + // - Ice::AsyncResultPtr result; + PyObjectHandle future = createFuture(_operation, asyncResultObj.get()); // Calls into Python code. + if(!future.get()) + { + return 0; + } - // - // Invoke the operation asynchronously. - // - if(pyctx != Py_None) + // + // Check if any callbacks have been invoked already. + // + + if(_sent) + { + PyObjectHandle tmp = callMethod(future.get(), "set_sent", _sentSynchronously ? getTrue() : getFalse()); + if(PyErr_Occurred()) { - Ice::Context ctx; + return 0; + } - if(!PyDict_Check(pyctx)) + if(!_twoway) + { + // + // For a oneway/datagram invocation, we consider it complete when sent. + // + tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) { - PyErr_Format(PyExc_ValueError, STRCAST("context argument must be None or a dictionary")); return 0; } + } + } - if(!dictionaryToContext(pyctx, ctx)) + if(_done) + { + if(_exception) + { + PyObjectHandle tmp = callMethod(future.get(), "set_exception", _exception); + if(PyErr_Occurred()) { return 0; } - - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); } else { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + // + // Delegate to the subclass. + // + pair<const Ice::Byte*, const Ice::Byte*> p(&_results[0], &_results[0] + _results.size()); + handleResponse(future.get(), _ok, p); + if(PyErr_Occurred()) + { + return 0; + } } - - sentSynchronously = result->sentSynchronously(); } - catch(const Ice::CommunicatorDestroyedException& ex) + + _future = future.release(); + return incRef(_future); +} + +void +IcePy::NewAsyncInvocation::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& results) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) { // - // CommunicatorDestroyedException can propagate directly. + // The future hasn't been created yet, which means invoke() is still running. Save the results for later. // - setPythonException(ex); - return 0; + _ok = ok; + vector<Ice::Byte> v(results.first, results.second); + _results.swap(v); + _done = true; + return; + } + + PyObjectHandle future = _future; // Steals a reference. + + if(_sent) + { + _future = 0; // Break cyclic dependency. } - catch(const Ice::TwowayOnlyException& ex) + else { + assert(!_done); + // - // Raised by checkTwowayOnly. + // The sent callback will release our reference. // - callException(_callback, _op->name, "ice_exception", ex); + Py_INCREF(_future); } - catch(const Ice::Exception&) + + _done = true; + + // + // Delegate to the subclass. + // + handleResponse(future.get(), ok, results); + if(PyErr_Occurred()) + { + handleException(); + } +} + +void +IcePy::NewAsyncInvocation::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) { // - // No other exceptions should be raised by begin_ice_invoke. + // The future hasn't been created yet, which means invoke() is still running. Save the exception for later. // - assert(false); + _exception = convertException(ex); + _done = true; + return; } - PyRETURN_BOOL(sentSynchronously); + PyObjectHandle future = _future; // Steals a reference. + _future = 0; // Break cyclic dependency. + _done = true; + + PyObjectHandle exh = convertException(ex); + assert(exh.get()); + PyObjectHandle tmp = callMethod(future.get(), "set_exception", exh.get()); + if(PyErr_Occurred()) + { + handleException(); + } } void -IcePy::OldAsyncTypedInvocation::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& results) +IcePy::NewAsyncInvocation::sent(bool sentSynchronously) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - assert(_callback); + if(!_future) + { + // + // The future hasn't been created yet, which means invoke() is still running. + // + _sent = true; + _sentSynchronously = sentSynchronously; + return; + } + + PyObjectHandle future = _future; + + if(_done || !_twoway) + { + _future = 0; // Break cyclic dependency. + } + else + { + _sent = true; + + // + // The reference to _future will be released in response() or exception(). + // + Py_INCREF(_future); + } + + PyObjectHandle tmp = callMethod(future.get(), "set_sent", sentSynchronously ? getTrue() : getFalse()); + if(PyErr_Occurred()) + { + handleException(); + } + + if(!_twoway) + { + // + // For a oneway/datagram invocation, we consider it complete when sent. + // + tmp = callMethod(future.get(), "set_result", Py_None); + if(PyErr_Occurred()) + { + handleException(); + } + } +} + +// +// NewAsyncTypedInvocation +// +IcePy::NewAsyncTypedInvocation::NewAsyncTypedInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy, + const OperationPtr& op) + : NewAsyncInvocation(prx, pyProxy, op->name), _op(op) +{ +} + +Ice::AsyncResultPtr +IcePy::NewAsyncTypedInvocation::handleInvoke(PyObject* args, PyObject* /* kwds */) +{ + // + // Called from Python code, so the GIL is already acquired. + // + + assert(PyTuple_Check(args)); + assert(PyTuple_GET_SIZE(args) == 2); // Format is ((params...), context|None) + PyObject* pyparams = PyTuple_GET_ITEM(args, 0); + assert(PyTuple_Check(pyparams)); + PyObject* pyctx = PyTuple_GET_ITEM(args, 1); + + // + // Marshal the input parameters to a byte sequence. + // + Ice::OutputStream os(_communicator); + pair<const Ice::Byte*, const Ice::Byte*> params; + if(!prepareRequest(_op, pyparams, NewAsyncMapping, &os, params)) + { + return 0; + } + + checkAsyncTwowayOnly(_prx); + + NewAsyncInvocationPtr self = this; + Ice::Callback_Object_ice_invokePtr cb = + Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); + // + // Invoke the operation asynchronously. + // + if(pyctx != Py_None) + { + Ice::Context ctx; + + if(!PyDict_Check(pyctx)) + { + PyErr_Format(PyExc_ValueError, STRCAST("context argument must be None or a dictionary")); + return 0; + } + + if(!dictionaryToContext(pyctx, ctx)) + { + return 0; + } + + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, ctx, cb); + } + else + { + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + return _prx->begin_ice_invoke(_op->name, _op->sendMode, params, cb); + } +} + +void +IcePy::NewAsyncTypedInvocation::handleResponse(PyObject* future, bool ok, + const pair<const Ice::Byte*, const Ice::Byte*>& results) +{ try { if(ok) @@ -2388,69 +2767,70 @@ IcePy::OldAsyncTypedInvocation::response(bool ok, const pair<const Ice::Byte*, c // Unmarshal the results. // PyObjectHandle args; + try { - args = unmarshalResults(results); + args = unmarshalResults(_op, results); if(!args.get()) { assert(PyErr_Occurred()); - PyErr_Print(); return; } } catch(const Ice::Exception& ex) { - callException(_callback, _op->name, "ice_exception", ex); + PyObjectHandle exh = convertException(ex); + assert(exh.get()); + PyObjectHandle tmp = callMethod(future, "set_exception", exh.get()); + PyErr_Clear(); return; } - const string methodName = "ice_response"; - if(!PyObject_HasAttrString(_callback, STRCAST(methodName.c_str()))) + // + // The future's result is always one value: + // + // - If the operation has no out parameters, the result is None + // - If the operation returns one value, the result is the value + // - If the operation returns multiple values, the result is a tuple containing the values + // + PyObjectHandle r; + if(PyTuple_GET_SIZE(args.get()) == 0) { - ostringstream ostr; - ostr << "AMI callback object for operation `" << _op->name << "' does not define " << methodName - << "()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); + r = incRef(Py_None); + } + else if(PyTuple_GET_SIZE(args.get()) == 1) + { + r = incRef(PyTuple_GET_ITEM(args.get(), 0)); // PyTuple_GET_ITEM steals a reference. } else { - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST(methodName.c_str())); - assert(method.get()); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - handleException(); // Callback raised an exception. - } + r = args; } + + PyObjectHandle tmp = callMethod(future, "set_result", r.get()); + PyErr_Clear(); } else { - PyObjectHandle ex = unmarshalException(results); - callException(_callback, _op->name, "ice_exception", ex.get()); + PyObjectHandle ex = unmarshalException(_op, results); + PyObjectHandle tmp = callMethod(future, "set_exception", ex.get()); + PyErr_Clear(); } } catch(const AbortMarshaling&) { assert(PyErr_Occurred()); - PyErr_Print(); } } void -IcePy::OldAsyncTypedInvocation::exception(const Ice::Exception& ex) -{ - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - - callException(_callback, _op->name, "ice_exception", ex); -} - -void -IcePy::OldAsyncTypedInvocation::sent(bool) +IcePy::NewAsyncTypedInvocation::checkAsyncTwowayOnly(const Ice::ObjectPrx& proxy) const { - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - - callSent(_callback, "ice_sent", false, false); + if((_op->returnType != 0 || !_op->outParams.empty() || !_op->exceptions.empty()) && !proxy->ice_isTwoway()) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, + "`" + _op->name + "' can only be called with a twoway proxy"); + } } // @@ -2580,8 +2960,7 @@ IcePy::SyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) } #endif - PyTuple_SET_ITEM(result.get(), 1, op.get()); - op.release(); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(result.get(), 1, op.release()); // PyTuple_SET_ITEM steals a reference. return result.release(); } @@ -2622,7 +3001,7 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) const_cast<char*>("_response"), const_cast<char*>("_ex"), const_cast<char*>("_sent"), - const_cast<char*>("_ctx"), + const_cast<char*>("context"), 0 }; char* operation; @@ -2655,8 +3034,7 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) if(PyCallable_Check(response)) { - _response = response; - Py_INCREF(_response); + _response = incRef(response); } else if(response != Py_None) { @@ -2666,8 +3044,7 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) if(PyCallable_Check(ex)) { - _ex = ex; - Py_INCREF(_ex); + _ex = incRef(ex); } else if(ex != Py_None) { @@ -2677,8 +3054,7 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) if(PyCallable_Check(sent)) { - _sent = sent; - Py_INCREF(_sent); + _sent = incRef(sent); } else if(sent != Py_None) { @@ -2790,8 +3166,7 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) } obj->result = new Ice::AsyncResultPtr(result); obj->invocation = new InvocationPtr(this); - obj->proxy = _pyProxy; - Py_INCREF(obj->proxy); + obj->proxy = incRef(_pyProxy); obj->communicator = getCommunicatorWrapper(_prx->ice_getCommunicator()); return reinterpret_cast<PyObject*>(obj); } @@ -2806,7 +3181,7 @@ IcePy::AsyncBlobjectInvocation::end(const Ice::ObjectPrx& proxy, const Ice::Asyn { AllowThreads allowThreads; // Release Python's global interpreter lock during blocking operations. - ok = proxy->___end_ice_invoke(results, r); + ok = proxy->_iceI_end_ice_invoke(results, r); } // @@ -2855,8 +3230,7 @@ IcePy::AsyncBlobjectInvocation::end(const Ice::ObjectPrx& proxy, const Ice::Asyn memcpy(buf, results.first, sz); #endif - PyTuple_SET_ITEM(args.get(), 1, op.get()); - op.release(); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(args.get(), 1, op.release()); // PyTuple_SET_ITEM steals a reference. return args.release(); } @@ -2938,8 +3312,7 @@ IcePy::AsyncBlobjectInvocation::response(bool ok, const pair<const Ice::Byte*, c memcpy(buf, results.first, sz); #endif - PyTuple_SET_ITEM(args.get(), 1, op.get()); - op.release(); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(args.get(), 1, op.release()); // PyTuple_SET_ITEM steals a reference. PyObjectHandle tmp = PyObject_Call(_response, args.get(), 0); if(PyErr_Occurred()) @@ -2968,22 +3341,15 @@ IcePy::AsyncBlobjectInvocation::sent(bool sentSynchronously) } // -// OldAsyncBlobjectInvocation +// NewAsyncBlobjectInvocation // -IcePy::OldAsyncBlobjectInvocation::OldAsyncBlobjectInvocation(const Ice::ObjectPrx& prx) : - Invocation(prx), _callback(0) +IcePy::NewAsyncBlobjectInvocation::NewAsyncBlobjectInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy) : + NewAsyncInvocation(prx, pyProxy, "ice_invoke") { } -IcePy::OldAsyncBlobjectInvocation::~OldAsyncBlobjectInvocation() -{ - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - - Py_XDECREF(_callback); -} - -PyObject* -IcePy::OldAsyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) +Ice::AsyncResultPtr +IcePy::NewAsyncBlobjectInvocation::handleInvoke(PyObject* args, PyObject* /* kwds */) { char* operation; PyObject* mode; @@ -2991,20 +3357,19 @@ IcePy::OldAsyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) PyObject* operationModeType = lookupType("Ice.OperationMode"); PyObject* ctx = 0; #if PY_VERSION_HEX >= 0x03000000 - if(!PyArg_ParseTuple(args, STRCAST("OsO!O!|O"), &_callback, &operation, operationModeType, &mode, + if(!PyArg_ParseTuple(args, STRCAST("sO!O!|O"), &operation, operationModeType, &mode, &PyBytes_Type, &inParams, &ctx)) { return 0; } #else - if(!PyArg_ParseTuple(args, STRCAST("OsO!O!|O"), &_callback, &operation, operationModeType, &mode, + if(!PyArg_ParseTuple(args, STRCAST("sO!O!|O"), &operation, operationModeType, &mode, &PyBuffer_Type, &inParams, &ctx)) { return 0; } #endif - Py_INCREF(_callback); _op = operation; PyObjectHandle modeValue = PyObject_GetAttrString(mode, STRCAST("value")); @@ -3036,159 +3401,170 @@ IcePy::OldAsyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) } #endif - bool sentSynchronously = false; - try - { - Ice::AsyncResultPtr result; - Ice::Callback_Object_ice_invokePtr cb = - Ice::newCallback_Object_ice_invoke(this, &OldAsyncBlobjectInvocation::response, - &OldAsyncBlobjectInvocation::exception, - &OldAsyncBlobjectInvocation::sent); - - if(ctx == 0 || ctx == Py_None) - { - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->begin_ice_invoke(operation, sendMode, in, cb); - } - else - { - Ice::Context context; - if(!dictionaryToContext(ctx, context)) - { - return 0; - } - - AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->begin_ice_invoke(operation, sendMode, in, context, cb); - } + NewAsyncInvocationPtr self = this; + Ice::Callback_Object_ice_invokePtr cb = + Ice::newCallback_Object_ice_invoke(self, &NewAsyncInvocation::response, &NewAsyncInvocation::exception, + &NewAsyncInvocation::sent); - sentSynchronously = result->sentSynchronously(); - } - catch(const Ice::CommunicatorDestroyedException& ex) + if(ctx == 0 || ctx == Py_None) { // - // CommunicatorDestroyedException is the only exception that can propagate directly. + // Don't release the GIL here. We want other threads to block until we've had a chance + // to create the future. // - setPythonException(ex); - return 0; + //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + return _prx->begin_ice_invoke(operation, sendMode, in, cb); } - catch(const Ice::Exception&) + else { + Ice::Context context; + if(!dictionaryToContext(ctx, context)) + { + return 0; + } + // - // No other exceptions should be raised by begin_ice_invoke. + // Don't release the GIL here. We want other threads to block until we've had a chance + // to create the future. // - assert(false); + //AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + return _prx->begin_ice_invoke(operation, sendMode, in, context, cb); } - - PyRETURN_BOOL(sentSynchronously); } void -IcePy::OldAsyncBlobjectInvocation::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& results) +IcePy::NewAsyncBlobjectInvocation::handleResponse(PyObject* future, bool ok, + const pair<const Ice::Byte*, const Ice::Byte*>& results) { - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - - try + // + // Prepare the args as a tuple of the bool and out param buffer. + // + PyObjectHandle args = PyTuple_New(2); + if(!args.get()) { - // - // Prepare the args as a tuple of the bool and out param buffer. - // - PyObjectHandle args = PyTuple_New(2); - if(!args.get()) - { - assert(PyErr_Occurred()); - PyErr_Print(); - return; - } + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } - PyTuple_SET_ITEM(args.get(), 0, ok ? incTrue() : incFalse()); + PyTuple_SET_ITEM(args.get(), 0, ok ? incTrue() : incFalse()); #if PY_VERSION_HEX >= 0x03000000 - Py_ssize_t sz = results.second - results.first; - PyObjectHandle op; - if(sz == 0) - { - op = PyBytes_FromString(""); - } - else - { - op = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(results.first), sz); - } - if(!op.get()) - { - assert(PyErr_Occurred()); - PyErr_Print(); - return; - } + Py_ssize_t sz = results.second - results.first; + PyObjectHandle op; + if(sz == 0) + { + op = PyBytes_FromString(""); + } + else + { + op = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(results.first), sz); + } + if(!op.get()) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } #else - // - // Create the output buffer and copy in the outParams. - // - PyObjectHandle op = PyBuffer_New(results.second - results.first); - if(!op.get()) - { - assert(PyErr_Occurred()); - PyErr_Print(); - return; - } + // + // Create the output buffer and copy in the outParams. + // + PyObjectHandle op = PyBuffer_New(results.second - results.first); + if(!op.get()) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } - void* buf; - Py_ssize_t sz; - if(PyObject_AsWriteBuffer(op.get(), &buf, &sz)) - { - assert(PyErr_Occurred()); - PyErr_Print(); - return; - } - assert(sz == results.second - results.first); - memcpy(buf, results.first, sz); + void* buf; + Py_ssize_t sz; + if(PyObject_AsWriteBuffer(op.get(), &buf, &sz)) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + assert(sz == results.second - results.first); + memcpy(buf, results.first, sz); #endif - PyTuple_SET_ITEM(args.get(), 1, op.get()); - op.release(); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(args.get(), 1, op.release()); // PyTuple_SET_ITEM steals a reference. - const string methodName = "ice_response"; - if(!PyObject_HasAttrString(_callback, STRCAST(methodName.c_str()))) - { - ostringstream ostr; - ostr << "AMI callback object for operation `ice_invoke_async' does not define " << methodName << "()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - } - else - { - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST(methodName.c_str())); - assert(method.get()); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - handleException(); // Callback raised an exception. - } - } + PyObjectHandle tmp = callMethod(future, "set_result", args.get()); + PyErr_Clear(); +} + +// +// Upcall +// +void +Upcall::dispatchImpl(PyObject* servant, const string& dispatchName, PyObject* args, const Ice::Current& current) +{ + Ice::CommunicatorPtr communicator = current.adapter->getCommunicator(); + + // + // Find the servant method for the operation. Use dispatchName here, not current.operation. + // + PyObjectHandle servantMethod = PyObject_GetAttrString(servant, const_cast<char*>(dispatchName.c_str())); + if(!servantMethod.get()) + { + ostringstream ostr; + ostr << "servant for identity " << communicator->identityToString(current.id) + << " does not define operation `" << dispatchName << "'"; + string str = ostr.str(); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + Ice::UnknownException ex(__FILE__, __LINE__); + ex.unknown = str; + throw ex; } - catch(const Ice::Exception& ex) + + // + // Get the _iceDispatch method. The _iceDispatch method will invoke the servant method and pass it the arguments. + // + PyObjectHandle dispatchMethod = PyObject_GetAttrString(servant, STRCAST("_iceDispatch")); + if(!dispatchMethod.get()) { ostringstream ostr; - ostr << "Exception raised by AMI callback for operation `ice_invoke_async':" << ex; + ostr << "_iceDispatch method not found for identity " << communicator->identityToString(current.id) + << " and operation `" << dispatchName << "'"; string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + Ice::UnknownException ex(__FILE__, __LINE__); + ex.unknown = str; + throw ex; } -} -void -IcePy::OldAsyncBlobjectInvocation::exception(const Ice::Exception& ex) -{ - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + PyObjectHandle dispatchArgs = PyTuple_New(3); + if(!dispatchArgs.get()) + { + throwPythonException(); + } - callException(_callback, "ice_invoke", "ice_exception", ex); -} + DispatchCallbackObject* callback = dispatchCallbackNew(&DispatchCallbackType, 0, 0); + if(!callback) + { + throwPythonException(); + } + callback->upcall = new UpcallPtr(this); + PyTuple_SET_ITEM(dispatchArgs.get(), 0, reinterpret_cast<PyObject*>(callback)); // Steals a reference. + PyTuple_SET_ITEM(dispatchArgs.get(), 1, servantMethod.release()); // Steals a reference. + PyTuple_SET_ITEM(dispatchArgs.get(), 2, incRef(args)); // Steals a reference. -void -IcePy::OldAsyncBlobjectInvocation::sent(bool) -{ - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + // + // Ignore the return value of _iceDispatch -- it will use the dispatch callback. + // + PyObjectHandle ignore = PyObject_Call(dispatchMethod.get(), dispatchArgs.get(), 0); - callSent(_callback, "ice_sent", false, false); + // + // Check for exceptions. + // + if(PyErr_Occurred()) + { + PyException ex; // Retrieve it before another Python API call clears it. + exception(ex); + } } // @@ -3196,7 +3572,7 @@ IcePy::OldAsyncBlobjectInvocation::sent(bool) // IcePy::TypedUpcall::TypedUpcall(const OperationPtr& op, const Ice::AMD_Object_ice_invokePtr& callback, const Ice::CommunicatorPtr& communicator) : - _op(op), _callback(callback), _communicator(communicator), _finished(false) + _op(op), _callback(callback), _communicator(communicator) { } @@ -3204,19 +3580,14 @@ void IcePy::TypedUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, const Ice::Byte*>& inBytes, const Ice::Current& current) { + _encoding = current.encoding; + // // Unmarshal the in parameters. We have to leave room in the arguments for a trailing // Ice::Current object. // Py_ssize_t count = static_cast<Py_ssize_t>(_op->inParams.size()) + 1; - Py_ssize_t offset = 0; - if(_op->amd) - { - ++count; // Leave room for a leading AMD callback argument. - offset = 1; - } - PyObjectHandle args = PyTuple_New(count); if(!args.get()) { @@ -3225,19 +3596,19 @@ IcePy::TypedUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, con if(!_op->inParams.empty()) { - Ice::InputStreamPtr is = Ice::wrapInputStream(_communicator, inBytes); + Ice::InputStream is(_communicator, inBytes); // - // Store a pointer to a local SlicedDataUtil object as the stream's closure. + // Store a pointer to a local StreamUtil object as the stream's closure. // This is necessary to support object unmarshaling (see ObjectReader). // - SlicedDataUtil util; - assert(!is->closure()); - is->closure(&util); + StreamUtil util; + assert(!is.getClosure()); + is.setClosure(&util); try { - is->startEncapsulation(); + is.startEncapsulation(); ParamInfoList::iterator p; @@ -3249,8 +3620,8 @@ IcePy::TypedUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, con ParamInfoPtr info = *p; if(!info->optional) { - void* closure = reinterpret_cast<void*>(info->pos + offset); - info->type->unmarshal(is, info, args.get(), closure, false, &info->metaData); + void* closure = reinterpret_cast<void*>(info->pos); + info->type->unmarshal(&is, info, args.get(), closure, false, &info->metaData); } } @@ -3260,26 +3631,25 @@ IcePy::TypedUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, con for(p = _op->optionalInParams.begin(); p != _op->optionalInParams.end(); ++p) { ParamInfoPtr info = *p; - if(is->readOptional(info->tag, info->type->optionalFormat())) + if(is.readOptional(info->tag, info->type->optionalFormat())) { - void* closure = reinterpret_cast<void*>(info->pos + offset); - info->type->unmarshal(is, info, args.get(), closure, true, &info->metaData); + void* closure = reinterpret_cast<void*>(info->pos); + info->type->unmarshal(&is, info, args.get(), closure, true, &info->metaData); } else { - PyTuple_SET_ITEM(args.get(), info->pos + offset, Unset); - Py_INCREF(Unset); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(args.get(), info->pos, incRef(Unset)); // PyTuple_SET_ITEM steals a reference. } } if(_op->sendsClasses) { - is->readPendingObjects(); + is.readPendingValues(); } - is->endEncapsulation(); + is.endEncapsulation(); - util.update(); + util.updateSlicedData(); } catch(const AbortMarshaling&) { @@ -3291,202 +3661,149 @@ IcePy::TypedUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, con // Create an object to represent Ice::Current. We need to append this to the argument tuple. // PyObjectHandle curr = createCurrent(current); - PyTuple_SET_ITEM(args.get(), PyTuple_GET_SIZE(args.get()) - 1, curr.get()); - curr.release(); // PyTuple_SET_ITEM steals a reference. - - if(_op->amd) - { - // - // Create the callback object and pass it as the first argument. - // - AMDCallbackObject* obj = amdCallbackNew(&AMDCallbackType, 0, 0); - if(!obj) - { - throwPythonException(); - } - obj->upcall = new UpcallPtr(this); - obj->encoding = current.encoding; - PyTuple_SET_ITEM(args.get(), 0, (PyObject*)obj); // PyTuple_SET_ITEM steals a reference. - } - - // - // Dispatch the operation. Use _dispatchName here, not current.operation. - // - PyObjectHandle method = PyObject_GetAttrString(servant, const_cast<char*>(_op->dispatchName.c_str())); - if(!method.get()) - { - ostringstream ostr; - ostr << "servant for identity " << _communicator->identityToString(current.id) - << " does not define operation `" << _op->dispatchName << "'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - Ice::UnknownException ex(__FILE__, __LINE__); - ex.unknown = str; - throw ex; - } - - PyObjectHandle result = PyObject_Call(method.get(), args.get(), 0); - - // - // Check for exceptions. - // - if(PyErr_Occurred()) - { - PyException ex; // Retrieve it before another Python API call clears it. - exception(ex, current.encoding); - return; - } + PyTuple_SET_ITEM(args.get(), PyTuple_GET_SIZE(args.get()) - 1, + curr.release()); // PyTuple_SET_ITEM steals a reference. - if(!_op->amd) - { - response(result.get(), current.encoding); - } + dispatchImpl(servant, _op->dispatchName, args.get(), current); } void -IcePy::TypedUpcall::response(PyObject* args, const Ice::EncodingVersion& encoding) +IcePy::TypedUpcall::response(PyObject* result) { - if(_finished) + try { // - // This method could be called more than once if the application calls - // ice_response multiple times. We ignore subsequent calls. + // Marshal the results. If there is more than one value to be returned, then they must be + // returned in a tuple of the form (result, outParam1, ...). // - return; - } - _finished = true; + Py_ssize_t numResults = static_cast<Py_ssize_t>(_op->outParams.size()); + if(_op->returnType) + { + numResults++; + } + + if(numResults > 1 && (!PyTuple_Check(result) || PyTuple_GET_SIZE(result) != numResults)) + { + ostringstream ostr; + ostr << "operation `" << fixIdent(_op->name) << "' should return a tuple of length " << numResults; + string str = ostr.str(); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + throw Ice::MarshalException(__FILE__, __LINE__); + } - try - { // - // Marshal the results. If there is more than one value to be returned, then they must be - // returned in a tuple of the form (result, outParam1, ...). + // Normalize the result value. When there are multiple result values, result is already a tuple. + // Otherwise, we create a tuple to make the code a little simpler. // - Ice::OutputStreamPtr os = Ice::createOutputStream(_communicator); - try + PyObjectHandle t; + if(numResults > 1) + { + t = incRef(result); + } + else { - Py_ssize_t numResults = static_cast<Py_ssize_t>(_op->outParams.size()); - if(_op->returnType) + t = PyTuple_New(1); + if(!t.get()) { - numResults++; + throw AbortMarshaling(); } + PyTuple_SET_ITEM(t.get(), 0, incRef(result)); + } + + ObjectMap objectMap; + ParamInfoList::iterator p; - if(numResults > 1 && (!PyTuple_Check(args) || PyTuple_GET_SIZE(args) != numResults)) + // + // Validate the results. + // + for(p = _op->outParams.begin(); p != _op->outParams.end(); ++p) + { + ParamInfoPtr info = *p; + PyObject* arg = PyTuple_GET_ITEM(t.get(), info->pos); + if((!info->optional || arg != Unset) && !info->type->validate(arg)) { + // TODO: Provide the parameter name instead? ostringstream ostr; - ostr << "operation `" << fixIdent(_op->name) << "' should return a tuple of length " << numResults; + ostr << "invalid value for out argument " << (info->pos + 1) << " in operation `" + << _op->dispatchName << "'"; string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); throw Ice::MarshalException(__FILE__, __LINE__); } - - // - // Normalize the args value. For an AMD operation, or when there are multiple - // result values, args is already a tuple. Otherwise, we create a tuple to - // make the code a little simpler. - // - PyObjectHandle t; - if(_op->amd || numResults > 1) - { - t = args; - } - else + } + if(_op->returnType) + { + PyObject* res = PyTuple_GET_ITEM(t.get(), 0); + if((!_op->returnType->optional || res != Unset) && !_op->returnType->type->validate(res)) { - t = PyTuple_New(1); - if(!t.get()) - { - throw AbortMarshaling(); - } - PyTuple_SET_ITEM(t.get(), 0, args); + ostringstream ostr; + ostr << "invalid return value for operation `" << _op->dispatchName << "'"; + string str = ostr.str(); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + throw Ice::MarshalException(__FILE__, __LINE__); } - Py_INCREF(args); - - os->startEncapsulation(encoding, _op->format); + } - ObjectMap objectMap; - ParamInfoList::iterator p; + Ice::OutputStream os(_communicator); + os.startEncapsulation(_encoding, _op->format); - // - // Validate the results. - // - for(p = _op->outParams.begin(); p != _op->outParams.end(); ++p) + // + // Marshal the required out parameters. + // + for(p = _op->outParams.begin(); p != _op->outParams.end(); ++p) + { + ParamInfoPtr info = *p; + if(!info->optional) { - ParamInfoPtr info = *p; PyObject* arg = PyTuple_GET_ITEM(t.get(), info->pos); - if((!info->optional || arg != Unset) && !info->type->validate(arg)) - { - // TODO: Provide the parameter name instead? - ostringstream ostr; - ostr << "invalid value for out argument " << (info->pos + 1) << " in operation `" - << _op->dispatchName << "'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - throw Ice::MarshalException(__FILE__, __LINE__); - } - } - if(_op->returnType) - { - PyObject* res = PyTuple_GET_ITEM(t.get(), 0); - if((!_op->returnType->optional || res != Unset) && !_op->returnType->type->validate(res)) - { - ostringstream ostr; - ostr << "invalid return value for operation `" << _op->dispatchName << "'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - throw Ice::MarshalException(__FILE__, __LINE__); - } - } - - // - // Marshal the required out parameters. - // - for(p = _op->outParams.begin(); p != _op->outParams.end(); ++p) - { - ParamInfoPtr info = *p; - if(!info->optional) - { - PyObject* arg = PyTuple_GET_ITEM(t.get(), info->pos); - info->type->marshal(arg, os, &objectMap, false, &info->metaData); - } + info->type->marshal(arg, &os, &objectMap, false, &info->metaData); } + } - // - // Marshal the required return value, if any. - // - if(_op->returnType && !_op->returnType->optional) - { - PyObject* res = PyTuple_GET_ITEM(t.get(), 0); - _op->returnType->type->marshal(res, os, &objectMap, false, &_op->metaData); - } + // + // Marshal the required return value, if any. + // + if(_op->returnType && !_op->returnType->optional) + { + PyObject* res = PyTuple_GET_ITEM(t.get(), 0); + _op->returnType->type->marshal(res, &os, &objectMap, false, &_op->metaData); + } - // - // Marshal the optional results. - // - for(p = _op->optionalOutParams.begin(); p != _op->optionalOutParams.end(); ++p) + // + // Marshal the optional results. + // + for(p = _op->optionalOutParams.begin(); p != _op->optionalOutParams.end(); ++p) + { + ParamInfoPtr info = *p; + PyObject* arg = PyTuple_GET_ITEM(t.get(), info->pos); + if(arg != Unset && os.writeOptional(info->tag, info->type->optionalFormat())) { - ParamInfoPtr info = *p; - PyObject* arg = PyTuple_GET_ITEM(t.get(), info->pos); - if(arg != Unset && os->writeOptional(info->tag, info->type->optionalFormat())) - { - info->type->marshal(arg, os, &objectMap, true, &info->metaData); - } + info->type->marshal(arg, &os, &objectMap, true, &info->metaData); } + } - if(_op->returnsClasses) - { - os->writePendingObjects(); - } + if(_op->returnsClasses) + { + os.writePendingValues(); + } - os->endEncapsulation(); + os.endEncapsulation(); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. - _callback->ice_response(true, os->finished()); - } - catch(const AbortMarshaling&) + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_response(true, os.finished()); + } + catch(const AbortMarshaling&) + { + try { throwPythonException(); } + catch(const Ice::Exception& ex) + { + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_exception(ex); + } } catch(const Ice::Exception& ex) { @@ -3496,19 +3813,8 @@ IcePy::TypedUpcall::response(PyObject* args, const Ice::EncodingVersion& encodin } void -IcePy::TypedUpcall::exception(PyException& ex, const Ice::EncodingVersion& encoding) +IcePy::TypedUpcall::exception(PyException& ex) { - if(_finished) - { - // - // An asynchronous response or exception has already been sent. We just - // raise an exception and let the C++ run time handle it. - // - ex.raise(); - } - - _finished = true; - try { try @@ -3526,29 +3832,23 @@ IcePy::TypedUpcall::exception(PyException& ex, const Ice::EncodingVersion& encod if(PyObject_IsInstance(ex.ex.get(), userExceptionType)) { // - // Get the exception's type and verify that it is legal to be thrown from this operation. + // Get the exception's type. // PyObjectHandle iceType = PyObject_GetAttrString(ex.ex.get(), STRCAST("_ice_type")); assert(iceType.get()); ExceptionInfoPtr info = ExceptionInfoPtr::dynamicCast(getException(iceType.get())); assert(info); - if(!validateException(ex.ex.get())) - { - ex.raise(); // Raises UnknownUserException. - } - else - { - Ice::OutputStreamPtr os = Ice::createOutputStream(_communicator); - os->startEncapsulation(encoding, _op->format); - ExceptionWriter writer(_communicator, ex.ex, info); - os->writeException(writer); + Ice::OutputStream os(_communicator); + os.startEncapsulation(_encoding, _op->format); - os->endEncapsulation(); + ExceptionWriter writer(ex.ex, info); + os.writeException(writer); - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. - _callback->ice_response(false, os->finished()); - } + os.endEncapsulation(); + + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_response(false, os.finished()); } else { @@ -3562,30 +3862,22 @@ IcePy::TypedUpcall::exception(PyException& ex, const Ice::EncodingVersion& encod } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. - _callback->ice_exception(ex); + exception(ex); } } -bool -IcePy::TypedUpcall::validateException(PyObject* ex) const +void +IcePy::TypedUpcall::exception(const Ice::Exception& ex) { - for(ExceptionInfoList::const_iterator p = _op->exceptions.begin(); p != _op->exceptions.end(); ++p) - { - if(PyObject_IsInstance(ex, (*p)->pythonType.get())) - { - return true; - } - } - - return false; + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_exception(ex); } // // BlobjectUpcall // -IcePy::BlobjectUpcall::BlobjectUpcall(bool amd, const Ice::AMD_Object_ice_invokePtr& callback) : - _amd(amd), _callback(callback), _finished(false) +IcePy::BlobjectUpcall::BlobjectUpcall(const Ice::AMD_Object_ice_invokePtr& callback) : + _callback(callback) { } @@ -3598,11 +3890,6 @@ IcePy::BlobjectUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, Py_ssize_t count = 2; // First is the inParams, second is the Ice::Current object. Py_ssize_t start = 0; - if(_amd) - { - ++count; // Leave room for a leading AMD callback argument. - start = 1; - } PyObjectHandle args = PyTuple_New(count); if(!args.get()) @@ -3623,186 +3910,113 @@ IcePy::BlobjectUpcall::dispatch(PyObject* servant, const pair<const Ice::Byte*, } #else // - // If using AMD we need to copy the bytes since the bytes may be - // accessed after this method is over, otherwise - // PyBuffer_FromMemory can be used which doesn't do a copy. + // Make a copy of the bytes since the bytes may be accessed after this method is over. // - if(!_amd) + ip = PyBuffer_New(inBytes.second - inBytes.first); + if(!ip.get()) { - ip = PyBuffer_FromMemory((void*)inBytes.first, inBytes.second - inBytes.first); - if(!ip.get()) - { - throwPythonException(); - } + throwPythonException(); } - else + void* buf; + Py_ssize_t sz; + if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) { - ip = PyBuffer_New(inBytes.second - inBytes.first); - if(!ip.get()) - { - throwPythonException(); - } - void* buf; - Py_ssize_t sz; - if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) - { - throwPythonException(); - } - assert(sz == inBytes.second - inBytes.first); - memcpy(buf, inBytes.first, sz); + throwPythonException(); } + assert(sz == inBytes.second - inBytes.first); + memcpy(buf, inBytes.first, sz); #endif - PyTuple_SET_ITEM(args.get(), start, ip.get()); + PyTuple_SET_ITEM(args.get(), start, ip.release()); // PyTuple_SET_ITEM steals a reference. ++start; - ip.release(); // PyTuple_SET_ITEM steals a reference. // // Create an object to represent Ice::Current. We need to append // this to the argument tuple. // PyObjectHandle curr = createCurrent(current); - PyTuple_SET_ITEM(args.get(), start, curr.get()); - curr.release(); // PyTuple_SET_ITEM steals a reference. + PyTuple_SET_ITEM(args.get(), start, curr.release()); // PyTuple_SET_ITEM steals a reference. - string dispatchName = "ice_invoke"; - if(_amd) - { - dispatchName += "_async"; - // - // Create the callback object and pass it as the first argument. - // - AMDCallbackObject* obj = amdCallbackNew(&AMDCallbackType, 0, 0); - if(!obj) - { - throwPythonException(); - } - obj->upcall = new UpcallPtr(this); - obj->encoding = current.encoding; - PyTuple_SET_ITEM(args.get(), 0, (PyObject*)obj); // PyTuple_SET_ITEM steals a reference. - } - - // - // Dispatch the operation. - // - PyObjectHandle method = PyObject_GetAttrString(servant, const_cast<char*>(dispatchName.c_str())); - if(!method.get()) - { - ostringstream ostr; - ostr << "servant for identity " << communicator->identityToString(current.id) - << " does not define operation `" << dispatchName << "'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - Ice::UnknownException ex(__FILE__, __LINE__); - ex.unknown = str; - throw ex; - } - - PyObjectHandle result = PyObject_Call(method.get(), args.get(), 0); - - // - // Check for exceptions. - // - if(PyErr_Occurred()) - { - PyException ex; // Retrieve it before another Python API call clears it. - exception(ex, current.encoding); - return; - } - - if(!_amd) - { - response(result.get(), current.encoding); - } + dispatchImpl(servant, "ice_invoke", args.get(), current); } void -IcePy::BlobjectUpcall::response(PyObject* args, const Ice::EncodingVersion&) +IcePy::BlobjectUpcall::response(PyObject* result) { - if(_finished) + try { // - // This method could be called more than once if the application calls - // ice_response multiple times. We ignore subsequent calls. + // The result is a tuple of (bool, results). // - return; - } + if(!PyTuple_Check(result) || PyTuple_GET_SIZE(result) != 2) + { + string str = "operation `ice_invoke' should return a tuple of length 2"; + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + throw Ice::MarshalException(__FILE__, __LINE__); + } - _finished = true; + PyObject* arg = PyTuple_GET_ITEM(result, 0); + bool isTrue = PyObject_IsTrue(arg) == 1; - // - // The return value is a tuple of (bool, results). - // - if(!PyTuple_Check(args) || PyTuple_GET_SIZE(args) != 2) - { - ostringstream ostr; - string name = "ice_invoke"; - if(_amd) + arg = PyTuple_GET_ITEM(result, 1); + +#if PY_VERSION_HEX >= 0x03000000 + if(!PyBytes_Check(arg)) { - name += "_async"; + ostringstream ostr; + ostr << "invalid return value for operation `ice_invoke'"; + string str = ostr.str(); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + throw Ice::MarshalException(__FILE__, __LINE__); } - ostr << "operation `" << name << "' should return a tuple of length 2"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - throw Ice::MarshalException(__FILE__, __LINE__); - } - PyObject* arg = PyTuple_GET_ITEM(args, 0); - bool isTrue = PyObject_IsTrue(arg) == 1; + Py_ssize_t sz = PyBytes_GET_SIZE(arg); + pair<const ::Ice::Byte*, const ::Ice::Byte*> r(static_cast<const Ice::Byte*>(0), + static_cast<const Ice::Byte*>(0)); + if(sz > 0) + { + r.first = reinterpret_cast<Ice::Byte*>(PyBytes_AS_STRING(arg)); + r.second = r.first + sz; + } +#else + if(!PyBuffer_Check(arg)) + { + ostringstream ostr; + ostr << "invalid return value for operation `ice_invoke'"; + string str = ostr.str(); + PyErr_WarnEx(PyExc_RuntimeWarning, const_cast<char*>(str.c_str()), 1); + throw Ice::MarshalException(__FILE__, __LINE__); + } - arg = PyTuple_GET_ITEM(args, 1); + char* charBuf = 0; + Py_ssize_t sz = arg->ob_type->tp_as_buffer->bf_getcharbuffer(arg, 0, &charBuf); + const Ice::Byte* mem = reinterpret_cast<const Ice::Byte*>(charBuf); + const pair<const ::Ice::Byte*, const ::Ice::Byte*> r(mem, mem + sz); +#endif -#if PY_VERSION_HEX >= 0x03000000 - if(!PyBytes_Check(arg)) - { - ostringstream ostr; - ostr << "invalid return value for operation `ice_invoke'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - throw Ice::MarshalException(__FILE__, __LINE__); + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_response(isTrue, r); } - - Py_ssize_t sz = PyBytes_GET_SIZE(arg); - pair<const ::Ice::Byte*, const ::Ice::Byte*> r(static_cast<const Ice::Byte*>(0),static_cast<const Ice::Byte*>(0)); - if(sz > 0) + catch(const AbortMarshaling&) { - r.first = reinterpret_cast<Ice::Byte*>(PyBytes_AS_STRING(arg)); - r.second = r.first + sz; + try + { + throwPythonException(); + } + catch(const Ice::Exception& ex) + { + exception(ex); + } } -#else - if(!PyBuffer_Check(arg)) + catch(const Ice::Exception& ex) { - ostringstream ostr; - ostr << "invalid return value for operation `ice_invoke'"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); - throw Ice::MarshalException(__FILE__, __LINE__); + exception(ex); } - - char* charBuf = 0; - Py_ssize_t sz = arg->ob_type->tp_as_buffer->bf_getcharbuffer(arg, 0, &charBuf); - const Ice::Byte* mem = reinterpret_cast<const Ice::Byte*>(charBuf); - const pair<const ::Ice::Byte*, const ::Ice::Byte*> r(mem, mem + sz); -#endif - - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. - _callback->ice_response(isTrue, r); } void -IcePy::BlobjectUpcall::exception(PyException& ex, const Ice::EncodingVersion&) +IcePy::BlobjectUpcall::exception(PyException& ex) { - if(_finished) - { - // - // An asynchronous response or exception has already been sent. We just - // raise an exception and let the C++ run time handle it. - // - ex.raise(); - } - - _finished = true; - try { // @@ -3817,11 +4031,17 @@ IcePy::BlobjectUpcall::exception(PyException& ex, const Ice::EncodingVersion&) } catch(const Ice::Exception& ex) { - AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. - _callback->ice_exception(ex); + exception(ex); } } +void +IcePy::BlobjectUpcall::exception(const Ice::Exception& ex) +{ + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking calls. + _callback->ice_exception(ex); +} + PyObject* IcePy::invokeBuiltin(PyObject* proxy, const string& builtin, PyObject* args) { @@ -3840,6 +4060,23 @@ IcePy::invokeBuiltin(PyObject* proxy, const string& builtin, PyObject* args) } PyObject* +IcePy::invokeBuiltinAsync(PyObject* proxy, const string& builtin, PyObject* args) +{ + string name = "_op_" + builtin; + PyObject* objectType = lookupType("Ice.Object"); + assert(objectType); + PyObjectHandle obj = PyObject_GetAttrString(objectType, STRCAST(name.c_str())); + assert(obj.get()); + + OperationPtr op = getOperation(obj.get()); + assert(op); + + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new NewAsyncTypedInvocation(p, proxy, op); + return i->invoke(args); +} + +PyObject* IcePy::beginBuiltin(PyObject* proxy, const string& builtin, PyObject* args) { string name = "_op_" + builtin; @@ -3898,7 +4135,7 @@ PyObject* IcePy::iceInvokeAsync(PyObject* proxy, PyObject* args) { Ice::ObjectPrx p = getProxy(proxy); - InvocationPtr i = new OldAsyncBlobjectInvocation(p); + InvocationPtr i = new NewAsyncBlobjectInvocation(p, proxy); return i->invoke(args); } @@ -3940,12 +4177,9 @@ IcePy::createAsyncResult(const Ice::AsyncResultPtr& r, PyObject* proxy, PyObject return 0; } obj->result = new Ice::AsyncResultPtr(r); - obj->proxy = proxy; - Py_XINCREF(obj->proxy); - obj->connection = connection; - Py_XINCREF(obj->connection); - obj->communicator = communicator; - Py_XINCREF(obj->communicator); + obj->proxy = incRef(proxy); + obj->connection = incRef(connection); + obj->communicator = incRef(communicator); return reinterpret_cast<PyObject*>(obj); } @@ -3957,6 +4191,9 @@ IcePy::getAsyncResult(PyObject* p) return *obj->result; } +// +// FlushCallback +// IcePy::FlushCallback::FlushCallback(PyObject* ex, PyObject* sent, const string& op) : _ex(ex), _sent(sent), _op(op) { @@ -3991,6 +4228,100 @@ IcePy::FlushCallback::sent(bool sentSynchronously) } } +IcePy::FlushAsyncCallback::FlushAsyncCallback(const string& op) : + _op(op), _future(0), _sent(false), _sentSynchronously(false), _exception(0) +{ +} + +IcePy::FlushAsyncCallback::~FlushAsyncCallback() +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_XDECREF(_future); + Py_XDECREF(_exception); +} + +void +IcePy::FlushAsyncCallback::setFuture(PyObject* future) +{ + // + // Called with the GIL locked. + // + + // + // Check if any callbacks have been invoked already. + // + if(_exception) + { + PyObjectHandle tmp = callMethod(future, "set_exception", _exception); + PyErr_Clear(); + } + else if(_sent) + { + PyObjectHandle tmp = callMethod(future, "set_sent", _sentSynchronously ? getTrue() : getFalse()); + PyErr_Clear(); + // + // We consider the invocation complete when sent. + // + tmp = callMethod(future, "set_result", Py_None); + PyErr_Clear(); + } + else + { + _future = incRef(future); + } +} + +void +IcePy::FlushAsyncCallback::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) + { + // + // The future hasn't been set yet, which means the request is still being invoked. Save the results for later. + // + _exception = convertException(ex); + return; + } + + PyObjectHandle exh = convertException(ex); + assert(exh.get()); + PyObjectHandle tmp = callMethod(_future, "set_exception", exh.get()); + PyErr_Clear(); + + Py_DECREF(_future); // Break cyclic dependency. + _future = 0; +} + +void +IcePy::FlushAsyncCallback::sent(bool sentSynchronously) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) + { + // + // The future hasn't been set yet, which means the request is still being invoked. Save the results for later. + // + _sent = true; + _sentSynchronously = sentSynchronously; + return; + } + + PyObjectHandle tmp = callMethod(_future, "set_sent", _sentSynchronously ? getTrue() : getFalse()); + PyErr_Clear(); + // + // We consider the invocation complete when sent. + // + tmp = callMethod(_future, "set_result", Py_None); + PyErr_Clear(); + + Py_DECREF(_future); // Break cyclic dependency. + _future = 0; +} + IcePy::GetConnectionCallback::GetConnectionCallback(const Ice::CommunicatorPtr& communicator, PyObject* response, PyObject* ex, const string& op) : _communicator(communicator), _response(response), _ex(ex), _op(op) @@ -4030,6 +4361,92 @@ IcePy::GetConnectionCallback::exception(const Ice::Exception& ex) callException(_ex, ex); } +IcePy::GetConnectionAsyncCallback::GetConnectionAsyncCallback(const Ice::CommunicatorPtr& communicator, + const string& op) : + _communicator(communicator), _op(op), _future(0), _exception(0) +{ +} + +IcePy::GetConnectionAsyncCallback::~GetConnectionAsyncCallback() +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_XDECREF(_future); + Py_XDECREF(_exception); +} + +void +IcePy::GetConnectionAsyncCallback::setFuture(PyObject* future) +{ + // + // Called with the GIL locked. + // + + // + // Check if any callbacks have been invoked already. + // + if(_connection) + { + PyObjectHandle pyConn = createConnection(_connection, _communicator); + assert(pyConn.get()); + PyObjectHandle tmp = callMethod(future, "set_result", pyConn.get()); + PyErr_Clear(); + } + else if(_exception) + { + PyObjectHandle tmp = callMethod(future, "set_exception", _exception); + PyErr_Clear(); + } + else + { + _future = incRef(future); + } +} + +void +IcePy::GetConnectionAsyncCallback::response(const Ice::ConnectionPtr& conn) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) + { + // + // The future hasn't been set yet, which means the request is still being invoked. Save the results for later. + // + _connection = conn; + return; + } + + PyObjectHandle pyConn = createConnection(conn, _communicator); + PyObjectHandle tmp = callMethod(_future, "set_result", pyConn.get()); + PyErr_Clear(); + + Py_DECREF(_future); // Break cyclic dependency. + _future = 0; +} + +void +IcePy::GetConnectionAsyncCallback::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + if(!_future) + { + // + // The future hasn't been set yet, which means the request is still being invoked. Save the results for later. + // + _exception = convertException(ex); + return; + } + + PyObjectHandle exh = convertException(ex); + PyObjectHandle tmp = callMethod(_future, "set_exception", exh.get()); + PyErr_Clear(); + + Py_DECREF(_future); // Break cyclic dependency. + _future = 0; +} + // // ServantWrapper implementation. // @@ -4119,7 +4536,7 @@ IcePy::TypedServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr // if(!op->pseudoOp) { - __checkMode(op->mode, current.mode); + _iceCheckMode(op->mode, current.mode); } UpcallPtr up = new TypedUpcall(op, cb, current.adapter->getCommunicator()); @@ -4135,8 +4552,8 @@ IcePy::TypedServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr // // BlobjectServantWrapper implementation. // -IcePy::BlobjectServantWrapper::BlobjectServantWrapper(PyObject* servant, bool amd) : - ServantWrapper(servant), _amd(amd) +IcePy::BlobjectServantWrapper::BlobjectServantWrapper(PyObject* servant) : + ServantWrapper(servant) { } @@ -4149,7 +4566,7 @@ IcePy::BlobjectServantWrapper::ice_invoke_async(const Ice::AMD_Object_ice_invoke try { - UpcallPtr up = new BlobjectUpcall(_amd, cb); + UpcallPtr up = new BlobjectUpcall(cb); up->dispatch(_servant, inParams, current); } catch(const Ice::Exception& ex) @@ -4167,12 +4584,59 @@ IcePy::createServantWrapper(PyObject* servant) PyObject* blobjectAsyncType = lookupType("Ice.BlobjectAsync"); if(PyObject_IsInstance(servant, blobjectType)) { - return new BlobjectServantWrapper(servant, false); + return new BlobjectServantWrapper(servant); } else if(PyObject_IsInstance(servant, blobjectAsyncType)) { - return new BlobjectServantWrapper(servant, true); + return new BlobjectServantWrapper(servant); } return new TypedServantWrapper(servant); } + +PyObject* +IcePy::createFuture() +{ + PyObject* futureType = lookupType("Ice.Future"); + assert(futureType); + PyObjectHandle args = PyTuple_New(0); + if(!args.get()) + { + return 0; + } + PyTypeObject* type = reinterpret_cast<PyTypeObject*>(futureType); + PyObject* future = type->tp_new(type, args.get(), 0); + if(!future) + { + return 0; + } + type->tp_init(future, args.get(), 0); // Call the constructor + return future; +} + +PyObject* +IcePy::createFuture(const string& operation, PyObject* asyncResult) +{ + if(!asyncResult) // Can be nil for batch invocations. + { + asyncResult = Py_None; + } + + PyObject* futureType = lookupType("Ice.InvocationFuture"); + assert(futureType); + PyObjectHandle args = PyTuple_New(2); + if(!args.get()) + { + return 0; + } + PyTuple_SET_ITEM(args.get(), 0, createString(operation)); + PyTuple_SET_ITEM(args.get(), 1, incRef(asyncResult)); + PyTypeObject* type = reinterpret_cast<PyTypeObject*>(futureType); + PyObject* future = type->tp_new(type, args.get(), 0); + if(!future) + { + return 0; + } + type->tp_init(future, args.get(), 0); // Call the constructor + return future; +} |