diff options
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rwxr-xr-x | cpp/src/Glacier2/RequestQueue.cpp | 355 |
1 files changed, 189 insertions, 166 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index a901b6208a1..85cb55c5c63 100755 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -16,102 +16,6 @@ using namespace std; using namespace Ice; using namespace Glacier2; -namespace -{ - -// -// AMI base callback class for twoway/oneway requests -// -class IceInvokeI : public AMI_Array_Object_ice_invoke -{ -public: - - IceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& connection) : - _amdCB(amdCB), - _instance(instance), - _connection(connection) - { - } - - virtual void - ice_exception(const Exception& ex) - { - // - // If the connection has been lost, destroy the session. - // - if(_connection) - { - if(dynamic_cast<const Ice::SocketException*>(&ex) || - dynamic_cast<const Ice::TimeoutException*>(&ex) || - dynamic_cast<const Ice::ProtocolException*>(&ex)) - { - try - { - _instance->sessionRouter()->destroySession(_connection); - } - catch(const Exception&) - { - } - } - } - - if(_amdCB) - { - _amdCB->ice_exception(ex); - } - } - -protected: - - const AMD_Object_ice_invokePtr _amdCB; - const InstancePtr _instance; - const ConnectionPtr _connection; -}; - -class TwowayIceInvokeI : public IceInvokeI -{ -public: - - TwowayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) : - IceInvokeI(amdCB, instance, con) - { - } - - virtual void - ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) - { - _amdCB->ice_response(ok, outParams); - } -}; - -class OnewayIceInvokeI : public IceInvokeI, public Ice::AMISentCallback -{ -public: - - OnewayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) : - IceInvokeI(amdCB, instance, con) - { - } - - virtual void - ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) - { - assert(false); - } - - virtual void - ice_sent() - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } -}; - -} - Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams, const Current& current, bool forwardContext, const Ice::Context& sslContext, const AMD_Object_ice_invokePtr& amdCB) : @@ -122,18 +26,6 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, _sslContext(sslContext), _amdCB(amdCB) { - // - // If this is a batch call, we can finish the AMD call right away. - // - if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } - Context::const_iterator p = current.ctx.find("_ovrd"); if(p != current.ctx.end()) { @@ -142,8 +34,8 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, } -bool -Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr& connection) +Ice::AsyncResultPtr +Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { pair<const Byte*, const Byte*> inPair; if(_inParams.size() == 0) @@ -176,61 +68,44 @@ Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr& { if(_sslContext.size() > 0) { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); } else { _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams); } } - return true; // Batch invocation. + return 0; } else { - AMI_Array_Object_ice_invokePtr amiCB; - if(_proxy->ice_isTwoway()) - { - amiCB = new TwowayIceInvokeI(_amdCB, instance, connection); - } - else - { - amiCB = new OnewayIceInvokeI(_amdCB, instance, connection); - } - - bool sent; + Ice::AsyncResultPtr result; if(_forwardContext) { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; ctx.insert(_sslContext.begin(), _sslContext.end()); - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, ctx); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, ctx, cb, this); } else { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _current.ctx); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _current.ctx, cb, this); } } else { if(_sslContext.size() > 0) { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _sslContext); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _sslContext, cb, this); } else { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, cb, this); } } - if(sent && !_proxy->ice_isTwoway()) - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } - return false; // Not a batch invocation. + + return result; } } @@ -255,14 +130,50 @@ Glacier2::Request::override(const RequestPtr& other) const } // + // Don't override if the override isn't the same. + // + if(_override != other->_override) + { + return false; + } + + // // We cannot override if the proxies differ. // - if(_proxy != other->_proxy) + return _proxy == other->_proxy; +} + +void +Glacier2::Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams) +{ + assert(_proxy->ice_isTwoway()); + _amdCB->ice_response(ok, outParams); +} + +void +Glacier2::Request::exception(const Ice::Exception& ex) +{ + // + // Only for twoways, oneway or batch oneway dispatches are finished + // when queued, see queued(). + // + if(_proxy->ice_isTwoway()) { - return false; + _amdCB->ice_exception(ex); } +} - return _override == other->_override; +void +Glacier2::Request::queued() +{ + if(!_proxy->ice_isTwoway()) + { +#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); +#else + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); +#endif + } } Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, @@ -270,7 +181,11 @@ Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueTh const Ice::ConnectionPtr& connection) : _requestQueueThread(requestQueueThread), _instance(instance), - _connection(connection) + _connection(connection), + _callback(newCallback_Object_ice_invoke(this, &RequestQueue::response, &RequestQueue::exception, + &RequestQueue::sent)), + _flushCallback(newCallback_Connection_flushBatchRequests(this, &RequestQueue::exception, &RequestQueue::sent)), + _pendingSend(false) { } @@ -280,7 +195,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) IceUtil::Mutex::Lock lock(*this); if(request->hasOverride()) { - for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(deque<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) { // // If the new request overrides an old one, then abort the old @@ -288,6 +203,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) // if(request->override(*p)) { + request->queued(); *p = request; return true; } @@ -297,11 +213,12 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) // // No override, we add the new request. // - if(_requests.empty()) + if(_requests.empty() && (!_connection || !_pendingSend)) { _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed. } _requests.push_back(request); + request->queued(); return false; } @@ -309,13 +226,43 @@ void Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) { IceUtil::Mutex::Lock lock(*this); - for(vector<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + if(_connection) + { + if(_pendingSend) + { + return; + } + flush(); + } + else + { + flush(batchProxies); + } +} + +void +Glacier2::RequestQueue::flush() +{ + assert(_connection); + _pendingSend = false; + _pendingSendRequest = 0; + + bool flushBatchRequests = false; + deque<RequestPtr>::iterator p; + for(p = _requests.begin(); p != _requests.end(); ++p) { try { - if((*p)->invoke(_instance, _connection)) // If batch invocation, add the proxy to the batch proxy set. + Ice::AsyncResultPtr result = (*p)->invoke(_callback); + if(!result) { - batchProxies.insert((*p)->getProxy()); + flushBatchRequests = true; + } + else if(!result->sentSynchronously() && !result->isCompleted()) + { + _pendingSend = true; + _pendingSendRequest = *p++; + break; } } catch(const Ice::LocalException&) @@ -323,9 +270,104 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) // Ignore, this can occur for batch requests. } } + + if(p == _requests.end()) + { + _requests.clear(); + } + else + { + _requests.erase(_requests.begin(), p); + } + + if(flushBatchRequests) + { + Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback); + if(!result->sentSynchronously() && !result->isCompleted()) + { + _pendingSend = true; + _pendingSendRequest = 0; + } + } +} + +void +Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies) +{ + assert(!_connection); + + for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + try + { + Ice::AsyncResultPtr result = (*p)->invoke(_callback); + if(!result) + { + batchProxies.insert((*p)->getProxy()); + } + } + catch(const Ice::LocalException&) + { + // Ignore, this can occur for batch requests. + } + } _requests.clear(); } +void +Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const RequestPtr& request) +{ + assert(request); + request->response(ok, outParams); +} + +void +Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& request) +{ + // + // If the connection has been lost, destroy the session. + // + if(_connection) + { + if(dynamic_cast<const Ice::SocketException*>(&ex) || + dynamic_cast<const Ice::TimeoutException*>(&ex) || + dynamic_cast<const Ice::ProtocolException*>(&ex)) + { + try + { + _instance->sessionRouter()->destroySession(_connection); + } + catch(const Exception&) + { + } + } + + IceUtil::Mutex::Lock lock(*this); + if(request == _pendingSendRequest) + { + flush(); + } + } + + if(request) + { + request->exception(ex); + } +} + +void +Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) +{ + if(_connection && !sentSynchronously) + { + IceUtil::Mutex::Lock lock(*this); + if(request == _pendingSendRequest) + { + flush(); + } + } +} + Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) : IceUtil::Thread("Glacier2 request queue thread"), _sleepTime(sleepTime), @@ -445,29 +487,10 @@ Glacier2::RequestQueueThread::run() (*p)->flushRequests(flushProxySet); } - set<Ice::ConnectionPtr> flushConnectionSet; for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q) { - // - // As an optimization, we only flush the proxy batch requests if we didn't - // already flush the requests of a proxy which is using the same connection. - // - Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection(); - if(!connection || flushConnectionSet.find(connection) == flushConnectionSet.end()) - { - class FlushCB : public AMI_Object_ice_flushBatchRequests - { - public: - - virtual void ice_exception(const Ice::Exception&) { } // Ignore. - }; - (*q)->ice_flushBatchRequests_async(new FlushCB()); - - if(connection) - { - flushConnectionSet.insert(connection); - } - } + (*q)->begin_ice_flushBatchRequests(); } } } + |