diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-06-01 09:26:44 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-06-01 09:26:44 +0200 |
commit | 19077d22e78f93212445c6969045e736b4a466d7 (patch) | |
tree | 484d4b0dc9a791c9f8f0c4b3dbf212856aba1f55 | |
parent | Updated recursive mutex check (diff) | |
download | ice-19077d22e78f93212445c6969045e736b4a466d7.tar.bz2 ice-19077d22e78f93212445c6969045e736b4a466d7.tar.xz ice-19077d22e78f93212445c6969045e736b4a466d7.zip |
Fixed ICE-7941 - scheduleCallback now provides the connection to the dispatcher work item, fixed naming of AsyncResult private methods
-rw-r--r-- | cpp/include/Ice/AsyncResult.h | 20 | ||||
-rw-r--r-- | cpp/include/Ice/OutgoingAsync.h | 16 | ||||
-rw-r--r-- | cpp/src/Ice/AsyncResult.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/CommunicatorI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 34 | ||||
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 114 | ||||
-rw-r--r-- | cpp/src/slice2cpp/Gen.cpp | 24 | ||||
-rw-r--r-- | python/modules/IcePy/Operation.cpp | 2 | ||||
-rw-r--r-- | python/python/Ice/__init__.py | 8 | ||||
-rw-r--r-- | python/test/Ice/ami/AllTests.py | 49 |
11 files changed, 170 insertions, 115 deletions
diff --git a/cpp/include/Ice/AsyncResult.h b/cpp/include/Ice/AsyncResult.h index c16e6341e03..56e53e65213 100644 --- a/cpp/include/Ice/AsyncResult.h +++ b/cpp/include/Ice/AsyncResult.h @@ -51,16 +51,16 @@ public: virtual const std::string& getOperation() const = 0; - virtual bool waitForResponse() = 0; - virtual Ice::InputStream* startReadParams() = 0; - virtual void endReadParams() = 0; - virtual void readEmptyParams() = 0; - virtual void readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&) = 0; - virtual void throwUserException() = 0; + virtual bool _waitForResponse() = 0; + virtual Ice::InputStream* _startReadParams() = 0; + virtual void _endReadParams() = 0; + virtual void _readEmptyParams() = 0; + virtual void _readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&) = 0; + virtual void _throwUserException() = 0; - static void check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&); - static void check(const AsyncResultPtr&, const Connection*, const ::std::string&); - static void check(const AsyncResultPtr&, const Communicator*, const ::std::string&); + static void _check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&); + static void _check(const AsyncResultPtr&, const Connection*, const ::std::string&); + static void _check(const AsyncResultPtr&, const Communicator*, const ::std::string&); class Callback : public IceUtil::Shared { @@ -70,7 +70,7 @@ public: }; typedef IceUtil::Handle<Callback> CallbackPtr; - virtual void scheduleCallback(const CallbackPtr&) = 0; + virtual void _scheduleCallback(const CallbackPtr&) = 0; protected: diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index 33e0d4b2e14..572515e0324 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -101,14 +101,14 @@ public: virtual void throwLocalException() const; - virtual bool waitForResponse(); - virtual Ice::InputStream* startReadParams(); - virtual void endReadParams(); - virtual void readEmptyParams(); - virtual void readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&); - virtual void throwUserException(); - - virtual void scheduleCallback(const CallbackPtr&); + virtual bool _waitForResponse(); + virtual Ice::InputStream* _startReadParams(); + virtual void _endReadParams(); + virtual void _readEmptyParams(); + virtual void _readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&); + virtual void _throwUserException(); + + virtual void _scheduleCallback(const CallbackPtr&); #endif void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId) diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp index 6d6826e755c..7cc69b86dce 100644 --- a/cpp/src/Ice/AsyncResult.cpp +++ b/cpp/src/Ice/AsyncResult.cpp @@ -24,7 +24,7 @@ AsyncResult::~AsyncResult() } void -AsyncResult::check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +AsyncResult::_check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) { check(r, operation); if(r->getProxy().get() != prx) @@ -36,7 +36,7 @@ AsyncResult::check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, co } void -AsyncResult::check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +AsyncResult::_check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) { check(r, operation); if(r->getCommunicator().get() != com) @@ -48,7 +48,7 @@ AsyncResult::check(const AsyncResultPtr& r, const Ice::Communicator* com, const } void -AsyncResult::check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +AsyncResult::_check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) { check(r, operation); if(r->getConnection().get() != con) diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index bb3f32bbf05..a7020252b24 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -465,8 +465,8 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress, void Ice::CommunicatorI::end_flushBatchRequests(const AsyncResultPtr& r) { - AsyncResult::check(r, this, flushBatchRequests_name); - r->waitForResponse(); + AsyncResult::_check(r, this, flushBatchRequests_name); + r->_waitForResponse(); } #endif diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 46b276647c0..2eb66ff7d0a 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -890,8 +890,8 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress, void Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) { - AsyncResult::check(r, this, flushBatchRequests_name); - r->waitForResponse(); + AsyncResult::_check(r, this, flushBatchRequests_name); + r->_waitForResponse(); } #endif @@ -1058,8 +1058,8 @@ Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalOb void Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r) { - AsyncResult::check(r, this, __heartbeat_name); - r->waitForResponse(); + AsyncResult::_check(r, this, __heartbeat_name); + r->_waitForResponse(); } #endif diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 209092b5c4b..60d4cc04983 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -471,7 +471,7 @@ OutgoingAsyncBase::throwLocalException() const } bool -OutgoingAsyncBase::waitForResponse() +OutgoingAsyncBase::_waitForResponse() { Lock sync(_m); if(_state & EndCalled) @@ -492,32 +492,32 @@ OutgoingAsyncBase::waitForResponse() } Ice::InputStream* -OutgoingAsyncBase::startReadParams() +OutgoingAsyncBase::_startReadParams() { _is.startEncapsulation(); return &_is; } void -OutgoingAsyncBase::endReadParams() +OutgoingAsyncBase::_endReadParams() { _is.endEncapsulation(); } void -OutgoingAsyncBase::readEmptyParams() +OutgoingAsyncBase::_readEmptyParams() { _is.skipEmptyEncapsulation(); } void -OutgoingAsyncBase::readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) +OutgoingAsyncBase::_readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) { _is.readEncapsulation(encaps, sz); } void -OutgoingAsyncBase::throwUserException() +OutgoingAsyncBase::_throwUserException() { try { @@ -532,23 +532,25 @@ OutgoingAsyncBase::throwUserException() } void -OutgoingAsyncBase::scheduleCallback(const CallbackPtr& cb) +OutgoingAsyncBase::_scheduleCallback(const CallbackPtr& cb) { + // + // NOTE: for internal use only. This should only be called when the invocation has + // completed. Accessing _cachedConnection is not safe otherwise. + // + class WorkItem : public DispatchWorkItem { public: - WorkItem(const CallbackPtr& cb) : _cb(cb) {} + WorkItem(const ConnectionPtr& connection, const CallbackPtr& cb) : + DispatchWorkItem(connection), _cb(cb) + { + } virtual void run() { - try - { - _cb->run(); - } - catch(...) - { - } + _cb->run(); } private: @@ -559,7 +561,7 @@ OutgoingAsyncBase::scheduleCallback(const CallbackPtr& cb) // // CommunicatorDestroyedException is the only exception that can propagate directly from this method. // - _instance->clientThreadPool()->dispatch(new WorkItem(cb)); + _instance->clientThreadPool()->dispatch(new WorkItem(_cachedConnection, cb)); } #endif diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 7b7a7311fa6..f5d6551edf1 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -323,13 +323,13 @@ IceProxy::Ice::Object::_iceI_begin_ice_isA(const string& typeId, bool IceProxy::Ice::Object::end_ice_isA(const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_isA_name); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, ice_isA_name); + bool ok = result->_waitForResponse(); if(!ok) { try { - result->throwUserException(); + result->_throwUserException(); } catch(const UserException& ex) { @@ -337,17 +337,17 @@ IceProxy::Ice::Object::end_ice_isA(const AsyncResultPtr& result) } } bool ret; - ::Ice::InputStream* istr = result->startReadParams(); + ::Ice::InputStream* istr = result->_startReadParams(); istr->read(ret); - result->endReadParams(); + result->_endReadParams(); return ret; } AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_ping(const Context& ctx, - const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie, - bool sync) + const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { OutgoingAsyncPtr result = new CallbackOutgoing(this, ice_ping_name, del, cookie, sync); try @@ -371,9 +371,9 @@ IceProxy::Ice::Object::end_ice_ping(const AsyncResultPtr& result) AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_ids(const Context& ctx, - const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie, - bool sync) + const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { _checkTwowayOnly(ice_ids_name, sync); OutgoingAsyncPtr result = new CallbackOutgoing(this, ice_ids_name, del, cookie, sync); @@ -393,13 +393,13 @@ IceProxy::Ice::Object::_iceI_begin_ice_ids(const Context& ctx, vector<string> IceProxy::Ice::Object::end_ice_ids(const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_ids_name); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, ice_ids_name); + bool ok = result->_waitForResponse(); if(!ok) { try { - result->throwUserException(); + result->_throwUserException(); } catch(const UserException& ex) { @@ -407,17 +407,17 @@ IceProxy::Ice::Object::end_ice_ids(const AsyncResultPtr& result) } } vector<string> ret; - ::Ice::InputStream* istr = result->startReadParams(); + ::Ice::InputStream* istr = result->_startReadParams(); istr->read(ret, false); - result->endReadParams(); + result->_endReadParams(); return ret; } AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_id(const Context& ctx, - const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie, - bool sync) + const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { _checkTwowayOnly(ice_id_name, sync); OutgoingAsyncPtr result = new CallbackOutgoing(this, ice_id_name, del, cookie, sync); @@ -437,13 +437,13 @@ IceProxy::Ice::Object::_iceI_begin_ice_id(const Context& ctx, string IceProxy::Ice::Object::end_ice_id(const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_id_name); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, ice_id_name); + bool ok = result->_waitForResponse(); if(!ok) { try { - result->throwUserException(); + result->_throwUserException(); } catch(const UserException& ex) { @@ -451,9 +451,9 @@ IceProxy::Ice::Object::end_ice_id(const AsyncResultPtr& result) } } string ret; - ::Ice::InputStream* istr = result->startReadParams(); + ::Ice::InputStream* istr = result->_startReadParams(); istr->read(ret, false); - result->endReadParams(); + result->_endReadParams(); return ret; } @@ -479,12 +479,12 @@ IceProxy::Ice::Object::ice_invoke(const string& operation, AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation, - OperationMode mode, - const vector<Byte>& inEncaps, - const Context& ctx, - const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie, - bool sync) + OperationMode mode, + const vector<Byte>& inEncaps, + const Context& ctx, + const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { pair<const Byte*, const Byte*> inPair; if(inEncaps.empty()) @@ -502,13 +502,13 @@ IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation, bool IceProxy::Ice::Object::end_ice_invoke(vector<Byte>& outEncaps, const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_invoke_name); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, ice_invoke_name); + bool ok = result->_waitForResponse(); if(_reference->getMode() == Reference::ModeTwoway) { const Byte* v; Int sz; - result->readParamEncaps(v, sz); + result->_readParamEncaps(v, sz); vector<Byte>(v, v + sz).swap(outEncaps); } return ok; @@ -516,12 +516,12 @@ IceProxy::Ice::Object::end_ice_invoke(vector<Byte>& outEncaps, const AsyncResult AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation, - OperationMode mode, - const pair<const Byte*, const Byte*>& inEncaps, - const Context& ctx, - const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie, - bool sync) + OperationMode mode, + const pair<const Byte*, const Byte*>& inEncaps, + const Context& ctx, + const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { OutgoingAsyncPtr result = new CallbackOutgoing(this, ice_invoke_name, del, cookie, sync); try @@ -540,12 +540,12 @@ IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation, bool IceProxy::Ice::Object::_iceI_end_ice_invoke(pair<const Byte*, const Byte*>& outEncaps, const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_invoke_name); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, ice_invoke_name); + bool ok = result->_waitForResponse(); if(_reference->getMode() == Reference::ModeTwoway) { Int sz; - result->readParamEncaps(outEncaps.first, sz); + result->_readParamEncaps(outEncaps.first, sz); outEncaps.second = outEncaps.first + sz; } return ok; @@ -553,7 +553,7 @@ IceProxy::Ice::Object::_iceI_end_ice_invoke(pair<const Byte*, const Byte*>& outE ::Ice::AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_flushBatchRequests(const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie) { class ProxyFlushBatchAsyncWithCallback : public ProxyFlushBatchAsync, public CallbackCompletion { @@ -589,29 +589,29 @@ IceProxy::Ice::Object::_iceI_begin_ice_flushBatchRequests(const ::IceInternal::C void IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_flushBatchRequests_name); - result->waitForResponse(); + AsyncResult::_check(result, this, ice_flushBatchRequests_name); + result->_waitForResponse(); } void IceProxy::Ice::Object::_end(const ::Ice::AsyncResultPtr& result, const std::string& operation) const { - AsyncResult::check(result, this, operation); - bool ok = result->waitForResponse(); + AsyncResult::_check(result, this, operation); + bool ok = result->_waitForResponse(); if(_reference->getMode() == Reference::ModeTwoway) { if(!ok) { try { - result->throwUserException(); + result->_throwUserException(); } catch(const UserException& ex) { throw UnknownUserException(__FILE__, __LINE__, ex.ice_id()); } } - result->readEmptyParams(); + result->_readEmptyParams(); } } @@ -639,7 +639,7 @@ IceProxy::Ice::Object::_newInstance() const AsyncResultPtr IceProxy::Ice::Object::_iceI_begin_ice_getConnection(const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie) { class ProxyGetConnectionWithCallback : public ProxyGetConnection, public CallbackCompletion { @@ -675,8 +675,8 @@ IceProxy::Ice::Object::_iceI_begin_ice_getConnection(const ::IceInternal::Callba ConnectionPtr IceProxy::Ice::Object::end_ice_getConnection(const AsyncResultPtr& result) { - AsyncResult::check(result, this, ice_getConnection_name); - result->waitForResponse(); + AsyncResult::_check(result, this, ice_getConnection_name); + result->_waitForResponse(); return result->getConnection(); } @@ -1286,10 +1286,10 @@ ICE_OBJECT_PRX::setup(const ReferencePtr& ref) int ICE_OBJECT_PRX::_handleException(const Exception& ex, - const RequestHandlerPtr& handler, - OperationMode mode, - bool sent, - int& cnt) + const RequestHandlerPtr& handler, + OperationMode mode, + bool sent, + int& cnt) { _updateRequestHandler(handler, 0); // Clear the request handler @@ -1376,7 +1376,7 @@ ICE_OBJECT_PRX::_setRequestHandler(const ::IceInternal::RequestHandlerPtr& handl void ICE_OBJECT_PRX::_updateRequestHandler(const ::IceInternal::RequestHandlerPtr& previous, - const ::IceInternal::RequestHandlerPtr& handler) + const ::IceInternal::RequestHandlerPtr& handler) { if(_reference->getCacheConnection() && previous) { diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp index e096ae39e1e..6b4e25ecba6 100644 --- a/cpp/src/slice2cpp/Gen.cpp +++ b/cpp/src/slice2cpp/Gen.cpp @@ -2055,7 +2055,7 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) C << sb; if(p->returnsData()) { - C << nl << "::Ice::AsyncResult::check(result, this, " << flatName << ");"; + C << nl << "::Ice::AsyncResult::_check(result, this, " << flatName << ");"; // // COMPILERFIX: It's necessary to generate the allocate code here before @@ -2064,11 +2064,11 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) // and Windows 64 bits when compiled with optimization (see bug 4400). // writeAllocateCode(C, ParamDeclList(), p, true, _useWstring | TypeContextAMIEnd); - C << nl << "if(!result->waitForResponse())"; + C << nl << "if(!result->_waitForResponse())"; C << sb; C << nl << "try"; C << sb; - C << nl << "result->throwUserException();"; + C << nl << "result->_throwUserException();"; C << eb; // // Generate a catch block for each legal user exception. @@ -2096,17 +2096,17 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) C << eb; if(ret || !outParams.empty()) { - C << nl << "::Ice::InputStream* istr = result->startReadParams();"; + C << nl << "::Ice::InputStream* istr = result->_startReadParams();"; writeUnmarshalCode(C, outParams, p, true, _useWstring | TypeContextAMIEnd); if(p->returnsClasses(false)) { C << nl << "istr->readPendingValues();"; } - C << nl << "result->endReadParams();"; + C << nl << "result->_endReadParams();"; } else { - C << nl << "result->readEmptyParams();"; + C << nl << "result->_readEmptyParams();"; } if(ret) { @@ -2126,12 +2126,12 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) C << sp << nl << "void IceProxy" << scope << "_iceI_end_" << name << spar << outParamsDeclEndAMI << "const ::Ice::AsyncResultPtr& result" << epar; C << sb; - C << nl << "::Ice::AsyncResult::check(result, this, " << flatName << ");"; - C << nl << "if(!result->waitForResponse())"; + C << nl << "::Ice::AsyncResult::_check(result, this, " << flatName << ");"; + C << nl << "if(!result->_waitForResponse())"; C << sb; C << nl << "try"; C << sb; - C << nl << "result->throwUserException();"; + C << nl << "result->_throwUserException();"; C << eb; // // Generate a catch block for each legal user exception. @@ -2160,17 +2160,17 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) if(ret || !outParams.empty()) { - C << nl << "::Ice::InputStream* istr = result->startReadParams();"; + C << nl << "::Ice::InputStream* istr = result->_startReadParams();"; writeUnmarshalCode(C, outParams, p, true, _useWstring | TypeContextAMIPrivateEnd); if(p->returnsClasses(false)) { C << nl << "istr->readPendingValues();"; } - C << nl << "result->endReadParams();"; + C << nl << "result->_endReadParams();"; } else { - C << nl << "result->readEmptyParams();"; + C << nl << "result->_readEmptyParams();"; } C << eb; } diff --git a/python/modules/IcePy/Operation.cpp b/python/modules/IcePy/Operation.cpp index 3b51db4bdfd..c75512de517 100644 --- a/python/modules/IcePy/Operation.cpp +++ b/python/modules/IcePy/Operation.cpp @@ -1134,7 +1134,7 @@ asyncResultCallLater(AsyncResultObject* self, PyObject* args) try { - (*self->result)->scheduleCallback(new CallbackI(callback)); + (*self->result)->_scheduleCallback(new CallbackI(callback)); } catch(const Ice::CommunicatorDestroyedException& ex) { diff --git a/python/python/Ice/__init__.py b/python/python/Ice/__init__.py index a167f54dde5..d159feb3fc6 100644 --- a/python/python/Ice/__init__.py +++ b/python/python/Ice/__init__.py @@ -223,12 +223,18 @@ class InvocationFuture(Future): return Future.cancel(self) def add_done_callback_async(self, fn): + def callback(future): + try: + callback(future) + except: + logging.getLogger("Ice.Future").exception('callback raised exception') + with self._condition: if self._state == Future.StateRunning: self._doneCallbacks.append(fn) return if self._asyncResult: - self._asyncResult.callLater(lambda: fn(self)) + self._asyncResult.callLater(callback) else: fn(self) diff --git a/python/test/Ice/ami/AllTests.py b/python/test/Ice/ami/AllTests.py index 36d20450bf7..3b8a902a8e5 100644 --- a/python/test/Ice/ami/AllTests.py +++ b/python/test/Ice/ami/AllTests.py @@ -7,7 +7,7 @@ # # ********************************************************************** -import Ice, Test, sys, threading, random +import Ice, Test, sys, threading, random, logging def test(b): if not b: @@ -373,6 +373,11 @@ class Thrower(CallbackBase): throwEx(self._t) def allTests(communicator, collocated): + # Ice.Future uses the Python logging facility, this tests throws exceptions from Ice.Future callbacks + # so we disable errors to prevent them to show up on the console. + logging.basicConfig() + logging.disable(logging.ERROR) + sref = "test:default -p 12010" obj = communicator.stringToProxy(sref) test(obj) @@ -708,18 +713,60 @@ def allTests(communicator, collocated): p.begin_op(cb.op, cb.noEx) cb.check() + def thrower(future): + try: + future.result() + except: + test(false) + throwEx(t) + f = p.opAsync() + f.add_done_callback(thrower) + f.add_done_callback_async(thrower) + f.result() + p.begin_op(lambda: cb.opWC(cookie), lambda ex: cb.noExWC(ex, cookie)) cb.check() q.begin_op(cb.op, cb.ex) cb.check() + f = q.opAsync() + def throwerEx(future): + try: + future.result() + test(false) + except: + throwEx(t) + try: + f.add_done_callback(throwerEx) + except Exception as ex: + try: + throwEx(t) + except Exception as ex2: + test(type(ex) == type(ex2)) + f.add_done_callback_async(throwerEx) + try: + f.result() + except: + pass + q.begin_op(lambda: cb.opWC(cookie), lambda ex: cb.exWC(ex, cookie)) cb.check() p.begin_op(cb.noOp, cb.ex, cb.sent) cb.check() + f = p.opAsync() + try: + f.add_sent_callback(lambda f, s: throwEx(t)) + except Exception as ex: + try: + throwEx(t) + except Exception as ex2: + test(type(ex) == type(ex2)) + #f.add_sent_callback_async(throwerSent) + f.result() + p.begin_op(lambda: cb.noOpWC(cookie), lambda ex: cb.exWC(ex, cookie), lambda ss: cb.sentWC(ss, cookie)) cb.check() |