summaryrefslogtreecommitdiff
path: root/cpp/src/Glacier2/RequestQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp351
1 files changed, 127 insertions, 224 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 51dc1387289..9e7e2c7153b 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -10,15 +10,17 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
-Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams,
- const Current& current, bool forwardContext, const Ice::Context& sslContext,
- const AMD_Object_ice_invokePtr& amdCB) :
- _proxy(proxy),
+Glacier2::Request::Request(shared_ptr<ObjectPrx> proxy, const std::pair<const Byte*, const Byte*>& inParams,
+ const Current& current, bool forwardContext, const Ice::Context& sslContext,
+ function<void(bool, pair<const Byte*, const Byte*>)> response,
+ function<void(exception_ptr)> exception) :
+ _proxy(move(proxy)),
_inParams(inParams.first, inParams.second),
_current(current),
_forwardContext(forwardContext),
_sslContext(sslContext),
- _amdCB(amdCB)
+ _response(move(response)),
+ _exception(move(exception))
{
Context::const_iterator p = current.ctx.find("_ovrd");
if(p != current.ctx.end())
@@ -28,96 +30,53 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
}
void
-Glacier2::Request::addBatchProxy(set<Ice::ObjectPrx>& batchProxies)
-{
- set<Ice::ObjectPrx>::const_iterator p = batchProxies.find(_proxy);
- if(p == batchProxies.end())
- {
- batchProxies.insert(_proxy);
- }
- else if(p->get() != _proxy.get())
- {
- const_cast<Ice::ObjectPrx&>(_proxy) = *p;
- }
-}
-
-Ice::AsyncResultPtr
-Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb)
+Glacier2::Request::invoke(function<void(bool, pair<const Byte*, const Byte*>)>&& response,
+ function<void(exception_ptr)>&& exception,
+ std::function<void(bool)>&& sent)
{
pair<const Byte*, const Byte*> inPair;
if(_inParams.size() == 0)
{
- inPair.first = inPair.second = 0;
+ inPair.first = inPair.second = nullptr;
}
else
{
- inPair.first = &_inParams[0];
+ inPair.first = _inParams.data();
inPair.second = inPair.first + _inParams.size();
}
- if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
+ if(_forwardContext)
{
- ByteSeq outParams;
- if(_forwardContext)
+ if(_sslContext.size() > 0)
{
- if(_sslContext.size() > 0)
- {
- Ice::Context ctx = _current.ctx;
- ctx.insert(_sslContext.begin(), _sslContext.end());
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx);
- }
+ Ice::Context ctx = _current.ctx;
+ ctx.insert(_sslContext.begin(), _sslContext.end());
+ _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair,
+ move(response), move(exception), move(sent), ctx);
}
else
{
- if(_sslContext.size() > 0)
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
- }
+ _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair,
+ move(response), move(exception), move(sent), _current.ctx);
}
- return 0;
}
else
{
- Ice::AsyncResultPtr result;
- if(_forwardContext)
+ if(_sslContext.size() > 0)
{
- if(_sslContext.size() > 0)
- {
- Ice::Context ctx = _current.ctx;
- ctx.insert(_sslContext.begin(), _sslContext.end());
- result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, ctx, cb, this);
- }
- else
- {
- result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _current.ctx, cb, this);
- }
+ _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair,
+ move(response), move(exception), move(sent), _sslContext);
}
else
{
- if(_sslContext.size() > 0)
- {
- result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _sslContext, cb, this);
- }
- else
- {
- result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, cb, this);
- }
+ _proxy->ice_invokeAsync(_current.operation, _current.mode, inPair,
+ move(response), move(exception), move(sent));
}
-
- return result;
}
}
bool
-Glacier2::Request::override(const RequestPtr& other) const
+Glacier2::Request::override(const shared_ptr<Request>& other) const
{
//
// Both override values have to be non-empty.
@@ -147,26 +106,25 @@ Glacier2::Request::override(const RequestPtr& other) const
//
// We cannot override if the proxies differ.
//
- return _proxy == other->_proxy;
+ return Ice::targetEqualTo(_proxy, other->_proxy);
}
void
-Glacier2::Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams)
+Glacier2::Request::response(bool ok, const pair<const Byte*, const Byte*>& outParams)
{
assert(_proxy->ice_isTwoway());
- _amdCB->ice_response(ok, outParams);
+ _response(ok, outParams);
}
void
-Glacier2::Request::exception(const Ice::Exception& ex)
+Glacier2::Request::exception(exception_ptr ex)
{
//
- // Only for twoways, oneway or batch oneway dispatches are finished
- // when queued, see queued().
+ // Only for twoways, oneway dispatches are finished when queued, see queued().
//
if(_proxy->ice_isTwoway())
{
- _amdCB->ice_exception(ex);
+ _exception(ex);
}
}
@@ -175,77 +133,60 @@ Glacier2::Request::queued()
{
if(!_proxy->ice_isTwoway())
{
-#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr),
- static_cast<const Byte*>(nullptr)));
-#else
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
-#endif
+ _response(true, { nullptr, nullptr });
}
}
-Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread,
- const InstancePtr& instance,
- const Ice::ConnectionPtr& connection) :
- _requestQueueThread(requestQueueThread),
- _instance(instance),
- _connection(connection),
- _callback(newCallback_Object_ice_invoke(this, &RequestQueue::response, &RequestQueue::exception,
- &RequestQueue::sent)),
- _flushCallback(newCallback_Connection_flushBatchRequests(this, &RequestQueue::exception, &RequestQueue::sent)),
+Glacier2::RequestQueue::RequestQueue(shared_ptr<RequestQueueThread> requestQueueThread,
+ shared_ptr<Instance> instance,
+ shared_ptr<Ice::Connection> connection) :
+ _requestQueueThread(move(requestQueueThread)),
+ _instance(move(instance)),
+ _connection(move(connection)),
_pendingSend(false),
_destroyed(false)
{
}
bool
-Glacier2::RequestQueue::addRequest(const RequestPtr& request)
+Glacier2::RequestQueue::addRequest(shared_ptr<Request> request)
{
- IceUtil::Mutex::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
if(_destroyed)
{
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
+
if(request->hasOverride())
{
- for(deque<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(auto& r : _requests)
{
//
// If the new request overrides an old one, then abort the old
// request and replace it with the new request.
//
- if(request->override(*p))
+ if(request->override(r))
{
if(_observer)
{
_observer->overridden(!_connection);
}
request->queued();
- *p = request;
+ r = move(request);
return true;
}
}
}
- if(!_connection)
- {
- //
- // If it's a batch request, we make sure we use a unique batch proxy object for the queued
- // batch requests. We want all the requests for the same batch proxy to be queued on the
- // same proxy object.
- //
- request->addBatchProxy(_batchProxies);
- }
-
//
// No override, we add the new request.
//
if(_requests.empty() && (!_connection || !_pendingSend))
{
- _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed.
+ _requestQueueThread->flushRequestQueue(shared_from_this()); // This might throw if the thread is destroyed.
}
- _requests.push_back(request);
request->queued();
+ _requests.push_back(move(request));
if(_observer)
{
_observer->queued(!_connection);
@@ -256,7 +197,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
void
Glacier2::RequestQueue::flushRequests()
{
- IceUtil::Mutex::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
if(_connection)
{
if(_pendingSend)
@@ -267,58 +208,39 @@ Glacier2::RequestQueue::flushRequests()
}
else
{
- for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(const auto& request : _requests)
{
- try
+ if(_observer)
{
- if(_observer)
+ _observer->forwarded(!_connection);
+ }
+ auto self = shared_from_this();
+ request->invoke(
+ [self, request](bool ok, const pair<const Byte*, const Byte*>& outParams)
+ {
+ self->response(ok, outParams, request);
+ },
+ [self, request](exception_ptr e)
{
- _observer->forwarded(!_connection);
+ self->exception(e, request);
}
- assert(_callback);
- (*p)->invoke(_callback);
- }
- catch(const Ice::LocalException&)
- {
- // Ignore, this can occur for batch requests.
- }
+ );
}
_requests.clear();
-
- for(set<Ice::ObjectPrx>::const_iterator q = _batchProxies.begin(); q != _batchProxies.end(); ++q)
- {
- (*q)->begin_ice_flushBatchRequests();
- }
- _batchProxies.clear();
- }
-
- if(_destroyed && _requests.empty())
- {
- destroyInternal();
}
}
void
Glacier2::RequestQueue::destroy()
{
- IceUtil::Mutex::Lock lock(*this);
-
+ lock_guard<mutex> lg(_mutex);
_destroyed = true;
-
- //
- // Although the session has been destroyed, we cannot destroy this queue
- // until all requests have completed.
- //
- if(_requests.empty())
- {
- destroyInternal();
- }
}
void
-Glacier2::RequestQueue::updateObserver(const Glacier2::Instrumentation::SessionObserverPtr& observer)
+Glacier2::RequestQueue::updateObserver(shared_ptr<Glacier2::Instrumentation::SessionObserver> observer)
{
- IceUtil::Mutex::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
_observer = observer;
}
@@ -327,34 +249,45 @@ Glacier2::RequestQueue::flush()
{
assert(_connection);
_pendingSend = false;
- _pendingSendRequest = 0;
+ _pendingSendRequest = nullptr;
- bool flushBatchRequests = false;
- deque<RequestPtr>::iterator p;
+ deque<shared_ptr<Request>>::iterator p;
for(p = _requests.begin(); p != _requests.end(); ++p)
{
- try
+ if(_observer)
{
- assert(_callback);
- if(_observer)
+ _observer->forwarded(!_connection);
+ }
+
+ shared_ptr<promise<void>> isSent = make_shared<promise<void>>();
+ shared_ptr<promise<void>> completedExceptionally = make_shared<promise<void>>();
+
+ auto self = shared_from_this();
+ auto request = *p;
+
+ request->invoke(
+ [self, request](bool ok, const pair<const Byte*, const Byte*>& outParams)
{
- _observer->forwarded(!_connection);
- }
- Ice::AsyncResultPtr result = (*p)->invoke(_callback);
- if(!result)
+ self->response(ok, outParams, request);
+ },
+ [self, request, completedExceptionally](exception_ptr e)
{
- flushBatchRequests = true;
- }
- else if(!result->sentSynchronously() && !result->isCompleted())
+ completedExceptionally->set_value();
+ self->exception(e, request);
+ },
+ [self, request, isSent](bool sentSynchronously)
{
- _pendingSend = true;
- _pendingSendRequest = *p++;
- break;
+ isSent->set_value();
+ self->sent(sentSynchronously, request);
}
- }
- catch(const Ice::LocalException&)
+ );
+
+ if((isSent->get_future().wait_for(0s) != future_status::ready) &&
+ (completedExceptionally->get_future().wait_for(0s) != future_status::ready))
{
- // Ignore, this can occur for batch requests.
+ _pendingSend = true;
+ _pendingSendRequest = *p++;
+ break;
}
}
@@ -366,48 +299,21 @@ Glacier2::RequestQueue::flush()
{
_requests.erase(_requests.begin(), p);
}
-
- if(flushBatchRequests)
- {
- Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(ICE_SCOPED_ENUM(CompressBatch, BasedOnProxy), _flushCallback);
- if(!result->sentSynchronously() && !result->isCompleted())
- {
- _pendingSend = true;
- _pendingSendRequest = 0;
- }
- }
}
void
-Glacier2::RequestQueue::destroyInternal()
-{
- //
- // Must be called with the mutex locked.
- //
-
- //
- // Remove cyclic references.
- //
- const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0;
- const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0;
-}
-
-void
-Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const RequestPtr& request)
+Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const shared_ptr<Request>& request)
{
assert(request);
request->response(ok, outParams);
}
void
-Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& request)
+Glacier2::RequestQueue::exception(exception_ptr ex, const shared_ptr<Request>& request)
{
- //
- // If the connection has been lost, destroy the session.
- //
if(_connection)
{
- IceUtil::Mutex::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
if(request == _pendingSendRequest)
{
flush();
@@ -421,11 +327,11 @@ Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& re
}
void
-Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request)
+Glacier2::RequestQueue::sent(bool sentSynchronously, const shared_ptr<Request>& request)
{
if(_connection && !sentSynchronously)
{
- IceUtil::Mutex::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
if(request == _pendingSendRequest)
{
flush();
@@ -433,11 +339,11 @@ Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request)
}
}
-Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) :
- IceUtil::Thread("Glacier2 request queue thread"),
- _sleepTime(sleepTime),
+Glacier2::RequestQueueThread::RequestQueueThread(std::chrono::milliseconds sleepTime) :
+ _sleepTime(move(sleepTime)),
_destroy(false),
- _sleep(false)
+ _sleep(false),
+ _thread([this] { run(); })
{
}
@@ -451,28 +357,22 @@ void
Glacier2::RequestQueueThread::destroy()
{
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
+ lock_guard<mutex> lg(_mutex);
assert(!_destroy);
_destroy = true;
_sleep = false;
- notify();
}
- try
- {
- getThreadControl().join();
- }
- catch(const IceUtil::ThreadNotStartedException&)
- {
- // Expected if start() failed.
- }
+ _condVar.notify_one();
+
+ _thread.join();
}
void
-Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue)
+Glacier2::RequestQueueThread::flushRequestQueue(shared_ptr<RequestQueue> queue)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+ lock_guard<mutex> lg(_mutex);
+
if(_destroy)
{
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
@@ -480,20 +380,22 @@ Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue)
if(_queues.empty() && !_sleep)
{
- notify();
+ _condVar.notify_one();
}
- _queues.push_back(queue);
+ _queues.push_back(move(queue));
}
void
Glacier2::RequestQueueThread::run()
{
+ std::chrono::nanoseconds sleepDuration = 0ns;
+
while(true)
{
- vector<RequestQueuePtr> queues;
+ vector<shared_ptr<RequestQueue>> queues;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+ unique_lock<mutex> lock(_mutex);
//
// Wait indefinitely if there's no requests to
@@ -505,24 +407,25 @@ Glacier2::RequestQueueThread::run()
{
if(_sleep)
{
- IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
- if(!timedWait(_sleepDuration))
+ auto now = chrono::steady_clock::now();
+
+ if(_condVar.wait_for(lock, sleepDuration) == cv_status::no_timeout)
{
- _sleepDuration = IceUtil::Time();
+ sleepDuration = 0ns;
}
else
{
- _sleepDuration -= IceUtil::Time::now(IceUtil::Time::Monotonic) - now;
+ sleepDuration -= chrono::steady_clock::now() - now;
}
- if(_sleepDuration <= IceUtil::Time())
+ if(sleepDuration <= 0ns)
{
_sleep = false;
}
}
else
{
- wait();
+ _condVar.wait(lock);
}
}
@@ -539,16 +442,16 @@ Glacier2::RequestQueueThread::run()
queues.swap(_queues);
- if(_sleepTime > IceUtil::Time())
+ if(_sleepTime > 0ms)
{
_sleep = true;
- _sleepDuration = _sleepTime;
+ sleepDuration = _sleepTime;
}
}
- for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p)
+ for(const auto& queue : queues)
{
- (*p)->flushRequests();
+ queue->flushRequests();
}
}
}