diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
commit | 710a9221852d6c92b1727a429a33b38f1f949352 (patch) | |
tree | 6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /cpp/src/Ice/OutgoingAsync.cpp | |
parent | - Fix for ICE-5578 - Python build failure (diff) | |
download | ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2 ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz ice-710a9221852d6c92b1727a429a33b38f1f949352.zip |
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 99 |
1 files changed, 51 insertions, 48 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 8b12805ba4f..e8aadb6fb38 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -51,7 +51,8 @@ class AsynchronousException : public DispatchWorkItem { public: - AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, const Ice::Exception& ex) : + AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, + const Ice::Exception& ex) : DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone()) { } @@ -88,28 +89,6 @@ private: const Ice::AsyncResultPtr _result; }; -class AsynchronousTimeout : public DispatchWorkItem -{ -public: - - AsynchronousTimeout(const Ice::ConnectionPtr& connection, const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) : - DispatchWorkItem(connection), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) - { - assert(_outAsync); - } - - virtual void - run() - { - _handler->asyncRequestTimedOut(_outAsync); - } - -private: - - IceInternal::RequestHandlerPtr _handler; - IceInternal::OutgoingAsyncMessageCallbackPtr _outAsync; -}; - }; Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator, @@ -269,7 +248,7 @@ Ice::AsyncResult::__invokeSentAsync() // try { - _instance->clientThreadPool()->execute(new AsynchronousSent(_cachedConnection, this)); + _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); } catch(const Ice::CommunicatorDestroyedException&) { @@ -301,7 +280,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) // CommunicatorDestroyedException is the only exception that can propagate directly // from this method. // - _instance->clientThreadPool()->execute(new AsynchronousException(_cachedConnection, this, ex)); + _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex)); } void @@ -343,16 +322,7 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() if(handler) { - Ice::ConnectionPtr connection; - try - { - connection = handler->getConnection(false); - } - catch(const Ice::LocalException&) - { - // Ignore. - } - _instance->clientThreadPool()->execute(new AsynchronousTimeout(connection, handler, this)); + handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this)); } } @@ -436,6 +406,33 @@ Ice::AsyncResult::__warning() const } } +void +IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool, + const Ice::ConnectionPtr& connection) +{ + class InvocationTimeoutCall : public DispatchWorkItem + { + public: + + InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : + DispatchWorkItem(connection), _outAsync(outAsync) + { + } + + virtual void + run() + { + InvocationTimeoutException ex(__FILE__, __LINE__); + _outAsync->__finished(ex); + } + + private: + + const OutgoingAsyncMessageCallbackPtr _outAsync; + }; + threadPool->dispatch(new InvocationTimeoutCall(this, connection)); +} + IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, const std::string& operation, const CallbackBasePtr& delegate, @@ -451,6 +448,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod { _handler = 0; _cnt = 0; + _sent = false; _mode = mode; _sentSynchronously = false; @@ -534,6 +532,7 @@ IceInternal::OutgoingAsync::__sent() bool alreadySent = _state & Sent; // Expected in case of a retry. _state |= Sent; + _sent = true; assert(!(_state & Done)); if(_proxy->__reference()->getMode() != Reference::ModeTwoway) @@ -562,7 +561,7 @@ IceInternal::OutgoingAsync::__invokeSent() } void -IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent) +IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -582,7 +581,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent) // try { - if(!handleException(exc, sent)) // This will throw if the invocation can't be retried. + if(!handleException(exc)) // This will throw if the invocation can't be retried. { return; // Can't be retried immediately. } @@ -747,7 +746,7 @@ IceInternal::OutgoingAsync::__finished() } catch(const LocalException& ex) { - __finished(ex, true); + __finished(ex); return; } @@ -762,6 +761,7 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) { try { + _sent = false; _handler = _proxy->__getRequestHandler(true); AsyncStatus status = _handler->sendAsyncRequest(this); if(status & AsyncStatusSent) @@ -804,7 +804,7 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) } catch(const Ice::Exception& ex) { - if(!handleException(ex, false)) // This will throw if the invocation can't be retried. + if(!handleException(ex)) // This will throw if the invocation can't be retried. { break; // Can't be retried immediately. } @@ -814,11 +814,11 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) } bool -IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc, bool sent) +IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { try { - int interval = _proxy->__handleException(exc, _handler, _mode, sent, _cnt); + int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. if(interval > 0) { @@ -888,14 +888,17 @@ IceInternal::BatchOutgoingAsync::__invokeSent() } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool) +IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc) { - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); + if(_timeoutRequestHandler) + { + _instance->timer()->cancel(this); + _timeoutRequestHandler = 0; + } } __invokeException(exc); } @@ -1042,7 +1045,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt using BatchOutgoingAsync::__sent; #endif - virtual void __finished(const Ice::Exception& ex, bool) + virtual void __finished(const Ice::Exception& ex) { _childObserver.failed(ex.ice_name()); _childObserver.detach(); |