diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 14:18:08 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:50:22 -0500 |
commit | 3dd23049d2424404255585228ffc5e0314fed7ce (patch) | |
tree | 5dd38567c461ab08f0c402f54551b42e23de29cb /cpp/src/Glacier2/RequestQueue.cpp | |
parent | Remove checksum support (#607) (diff) | |
download | ice-3dd23049d2424404255585228ffc5e0314fed7ce.tar.bz2 ice-3dd23049d2424404255585228ffc5e0314fed7ce.tar.xz ice-3dd23049d2424404255585228ffc5e0314fed7ce.zip |
Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 351 |
1 files changed, 127 insertions, 224 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index 51dc1387289..9e7e2c7153b 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -10,15 +10,17 @@ using namespace std; using namespace Ice; using namespace Glacier2; -Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams, - const Current& current, bool forwardContext, const Ice::Context& sslContext, - const AMD_Object_ice_invokePtr& amdCB) : - _proxy(proxy), +Glacier2::Request::Request(shared_ptr<ObjectPrx> proxy, const std::pair<const Byte*, const Byte*>& inParams, + const Current& current, bool forwardContext, const Ice::Context& sslContext, + function<void(bool, pair<const Byte*, const Byte*>)> response, + function<void(exception_ptr)> exception) : + _proxy(move(proxy)), _inParams(inParams.first, inParams.second), _current(current), _forwardContext(forwardContext), _sslContext(sslContext), - _amdCB(amdCB) + _response(move(response)), + _exception(move(exception)) { Context::const_iterator p = current.ctx.find("_ovrd"); if(p != current.ctx.end()) @@ -28,96 +30,53 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, } void -Glacier2::Request::addBatchProxy(set<Ice::ObjectPrx>& batchProxies) -{ - set<Ice::ObjectPrx>::const_iterator p = batchProxies.find(_proxy); - if(p == batchProxies.end()) - { - batchProxies.insert(_proxy); - } - else if(p->get() != _proxy.get()) - { - const_cast<Ice::ObjectPrx&>(_proxy) = *p; - } -} - -Ice::AsyncResultPtr -Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) +Glacier2::Request::invoke(function<void(bool, pair<const Byte*, const Byte*>)>&& response, + function<void(exception_ptr)>&& exception, + std::function<void(bool)>&& sent) { pair<const Byte*, const Byte*> inPair; if(_inParams.size() == 0) { - inPair.first = inPair.second = 0; + inPair.first = inPair.second = nullptr; } else { - inPair.first = &_inParams[0]; + inPair.first = _inParams.data(); inPair.second = inPair.first + _inParams.size(); } - if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) + if(_forwardContext) { - ByteSeq outParams; - if(_forwardContext) + if(_sslContext.size() > 0) { - if(_sslContext.size() > 0) - { - Ice::Context ctx = _current.ctx; - ctx.insert(_sslContext.begin(), _sslContext.end()); - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx); - } - else - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx); - } + Ice::Context ctx = _current.ctx; + ctx.insert(_sslContext.begin(), _sslContext.end()); + _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair, + move(response), move(exception), move(sent), ctx); } else { - if(_sslContext.size() > 0) - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); - } - else - { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams); - } + _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair, + move(response), move(exception), move(sent), _current.ctx); } - return 0; } else { - Ice::AsyncResultPtr result; - if(_forwardContext) + if(_sslContext.size() > 0) { - if(_sslContext.size() > 0) - { - Ice::Context ctx = _current.ctx; - ctx.insert(_sslContext.begin(), _sslContext.end()); - result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, ctx, cb, this); - } - else - { - result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _current.ctx, cb, this); - } + _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair, + move(response), move(exception), move(sent), _sslContext); } else { - if(_sslContext.size() > 0) - { - result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _sslContext, cb, this); - } - else - { - result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, cb, this); - } + _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair, + move(response), move(exception), move(sent)); } - - return result; } } bool -Glacier2::Request::override(const RequestPtr& other) const +Glacier2::Request::override(const shared_ptr<Request>& other) const { // // Both override values have to be non-empty. @@ -147,26 +106,25 @@ Glacier2::Request::override(const RequestPtr& other) const // // We cannot override if the proxies differ. // - return _proxy == other->_proxy; + return Ice::targetEqualTo(_proxy, other->_proxy); } void -Glacier2::Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams) +Glacier2::Request::response(bool ok, const pair<const Byte*, const Byte*>& outParams) { assert(_proxy->ice_isTwoway()); - _amdCB->ice_response(ok, outParams); + _response(ok, outParams); } void -Glacier2::Request::exception(const Ice::Exception& ex) +Glacier2::Request::exception(exception_ptr ex) { // - // Only for twoways, oneway or batch oneway dispatches are finished - // when queued, see queued(). + // Only for twoways, oneway dispatches are finished when queued, see queued(). // if(_proxy->ice_isTwoway()) { - _amdCB->ice_exception(ex); + _exception(ex); } } @@ -175,77 +133,60 @@ Glacier2::Request::queued() { if(!_proxy->ice_isTwoway()) { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr), - static_cast<const Byte*>(nullptr))); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif + _response(true, { nullptr, nullptr }); } } -Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, - const InstancePtr& instance, - const Ice::ConnectionPtr& connection) : - _requestQueueThread(requestQueueThread), - _instance(instance), - _connection(connection), - _callback(newCallback_Object_ice_invoke(this, &RequestQueue::response, &RequestQueue::exception, - &RequestQueue::sent)), - _flushCallback(newCallback_Connection_flushBatchRequests(this, &RequestQueue::exception, &RequestQueue::sent)), +Glacier2::RequestQueue::RequestQueue(shared_ptr<RequestQueueThread> requestQueueThread, + shared_ptr<Instance> instance, + shared_ptr<Ice::Connection> connection) : + _requestQueueThread(move(requestQueueThread)), + _instance(move(instance)), + _connection(move(connection)), _pendingSend(false), _destroyed(false) { } bool -Glacier2::RequestQueue::addRequest(const RequestPtr& request) +Glacier2::RequestQueue::addRequest(shared_ptr<Request> request) { - IceUtil::Mutex::Lock lock(*this); + lock_guard<mutex> lg(_mutex); if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__); } + if(request->hasOverride()) { - for(deque<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(auto& r : _requests) { // // If the new request overrides an old one, then abort the old // request and replace it with the new request. // - if(request->override(*p)) + if(request->override(r)) { if(_observer) { _observer->overridden(!_connection); } request->queued(); - *p = request; + r = move(request); return true; } } } - if(!_connection) - { - // - // If it's a batch request, we make sure we use a unique batch proxy object for the queued - // batch requests. We want all the requests for the same batch proxy to be queued on the - // same proxy object. - // - request->addBatchProxy(_batchProxies); - } - // // No override, we add the new request. // if(_requests.empty() && (!_connection || !_pendingSend)) { - _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed. + _requestQueueThread->flushRequestQueue(shared_from_this()); // This might throw if the thread is destroyed. } - _requests.push_back(request); request->queued(); + _requests.push_back(move(request)); if(_observer) { _observer->queued(!_connection); @@ -256,7 +197,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) void Glacier2::RequestQueue::flushRequests() { - IceUtil::Mutex::Lock lock(*this); + lock_guard<mutex> lg(_mutex); if(_connection) { if(_pendingSend) @@ -267,58 +208,39 @@ Glacier2::RequestQueue::flushRequests() } else { - for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + for(const auto& request : _requests) { - try + if(_observer) { - if(_observer) + _observer->forwarded(!_connection); + } + auto self = shared_from_this(); + request->invoke( + [self, request](bool ok, const pair<const Byte*, const Byte*>& outParams) + { + self->response(ok, outParams, request); + }, + [self, request](exception_ptr e) { - _observer->forwarded(!_connection); + self->exception(e, request); } - assert(_callback); - (*p)->invoke(_callback); - } - catch(const Ice::LocalException&) - { - // Ignore, this can occur for batch requests. - } + ); } _requests.clear(); - - for(set<Ice::ObjectPrx>::const_iterator q = _batchProxies.begin(); q != _batchProxies.end(); ++q) - { - (*q)->begin_ice_flushBatchRequests(); - } - _batchProxies.clear(); - } - - if(_destroyed && _requests.empty()) - { - destroyInternal(); } } void Glacier2::RequestQueue::destroy() { - IceUtil::Mutex::Lock lock(*this); - + lock_guard<mutex> lg(_mutex); _destroyed = true; - - // - // Although the session has been destroyed, we cannot destroy this queue - // until all requests have completed. - // - if(_requests.empty()) - { - destroyInternal(); - } } void -Glacier2::RequestQueue::updateObserver(const Glacier2::Instrumentation::SessionObserverPtr& observer) +Glacier2::RequestQueue::updateObserver(shared_ptr<Glacier2::Instrumentation::SessionObserver> observer) { - IceUtil::Mutex::Lock lock(*this); + lock_guard<mutex> lg(_mutex); _observer = observer; } @@ -327,34 +249,45 @@ Glacier2::RequestQueue::flush() { assert(_connection); _pendingSend = false; - _pendingSendRequest = 0; + _pendingSendRequest = nullptr; - bool flushBatchRequests = false; - deque<RequestPtr>::iterator p; + deque<shared_ptr<Request>>::iterator p; for(p = _requests.begin(); p != _requests.end(); ++p) { - try + if(_observer) { - assert(_callback); - if(_observer) + _observer->forwarded(!_connection); + } + + shared_ptr<promise<void>> isSent = make_shared<promise<void>>(); + shared_ptr<promise<void>> completedExceptionally = make_shared<promise<void>>(); + + auto self = shared_from_this(); + auto request = *p; + + request->invoke( + [self, request](bool ok, const pair<const Byte*, const Byte*>& outParams) { - _observer->forwarded(!_connection); - } - Ice::AsyncResultPtr result = (*p)->invoke(_callback); - if(!result) + self->response(ok, outParams, request); + }, + [self, request, completedExceptionally](exception_ptr e) { - flushBatchRequests = true; - } - else if(!result->sentSynchronously() && !result->isCompleted()) + completedExceptionally->set_value(); + self->exception(e, request); + }, + [self, request, isSent](bool sentSynchronously) { - _pendingSend = true; - _pendingSendRequest = *p++; - break; + isSent->set_value(); + self->sent(sentSynchronously, request); } - } - catch(const Ice::LocalException&) + ); + + if((isSent->get_future().wait_for(0s) != future_status::ready) && + (completedExceptionally->get_future().wait_for(0s) != future_status::ready)) { - // Ignore, this can occur for batch requests. + _pendingSend = true; + _pendingSendRequest = *p++; + break; } } @@ -366,48 +299,21 @@ Glacier2::RequestQueue::flush() { _requests.erase(_requests.begin(), p); } - - if(flushBatchRequests) - { - Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(ICE_SCOPED_ENUM(CompressBatch, BasedOnProxy), _flushCallback); - if(!result->sentSynchronously() && !result->isCompleted()) - { - _pendingSend = true; - _pendingSendRequest = 0; - } - } } void -Glacier2::RequestQueue::destroyInternal() -{ - // - // Must be called with the mutex locked. - // - - // - // Remove cyclic references. - // - const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0; - const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0; -} - -void -Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const RequestPtr& request) +Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const shared_ptr<Request>& request) { assert(request); request->response(ok, outParams); } void -Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& request) +Glacier2::RequestQueue::exception(exception_ptr ex, const shared_ptr<Request>& request) { - // - // If the connection has been lost, destroy the session. - // if(_connection) { - IceUtil::Mutex::Lock lock(*this); + lock_guard<mutex> lg(_mutex); if(request == _pendingSendRequest) { flush(); @@ -421,11 +327,11 @@ Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& re } void -Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) +Glacier2::RequestQueue::sent(bool sentSynchronously, const shared_ptr<Request>& request) { if(_connection && !sentSynchronously) { - IceUtil::Mutex::Lock lock(*this); + lock_guard<mutex> lg(_mutex); if(request == _pendingSendRequest) { flush(); @@ -433,11 +339,11 @@ Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) } } -Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) : - IceUtil::Thread("Glacier2 request queue thread"), - _sleepTime(sleepTime), +Glacier2::RequestQueueThread::RequestQueueThread(std::chrono::milliseconds sleepTime) : + _sleepTime(move(sleepTime)), _destroy(false), - _sleep(false) + _sleep(false), + _thread([this] { run(); }) { } @@ -451,28 +357,22 @@ void Glacier2::RequestQueueThread::destroy() { { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + lock_guard<mutex> lg(_mutex); assert(!_destroy); _destroy = true; _sleep = false; - notify(); } - try - { - getThreadControl().join(); - } - catch(const IceUtil::ThreadNotStartedException&) - { - // Expected if start() failed. - } + _condVar.notify_one(); + + _thread.join(); } void -Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue) +Glacier2::RequestQueueThread::flushRequestQueue(shared_ptr<RequestQueue> queue) { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + lock_guard<mutex> lg(_mutex); + if(_destroy) { throw Ice::ObjectNotExistException(__FILE__, __LINE__); @@ -480,20 +380,22 @@ Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue) if(_queues.empty() && !_sleep) { - notify(); + _condVar.notify_one(); } - _queues.push_back(queue); + _queues.push_back(move(queue)); } void Glacier2::RequestQueueThread::run() { + std::chrono::nanoseconds sleepDuration = 0ns; + while(true) { - vector<RequestQueuePtr> queues; + vector<shared_ptr<RequestQueue>> queues; { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + unique_lock<mutex> lock(_mutex); // // Wait indefinitely if there's no requests to @@ -505,24 +407,25 @@ Glacier2::RequestQueueThread::run() { if(_sleep) { - IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); - if(!timedWait(_sleepDuration)) + auto now = chrono::steady_clock::now(); + + if(_condVar.wait_for(lock, sleepDuration) == cv_status::no_timeout) { - _sleepDuration = IceUtil::Time(); + sleepDuration = 0ns; } else { - _sleepDuration -= IceUtil::Time::now(IceUtil::Time::Monotonic) - now; + sleepDuration -= chrono::steady_clock::now() - now; } - if(_sleepDuration <= IceUtil::Time()) + if(sleepDuration <= 0ns) { _sleep = false; } } else { - wait(); + _condVar.wait(lock); } } @@ -539,16 +442,16 @@ Glacier2::RequestQueueThread::run() queues.swap(_queues); - if(_sleepTime > IceUtil::Time()) + if(_sleepTime > 0ms) { _sleep = true; - _sleepDuration = _sleepTime; + sleepDuration = _sleepTime; } } - for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) + for(const auto& queue : queues) { - (*p)->flushRequests(); + queue->flushRequests(); } } } |