From bc2e4034079842cde62b6b44c4bdcd6a954f4f46 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Wed, 6 Jan 2010 12:43:48 +0100 Subject: Merged Mark's pythonami branch --- py/modules/IcePy/Operation.cpp | 1898 ++++++++++++++++++++++++++++++++-------- 1 file changed, 1542 insertions(+), 356 deletions(-) (limited to 'py/modules/IcePy/Operation.cpp') diff --git a/py/modules/IcePy/Operation.cpp b/py/modules/IcePy/Operation.cpp index a3c5a08697b..aa2ef440aac 100644 --- a/py/modules/IcePy/Operation.cpp +++ b/py/modules/IcePy/Operation.cpp @@ -89,7 +89,7 @@ public: Invocation(const Ice::ObjectPrx&); - virtual PyObject* invoke(PyObject*) = 0; + virtual PyObject* invoke(PyObject*, PyObject* = 0) = 0; protected: @@ -112,7 +112,9 @@ protected: OperationPtr _op; Ice::CommunicatorPtr _communicator; - bool prepareRequest(PyObject*, bool, vector&); + enum MappingType { SyncMapping, AsyncMapping, OldAsyncMapping }; + + bool prepareRequest(PyObject*, MappingType, vector&); PyObject* unmarshalResults(const pair&); PyObject* unmarshalException(const pair&); bool validateException(PyObject*) const; @@ -120,7 +122,7 @@ protected: }; // -// A synchronous typed invocation. +// Synchronous typed invocation. // class SyncTypedInvocation : virtual public TypedInvocation { @@ -128,45 +130,62 @@ public: SyncTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); - virtual PyObject* invoke(PyObject*); + virtual PyObject* invoke(PyObject*, PyObject* = 0); }; // -// An asynchronous typed invocation. +// Asynchronous typed invocation. // -class AsyncTypedInvocation : virtual public TypedInvocation, virtual public Ice::AMI_Array_Object_ice_invoke +class AsyncTypedInvocation : virtual public TypedInvocation { public: - AsyncTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); + AsyncTypedInvocation(const Ice::ObjectPrx&, PyObject*, const OperationPtr&); ~AsyncTypedInvocation(); - virtual PyObject* invoke(PyObject*); + virtual PyObject* invoke(PyObject*, PyObject* = 0); + PyObject* end(const Ice::ObjectPrx&, const OperationPtr&, const Ice::AsyncResultPtr&); + + string opName() const; - virtual void ice_response(bool, const pair&); - virtual void ice_exception(const Ice::Exception&); + void response(bool, const pair&); + void exception(const Ice::Exception&); + void sent(bool); protected: - void handleException(PyObject*); + void checkAsyncTwowayOnly(const Ice::ObjectPrx&) const; - PyObject* _callback; + PyObject* _pyProxy; + PyObject* _response; + PyObject* _ex; + PyObject* _sent; }; +typedef IceUtil::Handle AsyncTypedInvocationPtr; // -// An asynchronous typed invocation with support for ice_sent. +// Old-style asynchronous typed invocation. // -class AsyncSentTypedInvocation : virtual public AsyncTypedInvocation, virtual public Ice::AMISentCallback +class OldAsyncTypedInvocation : virtual public TypedInvocation { public: - AsyncSentTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); + OldAsyncTypedInvocation(const Ice::ObjectPrx&, const OperationPtr&); + ~OldAsyncTypedInvocation(); + + virtual PyObject* invoke(PyObject*, PyObject* = 0); - virtual void ice_sent(); + void response(bool, const pair&); + void exception(const Ice::Exception&); + void sent(bool); + +protected: + + PyObject* _callback; }; // -// A synchronous blobject invocation. +// Synchronous blobject invocation. // class SyncBlobjectInvocation : virtual public Invocation { @@ -174,42 +193,56 @@ public: SyncBlobjectInvocation(const Ice::ObjectPrx&); - virtual PyObject* invoke(PyObject*); + virtual PyObject* invoke(PyObject*, PyObject* = 0); }; // -// An asynchronous blobject invocation. +// Asynchronous blobject invocation. // -class AsyncBlobjectInvocation : virtual public Invocation, virtual public Ice::AMI_Array_Object_ice_invoke +class AsyncBlobjectInvocation : virtual public Invocation { public: - AsyncBlobjectInvocation(const Ice::ObjectPrx&); + AsyncBlobjectInvocation(const Ice::ObjectPrx&, PyObject*); ~AsyncBlobjectInvocation(); - virtual PyObject* invoke(PyObject*); + virtual PyObject* invoke(PyObject*, PyObject* = 0); + PyObject* end(const Ice::ObjectPrx&, const Ice::AsyncResultPtr&); - virtual void ice_response(bool, const pair&); - virtual void ice_exception(const Ice::Exception&); + void response(bool, const pair&); + void exception(const Ice::Exception&); + void sent(bool); protected: + PyObject* _pyProxy; string _op; - PyObject* _callback; - - void handleException(PyObject*); + PyObject* _response; + PyObject* _ex; + PyObject* _sent; }; +typedef IceUtil::Handle AsyncBlobjectInvocationPtr; // -// An asynchronous blobject invocation with support for ice_sent. +// Old-style asynchronous blobject invocation. // -class AsyncSentBlobjectInvocation : virtual public AsyncBlobjectInvocation, virtual public Ice::AMISentCallback +class OldAsyncBlobjectInvocation : virtual public Invocation { public: - AsyncSentBlobjectInvocation(const Ice::ObjectPrx&); + OldAsyncBlobjectInvocation(const Ice::ObjectPrx&); + ~OldAsyncBlobjectInvocation(); + + virtual PyObject* invoke(PyObject*, PyObject* = 0); + + void response(bool, const pair&); + void exception(const Ice::Exception&); + void sent(bool); - virtual void ice_sent(); +protected: + + string _op; + PyObject* _callback; }; // @@ -318,12 +351,25 @@ struct AMDCallbackObject UpcallPtr* upcall; }; +struct AsyncResultObject +{ + PyObject_HEAD + Ice::AsyncResultPtr* result; + InvocationPtr* invocation; + PyObject* proxy; + PyObject* connection; + PyObject* communicator; +}; + extern PyTypeObject OperationType; extern PyTypeObject AMDCallbackType; } -static OperationPtr +namespace +{ + +OperationPtr getOperation(PyObject* p) { assert(PyObject_IsInstance(p, reinterpret_cast(&OperationType)) == 1); @@ -331,6 +377,101 @@ getOperation(PyObject* p) return *obj->op; } +void +handleException() +{ + assert(PyErr_Occurred()); + + PyException ex; // Retrieve it before another Python API call clears it. + + // + // A callback that calls sys.exit() will raise the SystemExit exception. + // This is normally caught by the interpreter, causing it to exit. + // However, we have no way to pass this exception to the interpreter, + // so we act on it directly. + // + ex.checkSystemExit(); + + ex.raise(); +} + +void +callException(PyObject* method, PyObject* ex) +{ + PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), ex); + PyObjectHandle tmp = PyObject_Call(method, args.get(), 0); + if(PyErr_Occurred()) + { + handleException(); // Callback raised an exception. + } +} + +void +callException(PyObject* method, const Ice::Exception& ex) +{ + PyObjectHandle exh = convertException(ex); + assert(exh.get()); + callException(method, exh.get()); +} + +void +callException(PyObject* callback, const string& op, const string& method, PyObject* ex) +{ + if(!PyObject_HasAttrString(callback, STRCAST(method.c_str()))) + { + ostringstream ostr; + ostr << "AMI callback object for operation `" << op << "' does not define " << method << "()"; + string str = ostr.str(); + PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); + } + else + { + PyObjectHandle m = PyObject_GetAttrString(callback, STRCAST(method.c_str())); + assert(m.get()); + callException(m.get(), ex); + } +} + +void +callException(PyObject* callback, const string& op, const string& method, const Ice::Exception& ex) +{ + PyObjectHandle exh = convertException(ex); + assert(exh.get()); + callException(callback, op, method, exh.get()); +} + +void +callSent(PyObject* method, bool sentSynchronously, bool passArg) +{ + PyObjectHandle args; + if(passArg) + { + args = Py_BuildValue(STRCAST("(O)"), sentSynchronously ? getTrue() : getFalse()); + } + else + { + args = PyTuple_New(0); + } + PyObjectHandle tmp = PyObject_Call(method, args.get(), 0); + if(PyErr_Occurred()) + { + handleException(); // Callback raised an exception. + } +} + +void +callSent(PyObject* callback, const string& method, bool sentSynchronously, bool passArg) +{ + if(PyObject_HasAttrString(callback, STRCAST(method.c_str()))) + { + PyObjectHandle m = PyObject_GetAttrString(callback, STRCAST(method.c_str())); + assert(m.get()); + callSent(m.get(), sentSynchronously, passArg); + } +} + +} + #ifdef WIN32 extern "C" #endif @@ -422,22 +563,52 @@ operationInvokeAsync(OperationObject* self, PyObject* args) Ice::ObjectPrx prx = getProxy(pyProxy); assert(self->op); - // - // If the callback implements an ice_sent method, we create a wrapper that derives - // from AMISentCallback. - // - assert(PyTuple_GET_SIZE(opArgs) > 0); - PyObject* callback = PyTuple_GET_ITEM(opArgs, 0); - if(PyObject_HasAttrString(callback, STRCAST("ice_sent"))) + InvocationPtr i = new OldAsyncTypedInvocation(prx, *self->op); + return i->invoke(opArgs); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +operationBegin(OperationObject* self, PyObject* args) +{ + PyObject* proxy; + PyObject* opArgs; + if(!PyArg_ParseTuple(args, STRCAST("O!O!"), &ProxyType, &proxy, &PyTuple_Type, &opArgs)) { - InvocationPtr i = new AsyncSentTypedInvocation(prx, *self->op); - return i->invoke(opArgs); + return 0; } - else + + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new AsyncTypedInvocation(p, proxy, *self->op); + return i->invoke(opArgs); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +operationEnd(OperationObject* self, PyObject* args) +{ + PyObject* proxy; + PyObject* result; + if(!PyArg_ParseTuple(args, STRCAST("O!O!"), &ProxyType, &proxy, &AsyncResultType, &result)) { - InvocationPtr i = new AsyncTypedInvocation(prx, *self->op); - return i->invoke(opArgs); + return 0; + } + + AsyncResultObject* ar = reinterpret_cast(result); + assert(ar); + AsyncTypedInvocationPtr i = AsyncTypedInvocationPtr::dynamicCast(*ar->invocation); + if(!i) + { + PyErr_Format(PyExc_ValueError, STRCAST("invalid AsyncResult object passed to end_%s"), + (*self->op)->name.c_str()); + return 0; } + Ice::ObjectPrx p = getProxy(proxy); + return i->end(p, *self->op, *ar->result); } #ifdef WIN32 @@ -459,6 +630,10 @@ operationDeprecate(OperationObject* self, PyObject* args) return Py_None; } +// +// AMDCallback operations +// + #ifdef WIN32 extern "C" #endif @@ -538,113 +713,340 @@ amdCallbackIceException(AMDCallbackObject* self, PyObject* args) } // -// ParamInfo implementation. +// AsyncResult operations // -void -IcePy::ParamInfo::unmarshaled(PyObject* val, PyObject* target, void* closure) + +#ifdef WIN32 +extern "C" +#endif +static AsyncResultObject* +asyncResultNew(PyObject* /*arg*/) { - assert(PyTuple_Check(target)); - long i = reinterpret_cast(closure); - PyTuple_SET_ITEM(target, i, val); - Py_INCREF(val); // PyTuple_SET_ITEM steals a reference. + AsyncResultObject* self = PyObject_New(AsyncResultObject, &AsyncResultType); + if(!self) + { + return 0; + } + self->result = 0; + self->invocation = 0; + self->proxy = 0; + self->connection = 0; + self->communicator = 0; + return self; } -// -// Operation implementation. -// -IcePy::Operation::Operation(const char* n, PyObject* m, PyObject* sm, int amdFlag, PyObject* meta, - PyObject* in, PyObject* out, PyObject* ret, PyObject* ex) +#ifdef WIN32 +extern "C" +#endif +static void +asyncResultDealloc(AsyncResultObject* self) { - name = n; - - // - // mode - // - PyObjectHandle modeValue = PyObject_GetAttrString(m, STRCAST("value")); - assert(PyInt_Check(modeValue.get())); - mode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(modeValue.get())); - - // - // sendMode - // - PyObjectHandle sendModeValue = PyObject_GetAttrString(sm, STRCAST("value")); - assert(PyInt_Check(sendModeValue.get())); - sendMode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(sendModeValue.get())); + delete self->result; + delete self->invocation; + Py_XDECREF(self->proxy); + Py_XDECREF(self->connection); + Py_XDECREF(self->communicator); + PyObject_Del(self); +} - // - // amd - // - amd = amdFlag ? true : false; - if(amd) +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultGetCommunicator(AsyncResultObject* self) +{ + if(self->communicator) { - dispatchName = fixIdent(name) + "_async"; + Py_INCREF(self->communicator); + return self->communicator; } - else + + Py_INCREF(Py_None); + return Py_None; +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultGetConnection(AsyncResultObject* self) +{ + if(self->connection) { - dispatchName = fixIdent(name); + Py_INCREF(self->connection); + return self->connection; } - // - // metaData - // -#ifndef NDEBUG - bool b = -#endif - tupleToStringSeq(meta, metaData); - assert(b); + Py_INCREF(Py_None); + return Py_None; +} - Py_ssize_t i, sz; +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultGetProxy(AsyncResultObject* self) +{ + if(self->proxy) + { + Py_INCREF(self->proxy); + return self->proxy; + } - // - // inParams - // - convertParams(in, inParams, sendsClasses); + Py_INCREF(Py_None); + return Py_None; +} - // - // outParams - // - convertParams(out, outParams, returnsClasses); +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultIsCompleted(AsyncResultObject* self) +{ + bool b = false; - // - // returnType - // - if(ret != Py_None) + try { - returnType = new ParamInfo; - returnType->type = getType(ret); - if(!returnsClasses) - { - returnsClasses = returnType->type->usesClasses(); - } + assert(self->result); + b = (*self->result)->isCompleted(); } - - // - // exceptions - // - sz = PyTuple_GET_SIZE(ex); - for(i = 0; i < sz; ++i) + catch(...) { - exceptions.push_back(getException(PyTuple_GET_ITEM(ex, i))); + assert(false); } + + PyRETURN_BOOL(b); } -void -IcePy::Operation::deprecate(const string& msg) +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultWaitForCompleted(AsyncResultObject* self) { - if(!msg.empty()) + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + try { - _deprecateMessage = msg; + assert(self->result); + (*self->result)->waitForCompleted(); } - else + catch(...) { - _deprecateMessage = "operation " + name + " is deprecated"; + assert(false); } + + Py_INCREF(Py_None); + return Py_None; } -void -IcePy::Operation::convertParams(PyObject* p, ParamInfoList& params, bool& usesClasses) +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultIsSent(AsyncResultObject* self) { - usesClasses = false; + bool b = false; + + try + { + assert(self->result); + b = (*self->result)->isSent(); + } + catch(...) + { + assert(false); + } + + PyRETURN_BOOL(b); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultWaitForSent(AsyncResultObject* self) +{ + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + try + { + assert(self->result); + (*self->result)->waitForSent(); + } + catch(...) + { + assert(false); + } + + Py_INCREF(Py_None); + return Py_None; +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultSentSynchronously(AsyncResultObject* self) +{ + bool b = false; + + try + { + assert(self->result); + b = (*self->result)->sentSynchronously(); + } + catch(...) + { + assert(false); + } + + PyRETURN_BOOL(b); +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +asyncResultGetOperation(AsyncResultObject* self) +{ + string op; + + try + { + // + // Since the extension uses the Blobject API, calling (*self->result)->getOperation() + // always returns "ice_invoke" as the operation name. If the caller used a regular + // (typed) proxy method, we obtain the actual operation name from the invocation. + // + if(self->invocation) + { + AsyncTypedInvocationPtr i = AsyncTypedInvocationPtr::dynamicCast(*self->invocation); + if(i) + { + op = i->opName(); + } + } + if(op.empty()) + { + assert(self->result); + op = (*self->result)->getOperation(); + } + } + catch(...) + { + assert(false); + } + + return createString(op); +} + +// +// ParamInfo implementation. +// +void +IcePy::ParamInfo::unmarshaled(PyObject* val, PyObject* target, void* closure) +{ + assert(PyTuple_Check(target)); + long i = reinterpret_cast(closure); + PyTuple_SET_ITEM(target, i, val); + Py_INCREF(val); // PyTuple_SET_ITEM steals a reference. +} + +// +// Operation implementation. +// +IcePy::Operation::Operation(const char* n, PyObject* m, PyObject* sm, int amdFlag, PyObject* meta, + PyObject* in, PyObject* out, PyObject* ret, PyObject* ex) +{ + name = n; + + // + // mode + // + PyObjectHandle modeValue = PyObject_GetAttrString(m, STRCAST("value")); + assert(PyInt_Check(modeValue.get())); + mode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(modeValue.get())); + + // + // sendMode + // + PyObjectHandle sendModeValue = PyObject_GetAttrString(sm, STRCAST("value")); + assert(PyInt_Check(sendModeValue.get())); + sendMode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(sendModeValue.get())); + + // + // amd + // + amd = amdFlag ? true : false; + if(amd) + { + dispatchName = fixIdent(name) + "_async"; + } + else + { + dispatchName = fixIdent(name); + } + + // + // metaData + // +#ifndef NDEBUG + bool b = +#endif + tupleToStringSeq(meta, metaData); + assert(b); + + Py_ssize_t i, sz; + + // + // inParams + // + convertParams(in, inParams, sendsClasses); + + // + // outParams + // + convertParams(out, outParams, returnsClasses); + + // + // returnType + // + if(ret != Py_None) + { + returnType = new ParamInfo; + returnType->type = getType(ret); + if(!returnsClasses) + { + returnsClasses = returnType->type->usesClasses(); + } + } + + // + // exceptions + // + sz = PyTuple_GET_SIZE(ex); + for(i = 0; i < sz; ++i) + { + exceptions.push_back(getException(PyTuple_GET_ITEM(ex, i))); + } +} + +void +IcePy::Operation::deprecate(const string& msg) +{ + if(!msg.empty()) + { + _deprecateMessage = msg; + } + else + { + _deprecateMessage = "operation " + name + " is deprecated"; + } +} + +void +IcePy::Operation::convertParams(PyObject* p, ParamInfoList& params, bool& usesClasses) +{ + usesClasses = false; int sz = static_cast(PyTuple_GET_SIZE(p)); for(int i = 0; i < sz; ++i) { @@ -683,6 +1085,10 @@ static PyMethodDef OperationMethods[] = PyDoc_STR(STRCAST("internal function")) }, { STRCAST("invokeAsync"), reinterpret_cast(operationInvokeAsync), METH_VARARGS, PyDoc_STR(STRCAST("internal function")) }, + { STRCAST("begin"), reinterpret_cast(operationBegin), METH_VARARGS, + PyDoc_STR(STRCAST("internal function")) }, + { STRCAST("end"), reinterpret_cast(operationEnd), METH_VARARGS, + PyDoc_STR(STRCAST("internal function")) }, { STRCAST("deprecate"), reinterpret_cast(operationDeprecate), METH_VARARGS, PyDoc_STR(STRCAST("internal function")) }, { 0, 0 } /* sentinel */ @@ -697,6 +1103,29 @@ static PyMethodDef AMDCallbackMethods[] = { 0, 0 } /* sentinel */ }; +static PyMethodDef AsyncResultMethods[] = +{ + { STRCAST("getCommunicator"), reinterpret_cast(asyncResultGetCommunicator), METH_NOARGS, + PyDoc_STR(STRCAST("returns the communicator for the invocation")) }, + { STRCAST("getConnection"), reinterpret_cast(asyncResultGetConnection), METH_NOARGS, + PyDoc_STR(STRCAST("returns the connection for the invocation")) }, + { STRCAST("getProxy"), reinterpret_cast(asyncResultGetProxy), METH_NOARGS, + PyDoc_STR(STRCAST("returns the proxy for the invocation")) }, + { STRCAST("isCompleted"), reinterpret_cast(asyncResultIsCompleted), METH_NOARGS, + PyDoc_STR(STRCAST("returns true if the request is complete")) }, + { STRCAST("waitForCompleted"), reinterpret_cast(asyncResultWaitForCompleted), METH_NOARGS, + PyDoc_STR(STRCAST("blocks until the request is complete")) }, + { STRCAST("isSent"), reinterpret_cast(asyncResultIsSent), METH_NOARGS, + PyDoc_STR(STRCAST("returns true if the request is sent")) }, + { STRCAST("waitForSent"), reinterpret_cast(asyncResultWaitForSent), METH_NOARGS, + PyDoc_STR(STRCAST("blocks until the request is sent")) }, + { STRCAST("sentSynchronously"), reinterpret_cast(asyncResultSentSynchronously), METH_NOARGS, + PyDoc_STR(STRCAST("returns true if the request was sent synchronously")) }, + { STRCAST("getOperation"), reinterpret_cast(asyncResultGetOperation), METH_NOARGS, + PyDoc_STR(STRCAST("returns the name of the operation")) }, + { 0, 0 } /* sentinel */ +}; + namespace IcePy { @@ -796,6 +1225,54 @@ PyTypeObject AMDCallbackType = 0, /* tp_is_gc */ }; +PyTypeObject AsyncResultType = +{ + /* The ob_type field must be initialized in the module init function + * to be portable to Windows without using C++. */ + PyObject_HEAD_INIT(0) + 0, /* ob_size */ + STRCAST("IcePy.AsyncResult"), /* tp_name */ + sizeof(AsyncResultObject), /* tp_basicsize */ + 0, /* tp_itemsize */ + /* methods */ + reinterpret_cast(asyncResultDealloc), /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + 0, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + AsyncResultMethods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + reinterpret_cast(asyncResultNew), /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ +}; + } bool @@ -821,6 +1298,16 @@ IcePy::initOperation(PyObject* module) return false; } + if(PyType_Ready(&AsyncResultType) < 0) + { + return false; + } + PyTypeObject* arType = &AsyncResultType; // Necessary to prevent GCC's strict-alias warnings. + if(PyModule_AddObject(module, STRCAST("AsyncResult"), reinterpret_cast(arType)) < 0) + { + return false; + } + return true; } @@ -841,7 +1328,7 @@ IcePy::TypedInvocation::TypedInvocation(const Ice::ObjectPrx& prx, const Operati } bool -IcePy::TypedInvocation::prepareRequest(PyObject* args, bool async, vector& bytes) +IcePy::TypedInvocation::prepareRequest(PyObject* args, MappingType mapping, vector& bytes) { assert(PyTuple_Check(args)); @@ -852,12 +1339,20 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, bool async, vector(_op->inParams.size()); if(argc != paramCount) { - string fixedName = fixIdent(_op->name); - if(async) + string opName; + if(mapping == OldAsyncMapping) + { + opName = _op->name + "_async"; + } + else if(mapping == AsyncMapping) { - fixedName += "_async"; + opName = "begin_" + _op->name; } - PyErr_Format(PyExc_RuntimeError, STRCAST("%s expects %d in parameters"), fixedName.c_str(), + else + { + opName = fixIdent(_op->name); + } + PyErr_Format(PyExc_RuntimeError, STRCAST("%s expects %d in parameters"), opName.c_str(), static_cast(paramCount)); return false; } @@ -879,16 +1374,20 @@ IcePy::TypedInvocation::prepareRequest(PyObject* args, bool async, vectortype->validate(arg)) { string opName; - if(async) + if(mapping == OldAsyncMapping) + { + opName = _op->name + "_async"; + } + else if(mapping == AsyncMapping) { - opName = fixIdent(_op->name) + "_async"; + opName = "begin_" + _op->name; } else { opName = fixIdent(_op->name); } PyErr_Format(PyExc_ValueError, STRCAST("invalid value for argument %d in operation `%s'"), - async ? i + 2 : i + 1, const_cast(opName.c_str())); + mapping == OldAsyncMapping ? i + 2 : i + 1, const_cast(opName.c_str())); return false; } (*p)->type->marshal(arg, os, &objectMap, &(*p)->metaData); @@ -1041,7 +1540,7 @@ IcePy::TypedInvocation::validateException(PyObject* ex) const void IcePy::TypedInvocation::checkTwowayOnly(const Ice::ObjectPrx& proxy) const { - if((_op->returnType != 0 || !_op->outParams.empty()) && !proxy->ice_isTwoway()) + if((_op->returnType != 0 || !_op->outParams.empty() || !_op->exceptions.empty()) && !proxy->ice_isTwoway()) { Ice::TwowayOnlyException ex(__FILE__, __LINE__); ex.operation = _op->name; @@ -1058,7 +1557,7 @@ IcePy::SyncTypedInvocation::SyncTypedInvocation(const Ice::ObjectPrx& prx, const } PyObject* -IcePy::SyncTypedInvocation::invoke(PyObject* args) +IcePy::SyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) { assert(PyTuple_Check(args)); assert(PyTuple_GET_SIZE(args) == 2); // Format is ((params...), context|None) @@ -1070,7 +1569,7 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args) // Marshal the input parameters to a byte sequence. // Ice::ByteSeq params; - if(!prepareRequest(pyparams, false, params)) + if(!prepareRequest(pyparams, SyncMapping, params)) { return 0; } @@ -1190,49 +1689,398 @@ IcePy::SyncTypedInvocation::invoke(PyObject* args) // // AsyncTypedInvocation // -IcePy::AsyncTypedInvocation::AsyncTypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) - : Invocation(prx), TypedInvocation(prx, op), _callback(0) +IcePy::AsyncTypedInvocation::AsyncTypedInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy, + const OperationPtr& op) : + Invocation(prx), TypedInvocation(prx, op), _pyProxy(pyProxy), _response(0), _ex(0), _sent(0) { + Py_INCREF(_pyProxy); } IcePy::AsyncTypedInvocation::~AsyncTypedInvocation() { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - Py_XDECREF(_callback); + Py_DECREF(_pyProxy); + Py_XDECREF(_response); + Py_XDECREF(_ex); + Py_XDECREF(_sent); } PyObject* -IcePy::AsyncTypedInvocation::invoke(PyObject* args) +IcePy::AsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) { assert(PyTuple_Check(args)); - assert(PyTuple_GET_SIZE(args) == 3); // Format is (callback, (params...), context|None) - _callback = PyTuple_GET_ITEM(args, 0); - Py_INCREF(_callback); - PyObject* pyparams = PyTuple_GET_ITEM(args, 1); + assert(PyTuple_GET_SIZE(args) == 5); // Format is ((params...), response|None, exception|None, sent|None, ctx|None) + PyObject* pyparams = PyTuple_GET_ITEM(args, 0); assert(PyTuple_Check(pyparams)); - PyObject* pyctx = PyTuple_GET_ITEM(args, 2); - // - // Marshal the input parameters to a byte sequence. - // - Ice::ByteSeq params; - if(!prepareRequest(pyparams, true, params)) + PyObject* method; + + method = PyTuple_GET_ITEM(args, 1); + if(PyMethod_Check(method)) + { + _response = method; + Py_INCREF(_response); + } + else if(method != Py_None) { + PyErr_Format(PyExc_RuntimeError, STRCAST("response callback must be a method or None")); return 0; } - bool result = false; - try + method = PyTuple_GET_ITEM(args, 2); + if(PyMethod_Check(method)) { - checkTwowayOnly(_prx); - pair pparams(0, 0); - if(!params.empty()) + _ex = method; + Py_INCREF(_ex); + } + else if(method != Py_None) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("exception callback must be a method or None")); + return 0; + } + + method = PyTuple_GET_ITEM(args, 3); + if(PyMethod_Check(method)) + { + _sent = method; + Py_INCREF(_sent); + } + else if(method != Py_None) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("sent callback must be a method or None")); + return 0; + } + + if(!_ex && (_response || _sent)) + { + PyErr_Format(PyExc_RuntimeError, + STRCAST("exception callback must also be provided when response or sent callbacks are used")); + return 0; + } + + PyObject* pyctx = PyTuple_GET_ITEM(args, 4); + if(pyctx != Py_None && !PyDict_Check(pyctx)) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("context must be a dictionary or None")); + return 0; + } + + // + // Marshal the input parameters to a byte sequence. + // + Ice::ByteSeq params; + if(!prepareRequest(pyparams, AsyncMapping, params)) + { + return 0; + } + + Ice::AsyncResultPtr result; + try + { + checkAsyncTwowayOnly(_prx); + pair pparams(0, 0); + if(!params.empty()) + { + pparams.first = ¶ms[0]; + pparams.second = ¶ms[0] + params.size(); + } + + Ice::Callback_Object_ice_invokePtr cb; + if(_response || _ex || _sent) + { + cb = Ice::newCallback_Object_ice_invoke(this, &AsyncTypedInvocation::response, + &AsyncTypedInvocation::exception, &AsyncTypedInvocation::sent); + } + + // + // Invoke the operation asynchronously. + // + if(pyctx != Py_None) + { + Ice::Context ctx; + if(!dictionaryToContext(pyctx, ctx)) + { + return 0; + } + + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + if(cb) + { + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams, ctx, cb); + } + else + { + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams, ctx); + } + } + else + { + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + if(cb) + { + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams, cb); + } + else + { + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams); + } + } + } + catch(const Ice::CommunicatorDestroyedException& ex) + { + // + // CommunicatorDestroyedException can propagate directly. + // + setPythonException(ex); + return 0; + } + catch(const IceUtil::IllegalArgumentException& ex) + { + // + // IllegalArgumentException can propagate directly. + // (Raised by checkAsyncTwowayOnly) + // + PyErr_Format(PyExc_RuntimeError, STRCAST(ex.reason().c_str())); + return 0; + } + catch(const Ice::Exception&) + { + // + // No other exceptions should be raised by begin_ice_invoke. + // + assert(false); + } + + assert(result); + AsyncResultObject* obj = asyncResultNew(0); + if(!obj) + { + return 0; + } + obj->result = new Ice::AsyncResultPtr(result); + obj->invocation = new InvocationPtr(this); + obj->proxy = _pyProxy; + Py_INCREF(obj->proxy); + return reinterpret_cast(obj); +} + +PyObject* +IcePy::AsyncTypedInvocation::end(const Ice::ObjectPrx& proxy, const OperationPtr& op, const Ice::AsyncResultPtr& r) +{ + try + { + if(op.get() != _op.get()) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_" + op->name + + " called with AsyncResult object from begin_" + _op->name); + } + + pair results; + bool ok; + + { + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking operations. + ok = proxy->___end_ice_invoke(results, r); + } + + if(ok) + { + // + // Unmarshal the results. + // + PyObjectHandle args = unmarshalResults(results); + if(args.get()) + { + // + // If there are no results, return None. If there's only one element + // in the tuple, return the element. Otherwise, return the tuple. + // + assert(PyTuple_Check(args.get())); + if(PyTuple_GET_SIZE(args.get()) == 0) + { + Py_INCREF(Py_None); + return Py_None; + } + else if(PyTuple_GET_SIZE(args.get()) == 1) + { + PyObject* res = PyTuple_GET_ITEM(args.get(), 0); + Py_INCREF(res); + return res; + } + else + { + return args.release(); + } + } + } + else + { + PyObjectHandle ex = unmarshalException(results); + setPythonException(ex.get()); + } + } + catch(const AbortMarshaling&) + { + // Nothing to do. + } + catch(const IceUtil::IllegalArgumentException& ex) + { + PyErr_Format(PyExc_RuntimeError, STRCAST(ex.reason().c_str())); + } + catch(const Ice::Exception& ex) + { + setPythonException(ex); + } + + assert(PyErr_Occurred()); + return 0; +} + +string +IcePy::AsyncTypedInvocation::opName() const +{ + return _op->name; +} + +void +IcePy::AsyncTypedInvocation::response(bool ok, const pair& results) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + try + { + if(ok) + { + if(_response) + { + // + // Unmarshal the results. + // + PyObjectHandle args; + try + { + args = unmarshalResults(results); + if(!args.get()) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + } + catch(const Ice::Exception& ex) + { + assert(_ex); + callException(_ex, ex); + return; + } + + PyObjectHandle tmp = PyObject_Call(_response, args.get(), 0); + if(PyErr_Occurred()) + { + handleException(); // Callback raised an exception. + } + } + } + else + { + assert(_ex); + PyObjectHandle ex = unmarshalException(results); + callException(_ex, ex.get()); + } + } + catch(const AbortMarshaling&) + { + assert(PyErr_Occurred()); + PyErr_Print(); + } +} + +void +IcePy::AsyncTypedInvocation::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + assert(_ex); + callException(_ex, ex); +} + +void +IcePy::AsyncTypedInvocation::sent(bool sentSynchronously) +{ + if(_sent) + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + callSent(_sent, sentSynchronously, true); + } +} + +void +IcePy::AsyncTypedInvocation::checkAsyncTwowayOnly(const Ice::ObjectPrx& proxy) const +{ + if((_op->returnType != 0 || !_op->outParams.empty() || !_op->exceptions.empty()) && !proxy->ice_isTwoway()) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, + "`" + _op->name + "' can only be called with a twoway proxy"); + } + + if((_op->returnType != 0 || !_op->outParams.empty()) && (!_response && (_ex || _sent))) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "`" + _op->name + "' requires a response callback"); + } +} + +// +// OldAsyncTypedInvocation +// +IcePy::OldAsyncTypedInvocation::OldAsyncTypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) + : Invocation(prx), TypedInvocation(prx, op), _callback(0) +{ +} + +IcePy::OldAsyncTypedInvocation::~OldAsyncTypedInvocation() +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_XDECREF(_callback); +} + +PyObject* +IcePy::OldAsyncTypedInvocation::invoke(PyObject* args, PyObject* /* kwds */) +{ + assert(PyTuple_Check(args)); + assert(PyTuple_GET_SIZE(args) == 3); // Format is (callback, (params...), context|None) + _callback = PyTuple_GET_ITEM(args, 0); + Py_INCREF(_callback); + PyObject* pyparams = PyTuple_GET_ITEM(args, 1); + assert(PyTuple_Check(pyparams)); + PyObject* pyctx = PyTuple_GET_ITEM(args, 2); + + // + // Marshal the input parameters to a byte sequence. + // + Ice::ByteSeq params; + if(!prepareRequest(pyparams, OldAsyncMapping, params)) + { + return 0; + } + + bool sentSynchronously = false; + try + { + checkTwowayOnly(_prx); + pair pparams(0, 0); + if(!params.empty()) { pparams.first = ¶ms[0]; pparams.second = ¶ms[0] + params.size(); } + Ice::Callback_Object_ice_invokePtr cb = + Ice::newCallback_Object_ice_invoke(this, &OldAsyncTypedInvocation::response, + &OldAsyncTypedInvocation::exception, &OldAsyncTypedInvocation::sent); + + Ice::AsyncResultPtr result; + // // Invoke the operation asynchronously. // @@ -1252,37 +2100,49 @@ IcePy::AsyncTypedInvocation::invoke(PyObject* args) } AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->ice_invoke_async(this, _op->name, _op->sendMode, pparams, ctx); + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams, ctx, cb); } else { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->ice_invoke_async(this, _op->name, _op->sendMode, pparams); + result = _prx->begin_ice_invoke(_op->name, _op->sendMode, pparams, cb); } + + sentSynchronously = result->sentSynchronously(); } catch(const Ice::CommunicatorDestroyedException& ex) { // - // CommunicatorDestroyedException is the only exception that can propagate directly. + // CommunicatorDestroyedException can propagate directly. // setPythonException(ex); return 0; } - catch(const Ice::Exception& ex) + catch(const Ice::TwowayOnlyException& ex) { - PyObjectHandle exh = convertException(ex); - assert(exh.get()); - handleException(exh.get()); + // + // Raised by checkTwowayOnly. + // + callException(_callback, _op->name, "ice_exception", ex); + } + catch(const Ice::Exception&) + { + // + // No other exceptions should be raised by begin_ice_invoke. + // + assert(false); } - PyRETURN_BOOL(result); + PyRETURN_BOOL(sentSynchronously); } void -IcePy::AsyncTypedInvocation::ice_response(bool ok, const pair& results) +IcePy::OldAsyncTypedInvocation::response(bool ok, const pair& results) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + assert(_callback); + try { if(ok) @@ -1303,32 +2163,34 @@ IcePy::AsyncTypedInvocation::ice_response(bool ok, const pairname, "ice_exception", ex); return; } - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST("ice_response")); - if(!method.get()) + const string methodName = "ice_response"; + if(!PyObject_HasAttrString(_callback, STRCAST(methodName.c_str()))) { ostringstream ostr; - ostr << "AMI callback object for operation `" << _op->name << "' does not define ice_response()"; + ostr << "AMI callback object for operation `" << _op->name << "' does not define " << methodName + << "()"; string str = ostr.str(); PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); } else { + PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST(methodName.c_str())); + assert(method.get()); PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); if(PyErr_Occurred()) { - PyErr_Print(); + handleException(); // Callback raised an exception. } } } else { PyObjectHandle ex = unmarshalException(results); - handleException(ex.get()); + callException(_callback, _op->name, "ice_exception", ex.get()); } } catch(const AbortMarshaling&) @@ -1336,105 +2198,234 @@ IcePy::AsyncTypedInvocation::ice_response(bool ok, const pairname << "':" << ex; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); - } } void -IcePy::AsyncTypedInvocation::ice_exception(const Ice::Exception& ex) +IcePy::OldAsyncTypedInvocation::exception(const Ice::Exception& ex) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - PyObjectHandle exh = convertException(ex); - assert(exh.get()); - - handleException(exh.get()); + callException(_callback, _op->name, "ice_exception", ex); } void -IcePy::AsyncTypedInvocation::handleException(PyObject* ex) +IcePy::OldAsyncTypedInvocation::sent(bool) { - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST("ice_exception")); - if(!method.get()) - { - ostringstream ostr; - ostr << "AMI callback object for operation `" << _op->name << "' does not define ice_exception()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); - } - else - { - PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), ex); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - PyErr_Print(); - } - } + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + callSent(_callback, "ice_sent", false, false); } // -// AsyncSentTypedInvocation +// SyncBlobjectInvocation // -IcePy::AsyncSentTypedInvocation::AsyncSentTypedInvocation(const Ice::ObjectPrx& prx, const OperationPtr& op) - : Invocation(prx), TypedInvocation(prx, op), AsyncTypedInvocation(prx, op) +IcePy::SyncBlobjectInvocation::SyncBlobjectInvocation(const Ice::ObjectPrx& prx) + : Invocation(prx) { } -void -IcePy::AsyncSentTypedInvocation::ice_sent() +PyObject* +IcePy::SyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) { - AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + char* operation; + PyObject* mode; + PyObject* inParams; + PyObject* operationModeType = lookupType("Ice.OperationMode"); + PyObject* ctx = 0; + if(!PyArg_ParseTuple(args, STRCAST("sO!O!|O"), &operation, operationModeType, &mode, &PyBuffer_Type, &inParams, + &ctx)) + { + return 0; + } - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST("ice_sent")); - if(!method.get()) + PyObjectHandle modeValue = PyObject_GetAttrString(mode, STRCAST("value")); + Ice::OperationMode sendMode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(modeValue.get())); + + // + // Use the array API to avoid copying the data. + // +#if PY_VERSION_HEX < 0x02050000 + const char* charBuf = 0; +#else + char* charBuf = 0; +#endif + Py_ssize_t sz = inParams->ob_type->tp_as_buffer->bf_getcharbuffer(inParams, 0, &charBuf); + const Ice::Byte* mem = reinterpret_cast(charBuf); + pair in(0, 0); + if(sz > 0) { - ostringstream ostr; - ostr << "AMI callback object for operation `" << _op->name << "' does not define ice_sent()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); + in.first = mem; + in.second = mem + sz; } - else + + try { - PyObjectHandle args = PyTuple_New(0); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) + vector out; + + bool ok; + if(ctx == 0 || ctx == Py_None) { - PyErr_Print(); + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + ok = _prx->ice_invoke(operation, sendMode, in, out); + } + else + { + Ice::Context context; + if(!dictionaryToContext(ctx, context)) + { + return 0; + } + + AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. + ok = _prx->ice_invoke(operation, sendMode, in, out, context); + } + + // + // Prepare the result as a tuple of the bool and out param buffer. + // + PyObjectHandle result = PyTuple_New(2); + if(!result.get()) + { + throwPythonException(); + } + + if(PyTuple_SET_ITEM(result.get(), 0, ok ? getTrue() : getFalse()) < 0) + { + throwPythonException(); + } + + // + // Create the output buffer and copy in the outParams. + // + PyObjectHandle ip = PyBuffer_New(out.size()); + if(!ip.get()) + { + throwPythonException(); + } + if(!out.empty()) + { + void* buf; + Py_ssize_t sz; + if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) + { + throwPythonException(); + } + memcpy(buf, &out[0], sz); + } + + if(PyTuple_SET_ITEM(result.get(), 1, ip.get()) < 0) + { + throwPythonException(); } + ip.release(); // PyTuple_SET_ITEM steals a reference. + + return result.release(); + } + catch(const Ice::Exception& ex) + { + setPythonException(ex); + return 0; } } // -// SyncBlobjectInvocation +// AsyncBlobjectInvocation // -IcePy::SyncBlobjectInvocation::SyncBlobjectInvocation(const Ice::ObjectPrx& prx) - : Invocation(prx) +IcePy::AsyncBlobjectInvocation::AsyncBlobjectInvocation(const Ice::ObjectPrx& prx, PyObject* pyProxy) : + Invocation(prx), _pyProxy(pyProxy), _response(0), _ex(0), _sent(0) { + Py_INCREF(_pyProxy); } -PyObject* -IcePy::SyncBlobjectInvocation::invoke(PyObject* args) +IcePy::AsyncBlobjectInvocation::~AsyncBlobjectInvocation() { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_DECREF(_pyProxy); + Py_XDECREF(_response); + Py_XDECREF(_ex); + Py_XDECREF(_sent); +} + +PyObject* +IcePy::AsyncBlobjectInvocation::invoke(PyObject* args, PyObject* kwds) +{ + static char* argNames[] = + { + const_cast("op"), + const_cast("mode"), + const_cast("inParams"), + const_cast("_response"), + const_cast("_ex"), + const_cast("_sent"), + const_cast("_ctx"), + 0 + }; char* operation; PyObject* mode; PyObject* inParams; PyObject* operationModeType = lookupType("Ice.OperationMode"); - PyObject* ctx = 0; - if(!PyArg_ParseTuple(args, STRCAST("sO!O!|O"), &operation, operationModeType, &mode, &PyBuffer_Type, &inParams, - &ctx)) + PyObject* response = Py_None; + PyObject* ex = Py_None; + PyObject* sent = Py_None; + PyObject* pyctx = Py_None; + if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("sO!O!|OOOO"), argNames, &operation, operationModeType, &mode, + &PyBuffer_Type, &inParams, &response, &ex, &sent, &pyctx)) { return 0; } + _op = operation; + PyObjectHandle modeValue = PyObject_GetAttrString(mode, STRCAST("value")); Ice::OperationMode sendMode = (Ice::OperationMode)static_cast(PyInt_AS_LONG(modeValue.get())); + if(PyMethod_Check(response)) + { + _response = response; + Py_INCREF(_response); + } + else if(response != Py_None) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("response callback must be a method or None")); + return 0; + } + + if(PyMethod_Check(ex)) + { + _ex = ex; + Py_INCREF(_ex); + } + else if(ex != Py_None) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("exception callback must be a method or None")); + return 0; + } + + if(PyMethod_Check(sent)) + { + _sent = sent; + Py_INCREF(_sent); + } + else if(sent != Py_None) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("sent callback must be a method or None")); + return 0; + } + + if(!_ex && (_response || _sent)) + { + PyErr_Format(PyExc_RuntimeError, + STRCAST("exception callback must also be provided when response or sent callbacks are used")); + return 0; + } + + if(pyctx != Py_None && !PyDict_Check(pyctx)) + { + PyErr_Format(PyExc_RuntimeError, STRCAST("context must be a dictionary or None")); + return 0; + } + // // Use the array API to avoid copying the data. // @@ -1452,85 +2443,237 @@ IcePy::SyncBlobjectInvocation::invoke(PyObject* args) in.second = mem + sz; } + Ice::AsyncResultPtr result; try { - vector out; + Ice::Callback_Object_ice_invokePtr cb; + if(_response || _ex || _sent) + { + cb = Ice::newCallback_Object_ice_invoke(this, &AsyncBlobjectInvocation::response, + &AsyncBlobjectInvocation::exception, + &AsyncBlobjectInvocation::sent); + } - bool ok; - if(ctx == 0 || ctx == Py_None) + if(pyctx == Py_None) { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - ok = _prx->ice_invoke(operation, sendMode, in, out); + if(cb) + { + result = _prx->begin_ice_invoke(operation, sendMode, in, cb); + } + else + { + result = _prx->begin_ice_invoke(operation, sendMode, in); + } } else { Ice::Context context; - if(!dictionaryToContext(ctx, context)) + if(!dictionaryToContext(pyctx, context)) { return 0; } AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - ok = _prx->ice_invoke(operation, sendMode, in, out, context); + if(cb) + { + result = _prx->begin_ice_invoke(operation, sendMode, in, context, cb); + } + else + { + result = _prx->begin_ice_invoke(operation, sendMode, in, context); + } + } + } + catch(const Ice::CommunicatorDestroyedException& ex) + { + // + // CommunicatorDestroyedException is the only exception that can propagate directly. + // + setPythonException(ex); + return 0; + } + catch(const Ice::Exception&) + { + // + // No other exceptions should be raised by begin_ice_invoke. + // + assert(false); + } + + assert(result); + AsyncResultObject* obj = asyncResultNew(0); + if(!obj) + { + return 0; + } + obj->result = new Ice::AsyncResultPtr(result); + obj->invocation = new InvocationPtr(this); + obj->proxy = _pyProxy; + Py_INCREF(obj->proxy); + return reinterpret_cast(obj); +} + +PyObject* +IcePy::AsyncBlobjectInvocation::end(const Ice::ObjectPrx& proxy, const Ice::AsyncResultPtr& r) +{ + try + { + pair results; + bool ok; + + { + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking operations. + ok = proxy->___end_ice_invoke(results, r); } // - // Prepare the result as a tuple of the bool and out param buffer. + // Prepare the results as a tuple of the bool and out param buffer. // - PyObjectHandle result = PyTuple_New(2); - if(!result.get()) + PyObjectHandle args = PyTuple_New(2); + if(!args.get()) { - throwPythonException(); + return 0; } - if(PyTuple_SET_ITEM(result.get(), 0, ok ? getTrue() : getFalse()) < 0) + if(PyTuple_SET_ITEM(args.get(), 0, ok ? getTrue() : getFalse()) < 0) { - throwPythonException(); + return 0; } // // Create the output buffer and copy in the outParams. // - PyObjectHandle ip = PyBuffer_New(out.size()); + PyObjectHandle ip = PyBuffer_New(results.second - results.first); if(!ip.get()) { - throwPythonException(); + return 0; } - if(!out.empty()) + + void* buf; + Py_ssize_t sz; + if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) { - void* buf; - Py_ssize_t sz; - if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) - { - throwPythonException(); - } - memcpy(buf, &out[0], sz); + return 0; } - - if(PyTuple_SET_ITEM(result.get(), 1, ip.get()) < 0) + assert(sz == results.second - results.first); + memcpy(buf, results.first, sz); + + if(PyTuple_SET_ITEM(args.get(), 1, ip.get()) < 0) { - throwPythonException(); + return 0; } ip.release(); // PyTuple_SET_ITEM steals a reference. - return result.release(); + return args.release(); + } + catch(const AbortMarshaling&) + { + // Nothing to do. + } + catch(const IceUtil::IllegalArgumentException& ex) + { + PyErr_Format(PyExc_RuntimeError, STRCAST(ex.reason().c_str())); } catch(const Ice::Exception& ex) { setPythonException(ex); - return 0; + } + + assert(PyErr_Occurred()); + return 0; +} + +void +IcePy::AsyncBlobjectInvocation::response(bool ok, const pair& results) +{ + if(_response) + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + // + // Prepare the args as a tuple of the bool and out param buffer. + // + PyObjectHandle args = PyTuple_New(2); + if(!args.get()) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + + if(PyTuple_SET_ITEM(args.get(), 0, ok ? getTrue() : getFalse()) < 0) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + + // + // Create the output buffer and copy in the outParams. + // + PyObjectHandle ip = PyBuffer_New(results.second - results.first); + if(!ip.get()) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + + void* buf; + Py_ssize_t sz; + if(PyObject_AsWriteBuffer(ip.get(), &buf, &sz)) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + assert(sz == results.second - results.first); + memcpy(buf, results.first, sz); + + if(PyTuple_SET_ITEM(args.get(), 1, ip.get()) < 0) + { + assert(PyErr_Occurred()); + PyErr_Print(); + return; + } + ip.release(); // PyTuple_SET_ITEM steals a reference. + + PyObjectHandle tmp = PyObject_Call(_response, args.get(), 0); + if(PyErr_Occurred()) + { + handleException(); // Callback raised an exception. + } + } +} + +void +IcePy::AsyncBlobjectInvocation::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + assert(_ex); + callException(_ex, ex); +} + +void +IcePy::AsyncBlobjectInvocation::sent(bool sentSynchronously) +{ + if(_sent) + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + callSent(_sent, sentSynchronously, true); } } // -// AsyncBlobjectInvocation +// OldAsyncBlobjectInvocation // -IcePy::AsyncBlobjectInvocation::AsyncBlobjectInvocation(const Ice::ObjectPrx& prx) - : Invocation(prx), _callback(0) +IcePy::OldAsyncBlobjectInvocation::OldAsyncBlobjectInvocation(const Ice::ObjectPrx& prx) : + Invocation(prx), _callback(0) { } -IcePy::AsyncBlobjectInvocation::~AsyncBlobjectInvocation() +IcePy::OldAsyncBlobjectInvocation::~OldAsyncBlobjectInvocation() { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. @@ -1538,7 +2681,7 @@ IcePy::AsyncBlobjectInvocation::~AsyncBlobjectInvocation() } PyObject* -IcePy::AsyncBlobjectInvocation::invoke(PyObject* args) +IcePy::OldAsyncBlobjectInvocation::invoke(PyObject* args, PyObject* /* kwds */) { char* operation; PyObject* mode; @@ -1574,13 +2717,19 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args) in.second = mem + sz; } - bool result = false; + bool sentSynchronously = false; try { + Ice::AsyncResultPtr result; + Ice::Callback_Object_ice_invokePtr cb = + Ice::newCallback_Object_ice_invoke(this, &OldAsyncBlobjectInvocation::response, + &OldAsyncBlobjectInvocation::exception, + &OldAsyncBlobjectInvocation::sent); + if(ctx == 0 || ctx == Py_None) { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->ice_invoke_async(this, operation, sendMode, in); + result = _prx->begin_ice_invoke(operation, sendMode, in, cb); } else { @@ -1591,8 +2740,10 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args) } AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = _prx->ice_invoke_async(this, operation, sendMode, in, context); + result = _prx->begin_ice_invoke(operation, sendMode, in, context, cb); } + + sentSynchronously = result->sentSynchronously(); } catch(const Ice::CommunicatorDestroyedException& ex) { @@ -1602,18 +2753,19 @@ IcePy::AsyncBlobjectInvocation::invoke(PyObject* args) setPythonException(ex); return 0; } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - PyObjectHandle exh = convertException(ex); - assert(exh.get()); - handleException(exh.get()); + // + // No other exceptions should be raised by begin_ice_invoke. + // + assert(false); } - PyRETURN_BOOL(result); + PyRETURN_BOOL(sentSynchronously); } void -IcePy::AsyncBlobjectInvocation::ice_response(bool ok, const pair& results) +IcePy::OldAsyncBlobjectInvocation::response(bool ok, const pair& results) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. @@ -1667,20 +2819,22 @@ IcePy::AsyncBlobjectInvocation::ice_response(bool ok, const pair(str.c_str())); } else { + PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST(methodName.c_str())); + assert(method.get()); PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); if(PyErr_Occurred()) { - PyErr_Print(); + handleException(); // Callback raised an exception. } } } @@ -1694,68 +2848,19 @@ IcePy::AsyncBlobjectInvocation::ice_response(bool ok, const pair(str.c_str())); - } - else - { - PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), ex); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - PyErr_Print(); - } - } -} - -// -// AsyncSentBlobjectInvocation -// -IcePy::AsyncSentBlobjectInvocation::AsyncSentBlobjectInvocation(const Ice::ObjectPrx& prx) - : Invocation(prx), AsyncBlobjectInvocation(prx) -{ + callException(_callback, "ice_invoke", "ice_exception", ex); } void -IcePy::AsyncSentBlobjectInvocation::ice_sent() +IcePy::OldAsyncBlobjectInvocation::sent(bool) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. - PyObjectHandle method = PyObject_GetAttrString(_callback, STRCAST("ice_sent")); - if(!method.get()) - { - ostringstream ostr; - ostr << "AMI callback object for ice_invoke_async does not define ice_sent()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast(str.c_str())); - } - else - { - PyObjectHandle args = PyTuple_New(0); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - PyErr_Print(); - } - } + callSent(_callback, "ice_sent", false, false); } // @@ -2269,90 +3374,171 @@ IcePy::BlobjectUpcall::exception(PyException& ex) } PyObject* -IcePy::iceIsA(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::invokeBuiltin(PyObject* proxy, const string& builtin, PyObject* args) { + string name = "_op_" + builtin; PyObject* objectType = lookupType("Ice.Object"); assert(objectType); - PyObjectHandle obj = PyObject_GetAttrString(objectType, "_op_ice_isA"); + PyObjectHandle obj = PyObject_GetAttrString(objectType, STRCAST(name.c_str())); assert(obj.get()); OperationPtr op = getOperation(obj.get()); assert(op); - InvocationPtr i = new SyncTypedInvocation(prx, op); + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new SyncTypedInvocation(p, op); return i->invoke(args); } PyObject* -IcePy::icePing(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::beginBuiltin(PyObject* proxy, const string& builtin, PyObject* args) { + string name = "_op_" + builtin; PyObject* objectType = lookupType("Ice.Object"); assert(objectType); - PyObjectHandle obj = PyObject_GetAttrString(objectType, "_op_ice_ping"); + PyObjectHandle obj = PyObject_GetAttrString(objectType, STRCAST(name.c_str())); assert(obj.get()); OperationPtr op = getOperation(obj.get()); assert(op); - InvocationPtr i = new SyncTypedInvocation(prx, op); + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new AsyncTypedInvocation(p, proxy, op); return i->invoke(args); } PyObject* -IcePy::iceIds(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::endBuiltin(PyObject* proxy, const string& builtin, PyObject* args) { + PyObject* result; + if(!PyArg_ParseTuple(args, STRCAST("O!"), &AsyncResultType, &result)) + { + return 0; + } + + string name = "_op_" + builtin; PyObject* objectType = lookupType("Ice.Object"); assert(objectType); - PyObjectHandle obj = PyObject_GetAttrString(objectType, "_op_ice_ids"); + PyObjectHandle obj = PyObject_GetAttrString(objectType, STRCAST(name.c_str())); assert(obj.get()); OperationPtr op = getOperation(obj.get()); assert(op); - InvocationPtr i = new SyncTypedInvocation(prx, op); - return i->invoke(args); + AsyncResultObject* ar = reinterpret_cast(result); + assert(ar); + AsyncTypedInvocationPtr i = AsyncTypedInvocationPtr::dynamicCast(*ar->invocation); + if(!i) + { + PyErr_Format(PyExc_ValueError, STRCAST("invalid AsyncResult object passed to end_%s"), op->name.c_str()); + return 0; + } + Ice::ObjectPrx p = getProxy(proxy); + return i->end(p, op, *ar->result); } PyObject* -IcePy::iceId(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::iceInvoke(PyObject* proxy, PyObject* args) { - PyObject* objectType = lookupType("Ice.Object"); - assert(objectType); - PyObjectHandle obj = PyObject_GetAttrString(objectType, "_op_ice_id"); - assert(obj.get()); - - OperationPtr op = getOperation(obj.get()); - assert(op); - - InvocationPtr i = new SyncTypedInvocation(prx, op); + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new SyncBlobjectInvocation(p); return i->invoke(args); } PyObject* -IcePy::iceInvoke(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::iceInvokeAsync(PyObject* proxy, PyObject* args) { - InvocationPtr i = new SyncBlobjectInvocation(prx); + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new OldAsyncBlobjectInvocation(p); return i->invoke(args); } PyObject* -IcePy::iceInvokeAsync(const Ice::ObjectPrx& prx, PyObject* args) +IcePy::beginIceInvoke(PyObject* proxy, PyObject* args, PyObject* kwds) { - // - // If the callback implements an ice_sent method, we create a wrapper that derives - // from AMISentCallback. - // - assert(PyTuple_GET_SIZE(args) > 0); - PyObject* callback = PyTuple_GET_ITEM(args, 0); - if(PyObject_HasAttrString(callback, STRCAST("ice_sent"))) + Ice::ObjectPrx p = getProxy(proxy); + InvocationPtr i = new AsyncBlobjectInvocation(p, proxy); + return i->invoke(args, kwds); +} + +PyObject* +IcePy::endIceInvoke(PyObject* proxy, PyObject* args) +{ + PyObject* result; + if(!PyArg_ParseTuple(args, STRCAST("O!"), &AsyncResultType, &result)) { - InvocationPtr i = new AsyncSentBlobjectInvocation(prx); - return i->invoke(args); + return 0; } - else + + AsyncResultObject* ar = reinterpret_cast(result); + assert(ar); + AsyncBlobjectInvocationPtr i = AsyncBlobjectInvocationPtr::dynamicCast(*ar->invocation); + if(!i) + { + PyErr_Format(PyExc_ValueError, STRCAST("invalid AsyncResult object passed to end_ice_invoke")); + return 0; + } + Ice::ObjectPrx p = getProxy(proxy); + return i->end(p, *ar->result); +} + +PyObject* +IcePy::createAsyncResult(const Ice::AsyncResultPtr& r, PyObject* proxy, PyObject* connection, PyObject* communicator) +{ + AsyncResultObject* obj = asyncResultNew(0); + if(!obj) + { + return 0; + } + obj->result = new Ice::AsyncResultPtr(r); + obj->proxy = proxy; + Py_XINCREF(obj->proxy); + obj->connection = connection; + Py_XINCREF(obj->connection); + obj->communicator = communicator; + Py_XINCREF(obj->communicator); + return reinterpret_cast(obj); +} + +Ice::AsyncResultPtr +IcePy::getAsyncResult(PyObject* p) +{ + assert(PyObject_IsInstance(p, reinterpret_cast(&AsyncResultType)) == 1); + AsyncResultObject* obj = reinterpret_cast(p); + return *obj->result; +} + +IcePy::FlushCallback::FlushCallback(PyObject* ex, PyObject* sent, const string& op) : + _ex(ex), _sent(sent), _op(op) +{ + assert(_ex); + Py_INCREF(_ex); + Py_XINCREF(_sent); +} + +IcePy::FlushCallback::~FlushCallback() +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_DECREF(_ex); + Py_XDECREF(_sent); +} + +void +IcePy::FlushCallback::exception(const Ice::Exception& ex) +{ + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + callException(_ex, ex); +} + +void +IcePy::FlushCallback::sent(bool sentSynchronously) +{ + if(_sent) { - InvocationPtr i = new AsyncBlobjectInvocation(prx); - return i->invoke(args); + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + callSent(_sent, sentSynchronously, true); } } -- cgit v1.2.3