diff options
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 149 |
1 files changed, 77 insertions, 72 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index 78eff63bce0..5acc66dbfa9 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -10,7 +10,6 @@ #include <Glacier2/RequestQueue.h> #include <Glacier2/Instance.h> #include <Glacier2/SessionRouterI.h> -#include <set> using namespace std; using namespace Ice; @@ -33,6 +32,19 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, } } +void +Glacier2::Request::addBatchProxy(set<Ice::ObjectPrx>& batchProxies) +{ + set<Ice::ObjectPrx>::const_iterator p = batchProxies.find(_proxy); + if(p == batchProxies.end()) + { + batchProxies.insert(_proxy); + } + else if(p->get() != _proxy.get()) + { + const_cast<Ice::ObjectPrx&>(_proxy) = *p; + } +} Ice::AsyncResultPtr Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) @@ -52,7 +64,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { ByteSeq outParams; if(_forwardContext) - { + { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; @@ -81,7 +93,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { Ice::AsyncResultPtr result; if(_forwardContext) - { + { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; @@ -131,7 +143,7 @@ Glacier2::Request::override(const RequestPtr& other) const // // Don't override if the override isn't the same. - // + // if(_override != other->_override) { return false; @@ -163,13 +175,13 @@ Glacier2::Request::exception(const Ice::Exception& ex) } } -void +void Glacier2::Request::queued() { if(!_proxy->ice_isTwoway()) { #if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr), + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr), static_cast<const Byte*>(nullptr))); #else _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); @@ -177,8 +189,8 @@ Glacier2::Request::queued() } } -Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, - const InstancePtr& instance, +Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, + const InstancePtr& instance, const Ice::ConnectionPtr& connection) : _requestQueueThread(requestQueueThread), _instance(instance), @@ -219,7 +231,17 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) } } } - + + if(!_connection) + { + // + // If it's a batch request, we make sure we use a unique batch proxy object for the queued + // batch requests. We want all the requests for the same batch proxy to be queued on the + // same proxy object. + // + request->addBatchProxy(_batchProxies); + } + // // No override, we add the new request. // @@ -237,7 +259,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) } void -Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) +Glacier2::RequestQueue::flushRequests() { IceUtil::Mutex::Lock lock(*this); if(_connection) @@ -250,7 +272,34 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) } else { - flush(batchProxies); + for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + try + { + if(_observer) + { + _observer->forwarded(!_connection); + } + assert(_callback); + (*p)->invoke(_callback); + } + catch(const Ice::LocalException&) + { + // Ignore, this can occur for batch requests. + } + } + _requests.clear(); + + for(set<Ice::ObjectPrx>::const_iterator q = _batchProxies.begin(); q != _batchProxies.end(); ++q) + { + (*q)->begin_ice_flushBatchRequests(); + } + _batchProxies.clear(); + } + + if(_destroyed && _requests.empty()) + { + destroyInternal(); } } @@ -279,20 +328,6 @@ Glacier2::RequestQueue::updateObserver(const Glacier2::Instrumentation::SessionO } void -Glacier2::RequestQueue::destroyInternal() -{ - // - // Must be called with the mutex locked. - // - - // - // Remove cyclic references. - // - const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0; - const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0; -} - -void Glacier2::RequestQueue::flush() { assert(_connection); @@ -314,7 +349,7 @@ Glacier2::RequestQueue::flush() if(!result) { flushBatchRequests = true; - } + } else if(!result->sentSynchronously() && !result->isCompleted()) { _pendingSend = true; @@ -346,44 +381,20 @@ Glacier2::RequestQueue::flush() _pendingSendRequest = 0; } } - - if(_destroyed && _requests.empty()) - { - destroyInternal(); - } } void -Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies) +Glacier2::RequestQueue::destroyInternal() { - assert(!_connection); - - for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - try - { - if(_observer) - { - _observer->forwarded(!_connection); - } - assert(_callback); - Ice::AsyncResultPtr result = (*p)->invoke(_callback); - if(!result) - { - batchProxies.insert((*p)->getProxy()); - } - } - catch(const Ice::LocalException&) - { - // Ignore, this can occur for batch requests. - } - } - _requests.clear(); + // + // Must be called with the mutex locked. + // - if(_destroyed) - { - destroyInternal(); - } + // + // Remove cyclic references. + // + const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0; + const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0; } void @@ -427,7 +438,7 @@ Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& re } } -void +void Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) { if(_connection && !sentSynchronously) @@ -454,12 +465,12 @@ Glacier2::RequestQueueThread::~RequestQueueThread() assert(_queues.empty()); } -void +void Glacier2::RequestQueueThread::destroy() { { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + assert(!_destroy); _destroy = true; _sleep = false; @@ -534,7 +545,7 @@ Glacier2::RequestQueueThread::run() } // - // If the queue is being destroyed and there's no requests or responses + // If the queue is being destroyed and there's no requests or responses // to send, we're done. // if(_destroy && _queues.empty()) @@ -543,7 +554,7 @@ Glacier2::RequestQueueThread::run() } assert(!_queues.empty() && !_sleep); - + queues.swap(_queues); if(_sleepTime > IceUtil::Time()) @@ -552,16 +563,10 @@ Glacier2::RequestQueueThread::run() _sleepDuration = _sleepTime; } } - - set<Ice::ObjectPrx> flushProxySet; - for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) - { - (*p)->flushRequests(flushProxySet); - } - for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q) + for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) { - (*q)->begin_ice_flushBatchRequests(); + (*p)->flushRequests(); } } } |