diff options
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 340 |
1 files changed, 129 insertions, 211 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index 399884cf2f2..e508fa1ad51 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -14,43 +14,40 @@ using namespace std; using namespace Ice; using namespace Glacier2; -namespace Glacier2 +namespace { // // AMI callback class for twoway requests // -// NOTE: the received response isn't sent back directly with the AMD -// callback. Instead it's queued and the request queue thread is -// responsible for sending back the response. It's necessary because -// sending back the response might block. -// class AMI_Array_Object_ice_invokeI : public AMI_Array_Object_ice_invoke { public: - - AMI_Array_Object_ice_invokeI(const RequestQueuePtr& requestQueue, const AMD_Array_Object_ice_invokePtr& amdCB) : - _requestQueue(requestQueue), - _amdCB(amdCB) + + AMI_Array_Object_ice_invokeI(const AMD_Array_Object_ice_invokePtr& amdCB) : _amdCB(amdCB) { - assert(_amdCB); } - + virtual void ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) { - _requestQueue->addResponse(new Response(_amdCB, ok, outParams)); + if(_amdCB) + { + _amdCB->ice_response(ok, outParams); + } } virtual void ice_exception(const Exception& ex) { - _requestQueue->addResponse(new Response(_amdCB, ex)); + if(_amdCB) + { + _amdCB->ice_exception(ex); + } } private: - const RequestQueuePtr _requestQueue; const AMD_Array_Object_ice_invokePtr _amdCB; }; @@ -72,9 +69,7 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, // if(!_proxy->ice_isTwoway()) { - bool ok = true; - pair<const Byte*, const Byte*> outParams(0, 0); - _amdCB->ice_response(ok, outParams); + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); } Context::const_iterator p = current.ctx.find("_ovrd"); @@ -86,7 +81,7 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, bool -Glacier2::Request::invoke(const RequestQueuePtr& requestQueue) +Glacier2::Request::invoke() { pair<const Byte*, const Byte*> inPair; if(_inParams.size() == 0) @@ -98,69 +93,73 @@ Glacier2::Request::invoke(const RequestQueuePtr& requestQueue) inPair.first = &_inParams[0]; inPair.second = inPair.first + _inParams.size(); } - if(_proxy->ice_isTwoway()) + + if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) { - AMI_Array_Object_ice_invokePtr cb = new AMI_Array_Object_ice_invokeI(requestQueue, _amdCB); + ByteSeq outParams; if(_forwardContext) - { + { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; ctx.insert(_sslContext.begin(), _sslContext.end()); - _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, ctx); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx); } else { - _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _current.ctx); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx); } } else { if(_sslContext.size() > 0) { - _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _sslContext); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); } else { - _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams); } } - return true; // A twoway method is being dispatched. + return true; // Batch invocation. } else { - try + AMI_Array_Object_ice_invokePtr amiCB; + if(_proxy->ice_isTwoway()) + { + amiCB = new AMI_Array_Object_ice_invokeI(_amdCB); + } + else { - ByteSeq outParams; - if(_forwardContext) + amiCB = new AMI_Array_Object_ice_invokeI(0); + } + + if(_forwardContext) + { + if(_sslContext.size() > 0) { - if(_sslContext.size() > 0) - { - Ice::Context ctx = _current.ctx; - ctx.insert(_sslContext.begin(), _sslContext.end()); - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx); - } - else - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx); - } + Ice::Context ctx = _current.ctx; + ctx.insert(_sslContext.begin(), _sslContext.end()); + _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, ctx); } else { - if(_sslContext.size() > 0) - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); - } - else - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams); - } + _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _current.ctx); } } - catch(const LocalException&) + else { + if(_sslContext.size() > 0) + { + _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _sslContext); + } + else + { + _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair); + } } - return false; + return false; // Not a batch invocation. } } @@ -195,74 +194,68 @@ Glacier2::Request::override(const RequestPtr& other) const return _override == other->_override; } -bool -Glacier2::Request::isBatch() const -{ - return _proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram(); -} - -ConnectionPtr -Glacier2::Request::getConnection() const -{ - return _proxy->ice_getConnection(); -} - -Glacier2::Response::Response(const AMD_Array_Object_ice_invokePtr& amdCB, bool ok, - const pair<const Byte*, const Byte*>& outParams) : - _amdCB(amdCB), - _ok(ok), - _outParams(outParams.first, outParams.second) +Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread) : + _requestQueueThread(requestQueueThread) { } -Glacier2::Response::Response(const AMD_Array_Object_ice_invokePtr& amdCB, const Exception& ex) : - _amdCB(amdCB), - _ok(false), - _exception(ex.ice_clone()) +bool +Glacier2::RequestQueue::addRequest(const RequestPtr& request) { + IceUtil::Mutex::Lock lock(*this); + for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + // + // If the new request overrides an old one, then abort the old + // request and replace it with the new request. + // + if(request->override(*p)) + { + *p = request; + return true; + } + } + + // + // No override, we add the new request. + // + if(_requests.empty()) + { + _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed. + } + _requests.push_back(request); + return false; } void -Glacier2::Response::invoke() +Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) { - if(_exception.get()) + IceUtil::Mutex::Lock lock(*this); + for(vector<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - _amdCB->ice_exception(*_exception.get()); - } - else - { - pair<const Byte*, const Byte*> outPair; - if(_outParams.size() == 0) - { - outPair.first = outPair.second = 0; - } - else + if((*p)->invoke()) // If batch invocation, add the proxy to the batch proxy set. { - outPair.first = &_outParams[0]; - outPair.second = outPair.first + _outParams.size(); + batchProxies.insert((*p)->getProxy()); } - _amdCB->ice_response(_ok, outPair); } + _requests.clear(); } -Glacier2::RequestQueue::RequestQueue(const IceUtil::Time& sleepTime) : +Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) : _sleepTime(sleepTime), _destroy(false), _sleep(false) { } -Glacier2::RequestQueue::~RequestQueue() +Glacier2::RequestQueueThread::~RequestQueueThread() { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - assert(_destroy); - assert(_requests.empty()); - assert(_responses.empty()); + assert(_queues.empty()); } void -Glacier2::RequestQueue::destroy() +Glacier2::RequestQueueThread::destroy() { { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -273,82 +266,42 @@ Glacier2::RequestQueue::destroy() notify(); } - // - // We don't want to wait for the RequestQueue thread, because this - // destroy() operation is called when sessions expire or are - // destroyed, in which case we do not want the session handler - // thread to block here. Therefore we don't call join(), but - // instead detach the thread right after we start it. - // - //getThreadControl().join(); + getThreadControl().join(); } -bool -Glacier2::RequestQueue::addRequest(const RequestPtr& request) +void +Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - if(_destroy) { - throw ObjectNotExistException(__FILE__, __LINE__); - } - - for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - // - // If the new request overrides an old one, then abort the old - // request and replace it with the new request. - // - if(request->override(*p)) - { - *p = request; - return true; - } + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } - // - // No override, we add the new request. - // - _requests.push_back(request); - if(!_sleep) + if(_queues.empty() && !_sleep) { - // - // No need to notify if the request queue thread is sleeping, - // once it wakes up it will check if there's requests to send. - // notify(); } - return false; + _queues.push_back(queue); } void -Glacier2::RequestQueue::addResponse(const ResponsePtr& response) +Glacier2::RequestQueueThread::run() { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - _responses.push_back(response); - notify(); -} - -void -Glacier2::RequestQueue::run() -{ - RequestQueuePtr self = this; // This is to avoid creating a temporary Ptr for each call to Request::invoke() - ptrdiff_t dispatchCount = 0; // The dispatch count keeps track of the number of outstanding twoway requests. while(true) { - vector<RequestPtr> requests; - vector<ResponsePtr> responses; + vector<RequestQueuePtr> queues; { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); // - // Wait indefinitely if there's no requests/responses to + // Wait indefinitely if there's no requests to // send. If the queue is being destroyed we still need to // wait until all the responses for twoway requests are // received. // - while((!_destroy || dispatchCount != 0) && _responses.empty() && (_requests.empty() || _sleep)) + while(!_destroy && (_queues.empty() || _sleep)) { if(_sleep) { @@ -361,6 +314,7 @@ Glacier2::RequestQueue::run() { _sleepDuration -= IceUtil::Time::now(IceUtil::Time::Monotonic) - now; } + if(_sleepDuration <= IceUtil::Time()) { _sleep = false; @@ -373,90 +327,54 @@ Glacier2::RequestQueue::run() } // - // If the queue is being destroyed and there's no requests - // or responses to send, we're done. + // If the queue is being destroyed and there's no requests or responses + // to send, we're done. // - if(_destroy && _requests.empty() && _responses.empty()) + if(_destroy && _queues.empty()) { - assert(dispatchCount == 0); // We would have blocked in the wait() above otherwise. return; } - // - // If there's requests to sent and we're not sleeping, - // send the requests. If a sleep time is configured, we - // set the sleep duration and set the sleep flag to make - // sure we'll sleep again once we're done sending requests - // and responses. - // - if(!_requests.empty() && !_sleep) - { - requests.swap(_requests); - if(_sleepTime > IceUtil::Time()) - { - _sleep = true; - _sleepDuration = _sleepTime; - } - } - if(!_responses.empty()) + assert(!_queues.empty() && !_sleep); + + queues.swap(_queues); + + if(_sleepTime > IceUtil::Time()) { - responses.swap(_responses); + _sleep = true; + _sleepDuration = _sleepTime; } } - // - // Send requests, flush batch requests, and sleep outside the - // thread synchronization, so that new messages can be added - // while this is being done. - // + set<Ice::ObjectPrx> flushProxySet; + for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) + { + (*p)->flushRequests(flushProxySet); + } - set<ConnectionPtr> flushSet; - - for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p) + set<Ice::ConnectionPtr> flushConnectionSet; + for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q) { - if((*p)->isBatch()) - { - try - { - flushSet.insert((*p)->getConnection()); - } - catch(const LocalException&) - { - // Ignore. - } - } - // - // Invoke returns true if the request expects a response. - // If that's the case we increment the dispatch count to - // ensure that the thread won't be destroyed before the - // response is received. + // As an optimization, we only flush the proxy batch requests if we didn't + // already flush the requests of a proxy which is using the same connection. // - if((*p)->invoke(self)) // Exceptions are caught within invoke(). + Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection(); + if(!connection || flushConnectionSet.find(connection) == flushConnectionSet.end()) { - ++dispatchCount; - } - } + class FlushCB : public AMI_Object_ice_flushBatchRequests + { + public: - for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q) - { - try - { - (*q)->flushBatchRequests(); - } - catch(const LocalException&) - { - // Ignore. - } - } + virtual void ice_exception(const Ice::Exception&) { } // Ignore. + }; + (*q)->ice_flushBatchRequests_async(new FlushCB()); - // - // Send the responses and decrement the dispatch count. - // - for(vector<ResponsePtr>::const_iterator r = responses.begin(); r != responses.end(); ++r) - { - (*r)->invoke(); + if(connection) + { + flushConnectionSet.insert(connection); + } + } } - dispatchCount -= responses.size(); } } |