diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
commit | d81701ca8182942b7936f9fd84a019b695e9c890 (patch) | |
tree | dc036c9d701fbbe1afad67782bd78572c0f61974 /cpp/src/Ice/OutgoingAsync.cpp | |
parent | Fixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff) | |
download | ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2 ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip |
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 210 |
1 files changed, 134 insertions, 76 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 891278f7041..fb470bceaf2 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -62,7 +62,7 @@ public: virtual void run() { - _result->__exception(*_exception.get()); + _result->__invokeException(*_exception.get()); } private: @@ -83,7 +83,7 @@ public: virtual void run() { - _result->__sent(); + _result->__invokeSent(); } private: @@ -91,6 +91,30 @@ private: const Ice::AsyncResultPtr _result; }; +class AsynchronousTimeout : public DispatchWorkItem +{ +public: + + AsynchronousTimeout(const IceInternal::InstancePtr& instance, + const IceInternal::RequestHandlerPtr& handler, + const Ice::AsyncResultPtr& result) : + DispatchWorkItem(instance), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) + { + assert(_outAsync); + } + + virtual void + run() + { + _handler->asyncRequestTimedOut(_outAsync); + } + +private: + + IceInternal::RequestHandlerPtr _handler; + IceInternal::OutgoingAsyncMessageCallbackPtr _outAsync; +}; + }; Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator, @@ -218,7 +242,7 @@ Ice::AsyncResult::__throwUserException() } void -Ice::AsyncResult::__sent() +Ice::AsyncResult::__invokeSent() { // // Note: no need to change the _state here, specializations are responsible for @@ -253,7 +277,7 @@ Ice::AsyncResult::__sent() } void -Ice::AsyncResult::__sentAsync() +Ice::AsyncResult::__invokeSentAsync() { // // This is called when it's not safe to call the sent callback synchronously @@ -270,7 +294,7 @@ Ice::AsyncResult::__sentAsync() } void -Ice::AsyncResult::__exception(const Ice::Exception& ex) +Ice::AsyncResult::__invokeException(const Ice::Exception& ex) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -280,28 +304,11 @@ Ice::AsyncResult::__exception(const Ice::Exception& ex) _monitor.notifyAll(); } - if(_callback) - { - try - { - AsyncResultPtr self(this); - _callback->__completed(self); - } - catch(const std::exception& ex) - { - __warning(ex); - } - catch(...) - { - __warning(); - } - } - - _observer.detach(); + __invokeCompleted(); } void -Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex) +Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) { // // This is called when it's not safe to call the exception callback synchronously @@ -315,7 +322,7 @@ Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex) } void -Ice::AsyncResult::__response() +Ice::AsyncResult::__invokeCompleted() { // // Note: no need to change the _state here, specializations are responsible for @@ -343,6 +350,22 @@ Ice::AsyncResult::__response() } void +Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() +{ + RequestHandlerPtr handler; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + swap(handler, _timeoutRequestHandler); + } + + if(handler) + { + _instance->clientThreadPool()->execute(new AsynchronousTimeout(_instance, handler, this)); + } +} + + +void Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) { __check(r, operation); @@ -499,8 +522,14 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod } } +AsyncStatus +IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendAsyncRequest(this, compress, response); +} + bool -IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) +IceInternal::OutgoingAsync::__sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -515,24 +544,22 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) { _observer.detach(); } + if(_timeoutRequestHandler) + { + _instance->timer()->cancel(this); + _timeoutRequestHandler = 0; + } _state |= Done | OK; _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation } - else if(connection->timeout() > 0) - { - assert(!_timerTaskConnection); - _timerTaskConnection = connection; - IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout()); - _instance->timer()->schedule(this, timeout); - } _monitor.notifyAll(); return !alreadySent && _callback && _callback->__hasSentCallback(); } void -IceInternal::OutgoingAsync::__sent() +IceInternal::OutgoingAsync::__invokeSent() { - ::Ice::AsyncResult::__sent(); + ::Ice::AsyncResult::__invokeSent(); } void @@ -543,10 +570,10 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent assert(!(_state & Done)); _remoteObserver.failed(exc.ice_name()); _remoteObserver.detach(); - if(_timerTaskConnection) + if(_timeoutRequestHandler) { _instance->timer()->cancel(this); - _timerTaskConnection = 0; + _timeoutRequestHandler = 0; } } @@ -564,12 +591,12 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent } else { - __send(false); + __invoke(false); } } catch(const Ice::LocalException& ex) { - __exception(ex); + __invokeException(ex); } } @@ -584,6 +611,8 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) _remoteObserver.failed(exc.get()->ice_name()); _remoteObserver.detach(); + assert(!_timeoutRequestHandler); + try { int interval = handleException(exc); // This will throw if the invocation can't be retried. @@ -593,12 +622,12 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) } else { - __send(false); + __invoke(false); } } catch(const Ice::LocalException& ex) { - __exception(ex); + __invokeException(ex); } } @@ -620,10 +649,10 @@ IceInternal::OutgoingAsync::__finished() } _remoteObserver.detach(); - if(_timerTaskConnection) + if(_timeoutRequestHandler) { _instance->timer()->cancel(this); - _timerTaskConnection = 0; + _timeoutRequestHandler = 0; } _is.read(replyStatus); @@ -759,11 +788,11 @@ IceInternal::OutgoingAsync::__finished() } assert(replyStatus == replyOK || replyStatus == replyUserException); - __response(); + __invokeCompleted(); } bool -IceInternal::OutgoingAsync::__send(bool synchronous) +IceInternal::OutgoingAsync::__invoke(bool synchronous) { while(true) { @@ -771,7 +800,8 @@ IceInternal::OutgoingAsync::__send(bool synchronous) try { _delegate = _proxy->__getDelegate(true); - AsyncStatus status = _delegate->__getRequestHandler()->sendAsyncRequest(this); + RequestHandlerPtr handler = _delegate->__getRequestHandler(); + AsyncStatus status = handler->sendAsyncRequest(this); if(status & AsyncStatusSent) { if(synchronous) @@ -779,14 +809,28 @@ IceInternal::OutgoingAsync::__send(bool synchronous) _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __sent(); // Call the sent callback from the user thread. + __invokeSent(); // Call the sent callback from the user thread. } } else { if(status & AsyncStatusInvokeSentCallback) { - __sentAsync(); // Call the sent callback from a client thread pool thread. + __invokeSentAsync(); // Call the sent callback from a client thread pool thread. + } + } + } + + if(_proxy->ice_isTwoway() || !(status & AsyncStatusSent)) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(!(_state & Done)) + { + int invocationTimeout = handler->getReference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + _timeoutRequestHandler = handler; } } } @@ -870,22 +914,6 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool return 0; // Keep the compiler happy. } -void -IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask() -{ - Ice::ConnectionIPtr connection; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - connection = _timerTaskConnection; - _timerTaskConnection = 0; - } - - if(connection) - { - connection->exception(Ice::TimeoutException(__FILE__, __LINE__)); - } -} - IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator, const InstancePtr& instance, const std::string& operation, @@ -895,14 +923,25 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu { } +AsyncStatus +IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) +{ + return connection->flushAsyncBatchRequests(this); +} + bool -IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* /*connection*/) +IceInternal::BatchOutgoingAsync::__sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get()); _state |= Done | OK | Sent; _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation _remoteObserver.detach(); + if(_timeoutRequestHandler) + { + _instance->timer()->cancel(this); + _timeoutRequestHandler = 0; + } _monitor.notifyAll(); if(!_callback || !_callback->__hasSentCallback()) { @@ -913,9 +952,9 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* /*connection*/) } void -IceInternal::BatchOutgoingAsync::__sent() +IceInternal::BatchOutgoingAsync::__invokeSent() { - ::Ice::AsyncResult::__sent(); + ::Ice::AsyncResult::__invokeSent(); } void @@ -923,7 +962,12 @@ IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool { _remoteObserver.failed(exc.ice_name()); _remoteObserver.detach(); - __exception(exc); + if(_timeoutRequestHandler) + { + _instance->timer()->cancel(this); + _timeoutRequestHandler = 0; + } + __invokeException(exc); } IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, @@ -937,7 +981,7 @@ IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectP } void -IceInternal::ProxyBatchOutgoingAsync::__send() +IceInternal::ProxyBatchOutgoingAsync::__invoke() { checkSupportedProtocol(_proxy->__reference()->getProtocol()); @@ -950,13 +994,27 @@ IceInternal::ProxyBatchOutgoingAsync::__send() try { delegate = _proxy->__getDelegate(true); - AsyncStatus status = delegate->__getRequestHandler()->flushAsyncBatchRequests(this); + RequestHandlerPtr handler = delegate->__getRequestHandler(); + AsyncStatus status = handler->sendAsyncRequest(this); if(status & AsyncStatusSent) { _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __sent(); + __invokeSent(); + } + } + else + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(!(_state & Done)) + { + int invocationTimeout = handler->getReference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + _timeoutRequestHandler = handler; + } } } } @@ -979,7 +1037,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co } void -IceInternal::ConnectionBatchOutgoingAsync::__send() +IceInternal::ConnectionBatchOutgoingAsync::__invoke() { AsyncStatus status = _connection->flushAsyncBatchRequests(this); if(status & AsyncStatusSent) @@ -987,7 +1045,7 @@ IceInternal::ConnectionBatchOutgoingAsync::__send() _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __sent(); + __invokeSent(); } } } @@ -1038,7 +1096,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt { } - virtual bool __sent(Ice::ConnectionI*) + virtual bool __sent() { _remoteObserver.detach(); _outAsync->check(false); @@ -1120,11 +1178,11 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) // if(!_sentSynchronously || !userThread) { - __sentAsync(); + __invokeSentAsync(); } else { - AsyncResult::__sent(); + AsyncResult::__invokeSent(); } } } |