Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r--  cpp/src/Glacier2/RequestQueue.cpp | 340
1 file changed, 129 insertions(+), 211 deletions(-)
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 399884cf2f2..e508fa1ad51 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -14,43 +14,40 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
-namespace Glacier2
+namespace
{
//
// AMI callback class for twoway requests
//
-// NOTE: the received response isn't sent back directly with the AMD
-// callback. Instead it's queued and the request queue thread is
-// responsible for sending back the response. It's necessary because
-// sending back the response might block.
-//
class AMI_Array_Object_ice_invokeI : public AMI_Array_Object_ice_invoke
{
public:
-
- AMI_Array_Object_ice_invokeI(const RequestQueuePtr& requestQueue, const AMD_Array_Object_ice_invokePtr& amdCB) :
- _requestQueue(requestQueue),
- _amdCB(amdCB)
+
+ AMI_Array_Object_ice_invokeI(const AMD_Array_Object_ice_invokePtr& amdCB) : _amdCB(amdCB)
{
- assert(_amdCB);
}
-
+
virtual void
ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams)
{
- _requestQueue->addResponse(new Response(_amdCB, ok, outParams));
+ if(_amdCB)
+ {
+ _amdCB->ice_response(ok, outParams);
+ }
}
virtual void
ice_exception(const Exception& ex)
{
- _requestQueue->addResponse(new Response(_amdCB, ex));
+ if(_amdCB)
+ {
+ _amdCB->ice_exception(ex);
+ }
}
private:
- const RequestQueuePtr _requestQueue;
const AMD_Array_Object_ice_invokePtr _amdCB;
};
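
[Editor's note on the hunk above] The old design parked each reply in a Response object that the request queue thread delivered later, because delivery could block; the new design forwards the AMI reply straight to the AMD callback. Below is a minimal stand-alone model of the two paths, assuming nothing about Ice itself: the names are hypothetical and std::function stands in for the AMD callback class.

    #include <functional>
    #include <iostream>
    #include <queue>

    typedef std::function<void(bool)> AmdCallback; // stand-in for AMD_Array_Object_ice_invoke

    struct OldPath // reply is queued, delivered later by the queue thread
    {
        std::queue<std::function<void()> > responses;

        void ice_response(const AmdCallback& amd, bool ok)
        {
            responses.push([amd, ok] { amd(ok); }); // defer: sending might block
        }
    };

    struct NewPath // reply is forwarded as soon as the AMI callback fires
    {
        void ice_response(const AmdCallback& amd, bool ok)
        {
            amd(ok);
        }
    };

    int main()
    {
        AmdCallback amd = [](bool ok) { std::cout << "reply, ok=" << ok << "\n"; };
        NewPath().ice_response(amd, true); // delivered immediately
        OldPath old;
        old.ice_response(amd, true);       // queued...
        old.responses.front()();           // ...delivered by the thread later
    }
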
@@ -72,9 +69,7 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
//
if(!_proxy->ice_isTwoway())
{
- bool ok = true;
- pair<const Byte*, const Byte*> outParams(0, 0);
- _amdCB->ice_response(ok, outParams);
+ _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
}
Context::const_iterator p = current.ctx.find("_ovrd");
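
[Editor's note] The constructor hunk above completes non-twoway requests up front: no reply will ever arrive for a oneway, so the AMD callback is answered immediately with success and an empty out-parameter pair. A tiny sketch of that shape, with hypothetical names rather than the Ice API:

    #include <cstdio>
    #include <utility>

    typedef unsigned char Byte;

    // Stand-in for AMD_Array_Object_ice_invoke::ice_response().
    void amdResponse(bool ok, const std::pair<const Byte*, const Byte*>& outParams)
    {
        std::printf("ok=%d, empty=%d\n", (int)ok, (int)(outParams.first == outParams.second));
    }

    int main()
    {
        const bool isTwoway = false;
        if(!isTwoway)
        {
            // Same shape as the constructor: success with a zero-length payload.
            amdResponse(true, std::pair<const Byte*, const Byte*>(0, 0));
        }
    }
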
@@ -86,7 +81,7 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
bool
-Glacier2::Request::invoke(const RequestQueuePtr& requestQueue)
+Glacier2::Request::invoke()
{
pair<const Byte*, const Byte*> inPair;
if(_inParams.size() == 0)
@@ -98,69 +93,73 @@ Glacier2::Request::invoke(const RequestQueuePtr& requestQueue)
inPair.first = &_inParams[0];
inPair.second = inPair.first + _inParams.size();
}
- if(_proxy->ice_isTwoway())
+
+ if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
{
- AMI_Array_Object_ice_invokePtr cb = new AMI_Array_Object_ice_invokeI(requestQueue, _amdCB);
+ ByteSeq outParams;
if(_forwardContext)
- {
+ {
if(_sslContext.size() > 0)
{
Ice::Context ctx = _current.ctx;
ctx.insert(_sslContext.begin(), _sslContext.end());
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, ctx);
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx);
}
else
{
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _current.ctx);
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx);
}
}
else
{
if(_sslContext.size() > 0)
{
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _sslContext);
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
}
else
{
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair);
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
}
}
- return true; // A twoway method is being dispatched.
+ return true; // Batch invocation.
}
else
{
- try
+ AMI_Array_Object_ice_invokePtr amiCB;
+ if(_proxy->ice_isTwoway())
+ {
+ amiCB = new AMI_Array_Object_ice_invokeI(_amdCB);
+ }
+ else
{
- ByteSeq outParams;
- if(_forwardContext)
+ amiCB = new AMI_Array_Object_ice_invokeI(0);
+ }
+
+ if(_forwardContext)
+ {
+ if(_sslContext.size() > 0)
{
- if(_sslContext.size() > 0)
- {
- Ice::Context ctx = _current.ctx;
- ctx.insert(_sslContext.begin(), _sslContext.end());
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx);
- }
+ Ice::Context ctx = _current.ctx;
+ ctx.insert(_sslContext.begin(), _sslContext.end());
+ _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, ctx);
}
else
{
- if(_sslContext.size() > 0)
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
- }
+ _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _current.ctx);
}
}
- catch(const LocalException&)
+ else
{
+ if(_sslContext.size() > 0)
+ {
+ _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _sslContext);
+ }
+ else
+ {
+ _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair);
+ }
}
- return false;
+ return false; // Not a batch invocation.
}
}
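
[Editor's note] To summarize the restructured invoke(): a batch proxy is invoked synchronously, which merely appends the request to the proxy's client-side batch buffer, and returning true tells the caller to schedule a flush; every other proxy goes out via AMI, with the reply forwarded only when the original call was twoway. A condensed model of that decision follows; it is illustrative only, and the real reply arrives later through the AMI callback rather than inline.

    #include <functional>
    #include <iostream>

    struct Proxy
    {
        bool batch;  // models ice_isBatchOneway() || ice_isBatchDatagram()
        bool twoway; // models ice_isTwoway()
    };

    // Returns true when the proxy must be added to the batch-flush set.
    bool invoke(const Proxy& p, const std::function<void(bool)>& amd)
    {
        if(p.batch)
        {
            return true; // synchronous ice_invoke(): only buffers the request
        }
        if(p.twoway)
        {
            amd(true); // modeled inline; really fired later by the AMI callback
        }
        return false;
    }

    int main()
    {
        std::function<void(bool)> amd = [](bool ok) { std::cout << "replied " << ok << "\n"; };
        Proxy batchOneway = { true, false };
        Proxy twoway = { false, true };
        std::cout << invoke(batchOneway, amd) << "\n"; // 1: flush later
        std::cout << invoke(twoway, amd) << "\n";      // 0: AMI in flight
    }
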
@@ -195,74 +194,68 @@ Glacier2::Request::override(const RequestPtr& other) const
return _override == other->_override;
}
-bool
-Glacier2::Request::isBatch() const
-{
- return _proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram();
-}
-
-ConnectionPtr
-Glacier2::Request::getConnection() const
-{
- return _proxy->ice_getConnection();
-}
-
-Glacier2::Response::Response(const AMD_Array_Object_ice_invokePtr& amdCB, bool ok,
- const pair<const Byte*, const Byte*>& outParams) :
- _amdCB(amdCB),
- _ok(ok),
- _outParams(outParams.first, outParams.second)
+Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread) :
+ _requestQueueThread(requestQueueThread)
{
}
-Glacier2::Response::Response(const AMD_Array_Object_ice_invokePtr& amdCB, const Exception& ex) :
- _amdCB(amdCB),
- _ok(false),
- _exception(ex.ice_clone())
+bool
+Glacier2::RequestQueue::addRequest(const RequestPtr& request)
{
+ IceUtil::Mutex::Lock lock(*this);
+ for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ //
+ // If the new request overrides an old one, then abort the old
+ // request and replace it with the new request.
+ //
+ if(request->override(*p))
+ {
+ *p = request;
+ return true;
+ }
+ }
+
+ //
+ // No override, we add the new request.
+ //
+ if(_requests.empty())
+ {
+ _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed.
+ }
+ _requests.push_back(request);
+ return false;
}
void
-Glacier2::Response::invoke()
+Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies)
{
- if(_exception.get())
+ IceUtil::Mutex::Lock lock(*this);
+ for(vector<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- _amdCB->ice_exception(*_exception.get());
- }
- else
- {
- pair<const Byte*, const Byte*> outPair;
- if(_outParams.size() == 0)
- {
- outPair.first = outPair.second = 0;
- }
- else
+ if((*p)->invoke()) // If batch invocation, add the proxy to the batch proxy set.
{
- outPair.first = &_outParams[0];
- outPair.second = outPair.first + _outParams.size();
+ batchProxies.insert((*p)->getProxy());
}
- _amdCB->ice_response(_ok, outPair);
}
+ _requests.clear();
}
-Glacier2::RequestQueue::RequestQueue(const IceUtil::Time& sleepTime) :
+Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) :
_sleepTime(sleepTime),
_destroy(false),
_sleep(false)
{
}
-Glacier2::RequestQueue::~RequestQueue()
+Glacier2::RequestQueueThread::~RequestQueueThread()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
assert(_destroy);
- assert(_requests.empty());
- assert(_responses.empty());
+ assert(_queues.empty());
}
void
-Glacier2::RequestQueue::destroy()
+Glacier2::RequestQueueThread::destroy()
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
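
[Editor's note] The new per-session RequestQueue::addRequest above keeps the "_ovrd" override semantics: a pending request that is overridden by a new one is replaced in place and never sent, which is how Glacier2 drops stale updates. A toy version of the replace-or-append logic, with a hypothetical Request type; the real override() performs additional checks before comparing the override key.

    #include <iostream>
    #include <string>
    #include <vector>

    struct Request
    {
        std::string ovrd;    // models the "_ovrd" context entry
        std::string payload;

        bool overrides(const Request& other) const
        {
            return !ovrd.empty() && ovrd == other.ovrd;
        }
    };

    // Mirrors addRequest: replace a pending request in place when overridden,
    // otherwise append; returns true if an old request was dropped.
    bool addRequest(std::vector<Request>& pending, const Request& r)
    {
        for(std::vector<Request>::iterator p = pending.begin(); p != pending.end(); ++p)
        {
            if(r.overrides(*p))
            {
                *p = r;
                return true;
            }
        }
        pending.push_back(r);
        return false;
    }

    int main()
    {
        std::vector<Request> pending;
        Request a = { "pos", "x=1" };
        Request b = { "pos", "x=2" };
        std::cout << addRequest(pending, a);         // 0: queued
        std::cout << addRequest(pending, b) << "\n"; // 1: replaced the stale update
        std::cout << pending.size() << " queued: " << pending[0].payload << "\n"; // 1 queued: x=2
    }
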
@@ -273,82 +266,42 @@ Glacier2::RequestQueue::destroy()
notify();
}
- //
- // We don't want to wait for the RequestQueue thread, because this
- // destroy() operation is called when sessions expire or are
- // destroyed, in which case we do not want the session handler
- // thread to block here. Therefore we don't call join(), but
- // instead detach the thread right after we start it.
- //
- //getThreadControl().join();
+ getThreadControl().join();
}
-bool
-Glacier2::RequestQueue::addRequest(const RequestPtr& request)
+void
+Glacier2::RequestQueueThread::flushRequestQueue(const RequestQueuePtr& queue)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
if(_destroy)
{
- throw ObjectNotExistException(__FILE__, __LINE__);
- }
-
- for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- //
- // If the new request overrides an old one, then abort the old
- // request and replace it with the new request.
- //
- if(request->override(*p))
- {
- *p = request;
- return true;
- }
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
- //
- // No override, we add the new request.
- //
- _requests.push_back(request);
- if(!_sleep)
+ if(_queues.empty() && !_sleep)
{
- //
- // No need to notify if the request queue thread is sleeping,
- // once it wakes up it will check if there's requests to send.
- //
notify();
}
- return false;
+ _queues.push_back(queue);
}
void
-Glacier2::RequestQueue::addResponse(const ResponsePtr& response)
+Glacier2::RequestQueueThread::run()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
- _responses.push_back(response);
- notify();
-}
-
-void
-Glacier2::RequestQueue::run()
-{
- RequestQueuePtr self = this; // This is to avoid creating a temporary Ptr for each call to Request::invoke()
- ptrdiff_t dispatchCount = 0; // The dispatch count keeps track of the number of outstanding twoway requests.
while(true)
{
- vector<RequestPtr> requests;
- vector<ResponsePtr> responses;
+ vector<RequestQueuePtr> queues;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
//
- // Wait indefinitely if there's no requests/responses to
+ // Wait indefinitely if there's no requests to
// send. If the queue is being destroyed we still need to
// wait until all the responses for twoway requests are
// received.
//
- while((!_destroy || dispatchCount != 0) && _responses.empty() && (_requests.empty() || _sleep))
+ while(!_destroy && (_queues.empty() || _sleep))
{
if(_sleep)
{
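
[Editor's note] The flushRequestQueue hunk above shows the producer side of the hand-off: a queue registers itself with the shared RequestQueueThread and signals only when the thread could actually be waiting, i.e. the queue list was empty and the thread is not in a rate-limiting sleep. A condensed model using standard-library primitives instead of IceUtil; the names are illustrative.

    #include <condition_variable>
    #include <mutex>
    #include <stdexcept>
    #include <string>
    #include <vector>

    struct QueueThread
    {
        std::mutex m;
        std::condition_variable cv;
        std::vector<std::string> queues; // stands in for vector<RequestQueuePtr>
        bool sleeping = false;
        bool destroyed = false;

        void flushRequestQueue(const std::string& q)
        {
            std::lock_guard<std::mutex> lock(m);
            if(destroyed)
            {
                throw std::runtime_error("destroyed"); // ObjectNotExistException in the real code
            }
            if(queues.empty() && !sleeping)
            {
                cv.notify_one(); // the run() loop may be blocked in wait()
            }
            queues.push_back(q); // during a sleep, queues simply accumulate
        }
    };

    int main()
    {
        QueueThread t;
        t.flushRequestQueue("session-1"); // first entry: notifies the thread
        t.flushRequestQueue("session-2"); // queue non-empty: no extra notify
    }
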
@@ -361,6 +314,7 @@ Glacier2::RequestQueue::run()
{
_sleepDuration -= IceUtil::Time::now(IceUtil::Time::Monotonic) - now;
}
+
if(_sleepDuration <= IceUtil::Time())
{
_sleep = false;
@@ -373,90 +327,54 @@ Glacier2::RequestQueue::run()
}
//
- // If the queue is being destroyed and there's no requests
- // or responses to send, we're done.
+ // If the queue is being destroyed and there's no requests or responses
+ // to send, we're done.
//
- if(_destroy && _requests.empty() && _responses.empty())
+ if(_destroy && _queues.empty())
{
- assert(dispatchCount == 0); // We would have blocked in the wait() above otherwise.
return;
}
- //
- // If there's requests to sent and we're not sleeping,
- // send the requests. If a sleep time is configured, we
- // set the sleep duration and set the sleep flag to make
- // sure we'll sleep again once we're done sending requests
- // and responses.
- //
- if(!_requests.empty() && !_sleep)
- {
- requests.swap(_requests);
- if(_sleepTime > IceUtil::Time())
- {
- _sleep = true;
- _sleepDuration = _sleepTime;
- }
- }
- if(!_responses.empty())
+ assert(!_queues.empty() && !_sleep);
+
+ queues.swap(_queues);
+
+ if(_sleepTime > IceUtil::Time())
{
- responses.swap(_responses);
+ _sleep = true;
+ _sleepDuration = _sleepTime;
}
}
- //
- // Send requests, flush batch requests, and sleep outside the
- // thread synchronization, so that new messages can be added
- // while this is being done.
- //
+ set<Ice::ObjectPrx> flushProxySet;
+ for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p)
+ {
+ (*p)->flushRequests(flushProxySet);
+ }
- set<ConnectionPtr> flushSet;
-
- for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ set<Ice::ConnectionPtr> flushConnectionSet;
+ for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q)
{
- if((*p)->isBatch())
- {
- try
- {
- flushSet.insert((*p)->getConnection());
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
- }
-
//
- // Invoke returns true if the request expects a response.
- // If that's the case we increment the dispatch count to
- // ensure that the thread won't be destroyed before the
- // response is received.
+ // As an optimization, we only flush the proxy batch requests if we didn't
+ // already flush the requests of a proxy which is using the same connection.
//
- if((*p)->invoke(self)) // Exceptions are caught within invoke().
+ Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection();
+ if(!connection || flushConnectionSet.find(connection) == flushConnectionSet.end())
{
- ++dispatchCount;
- }
- }
+ class FlushCB : public AMI_Object_ice_flushBatchRequests
+ {
+ public:
- for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
- {
- try
- {
- (*q)->flushBatchRequests();
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
- }
+ virtual void ice_exception(const Ice::Exception&) { } // Ignore.
+ };
+ (*q)->ice_flushBatchRequests_async(new FlushCB());
- //
- // Send the responses and decrement the dispatch count.
- //
- for(vector<ResponsePtr>::const_iterator r = responses.begin(); r != responses.end(); ++r)
- {
- (*r)->invoke();
+ if(connection)
+ {
+ flushConnectionSet.insert(connection);
+ }
+ }
}
- dispatchCount -= responses.size();
}
}
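
[Editor's note] The tail of run() replaces the old per-connection flushBatchRequests() loop with an asynchronous flush deduplicated by connection: flushing a connection sends the batched requests of every proxy sharing it, so one ice_flushBatchRequests_async per distinct cached connection suffices. A stand-alone model of the dedup pass, where strings model proxies and connections, and an empty string models a proxy with no cached connection, which is always flushed:

    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    typedef std::string Connection;

    int main()
    {
        // proxy -> cached connection
        std::map<std::string, Connection> proxies;
        proxies["router@A"]  = "conn1";
        proxies["printer@A"] = "conn1"; // shares the connection of router@A
        proxies["logger@B"]  = "conn2";

        std::set<Connection> flushed;
        for(std::map<std::string, Connection>::const_iterator q = proxies.begin();
            q != proxies.end(); ++q)
        {
            const Connection& c = q->second;
            if(c.empty() || flushed.find(c) == flushed.end())
            {
                std::cout << "flush " << q->first << "\n"; // ice_flushBatchRequests_async
                if(!c.empty())
                {
                    flushed.insert(c);
                }
            }
        }
        // logger@B and printer@A are flushed; router@A is skipped
        // because conn1 was already flushed through printer@A.
    }
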