summaryrefslogtreecommitdiff
path: root/cpp/src/Glacier2/RequestQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp402
1 files changed, 201 insertions, 201 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 55f700a0d65..099b73f3640 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -30,22 +30,22 @@ class AMI_Array_Object_ice_invokeI : public AMI_Array_Object_ice_invoke
public:
AMI_Array_Object_ice_invokeI(const RequestQueuePtr& requestQueue, const AMD_Array_Object_ice_invokePtr& amdCB) :
- _requestQueue(requestQueue),
- _amdCB(amdCB)
+ _requestQueue(requestQueue),
+ _amdCB(amdCB)
{
- assert(_amdCB);
+ assert(_amdCB);
}
virtual void
ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams)
{
- _requestQueue->addResponse(new Response(_amdCB, ok, outParams));
+ _requestQueue->addResponse(new Response(_amdCB, ok, outParams));
}
virtual void
ice_exception(const Exception& ex)
{
- _requestQueue->addResponse(new Response(_amdCB, ex));
+ _requestQueue->addResponse(new Response(_amdCB, ex));
}
private:
@@ -57,8 +57,8 @@ private:
}
Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams,
- const Current& current, bool forwardContext, const Ice::Context& sslContext,
- const AMD_Array_Object_ice_invokePtr& amdCB) :
+ const Current& current, bool forwardContext, const Ice::Context& sslContext,
+ const AMD_Array_Object_ice_invokePtr& amdCB) :
_proxy(proxy),
_inParams(inParams.first, inParams.second),
_current(current),
@@ -72,15 +72,15 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
//
if(!_proxy->ice_isTwoway())
{
- bool ok = true;
- pair<const Byte*, const Byte*> outParams(0, 0);
- _amdCB->ice_response(ok, outParams);
+ bool ok = true;
+ pair<const Byte*, const Byte*> outParams(0, 0);
+ _amdCB->ice_response(ok, outParams);
}
Context::const_iterator p = current.ctx.find("_ovrd");
if(p != current.ctx.end())
{
- const_cast<string&>(_override) = p->second;
+ const_cast<string&>(_override) = p->second;
}
}
@@ -96,71 +96,71 @@ Glacier2::Request::invoke(const RequestQueuePtr& requestQueue)
else
{
inPair.first = &_inParams[0];
- inPair.second = inPair.first + _inParams.size();
+ inPair.second = inPair.first + _inParams.size();
}
if(_proxy->ice_isTwoway())
{
- AMI_Array_Object_ice_invokePtr cb = new AMI_Array_Object_ice_invokeI(requestQueue, _amdCB);
- if(_forwardContext)
- {
- if(_sslContext.size() > 0)
- {
- Ice::Context ctx = _current.ctx;
- ctx.insert(_sslContext.begin(), _sslContext.end());
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, ctx);
- }
- else
- {
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _current.ctx);
- }
- }
- else
- {
- if(_sslContext.size() > 0)
- {
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _sslContext);
- }
- else
- {
- _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair);
- }
- }
- return true; // A twoway method is being dispatched.
+ AMI_Array_Object_ice_invokePtr cb = new AMI_Array_Object_ice_invokeI(requestQueue, _amdCB);
+ if(_forwardContext)
+ {
+ if(_sslContext.size() > 0)
+ {
+ Ice::Context ctx = _current.ctx;
+ ctx.insert(_sslContext.begin(), _sslContext.end());
+ _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, ctx);
+ }
+ else
+ {
+ _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _current.ctx);
+ }
+ }
+ else
+ {
+ if(_sslContext.size() > 0)
+ {
+ _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair, _sslContext);
+ }
+ else
+ {
+ _proxy->ice_invoke_async(cb, _current.operation, _current.mode, inPair);
+ }
+ }
+ return true; // A twoway method is being dispatched.
}
else
{
- try
- {
- ByteSeq outParams;
- if(_forwardContext)
- {
- if(_sslContext.size() > 0)
- {
- Ice::Context ctx = _current.ctx;
- ctx.insert(_sslContext.begin(), _sslContext.end());
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx);
- }
- }
- else
- {
- if(_sslContext.size() > 0)
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
- }
- else
- {
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
- }
- }
- }
- catch(const LocalException&)
- {
- }
- return false;
+ try
+ {
+ ByteSeq outParams;
+ if(_forwardContext)
+ {
+ if(_sslContext.size() > 0)
+ {
+ Ice::Context ctx = _current.ctx;
+ ctx.insert(_sslContext.begin(), _sslContext.end());
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, ctx);
+ }
+ else
+ {
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _current.ctx);
+ }
+ }
+ else
+ {
+ if(_sslContext.size() > 0)
+ {
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
+ }
+ else
+ {
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
+ }
+ }
+ }
+ catch(const LocalException&)
+ {
+ }
+ return false;
}
}
@@ -172,7 +172,7 @@ Glacier2::Request::override(const RequestPtr& other) const
//
if(_override.empty() || other->_override.empty())
{
- return false;
+ return false;
}
//
@@ -181,7 +181,7 @@ Glacier2::Request::override(const RequestPtr& other) const
//
if(_proxy->ice_isTwoway() || other->_proxy->ice_isTwoway())
{
- return false;
+ return false;
}
//
@@ -189,7 +189,7 @@ Glacier2::Request::override(const RequestPtr& other) const
//
if(_proxy != other->_proxy)
{
- return false;
+ return false;
}
return _override == other->_override;
@@ -208,7 +208,7 @@ Glacier2::Request::getConnection() const
}
Glacier2::Response::Response(const AMD_Array_Object_ice_invokePtr& amdCB, bool ok,
- const pair<const Byte*, const Byte*>& outParams) :
+ const pair<const Byte*, const Byte*>& outParams) :
_amdCB(amdCB),
_ok(ok),
_outParams(outParams.first, outParams.second)
@@ -227,21 +227,21 @@ Glacier2::Response::invoke()
{
if(_exception.get())
{
- _amdCB->ice_exception(*_exception.get());
+ _amdCB->ice_exception(*_exception.get());
}
else
{
pair<const Byte*, const Byte*> outPair;
- if(_outParams.size() == 0)
- {
- outPair.first = outPair.second = 0;
- }
- else
- {
- outPair.first = &_outParams[0];
- outPair.second = outPair.first + _outParams.size();
- }
- _amdCB->ice_response(_ok, outPair);
+ if(_outParams.size() == 0)
+ {
+ outPair.first = outPair.second = 0;
+ }
+ else
+ {
+ outPair.first = &_outParams[0];
+ outPair.second = outPair.first + _outParams.size();
+ }
+ _amdCB->ice_response(_ok, outPair);
}
}
@@ -265,12 +265,12 @@ void
Glacier2::RequestQueue::destroy()
{
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
- assert(!_destroy);
- _destroy = true;
- _sleep = false;
- notify();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+
+ assert(!_destroy);
+ _destroy = true;
+ _sleep = false;
+ notify();
}
//
@@ -295,14 +295,14 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- //
+ //
// If the new request overrides an old one, then abort the old
// request and replace it with the new request.
- //
+ //
if(request->override(*p))
{
*p = request;
- return true;
+ return true;
}
}
@@ -312,11 +312,11 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
_requests.push_back(request);
if(!_sleep)
{
- //
- // No need to notify if the request queue thread is sleeping,
- // once it wakes up it will check if there's requests to send.
- //
- notify();
+ //
+ // No need to notify if the request queue thread is sleeping,
+ // once it wakes up it will check if there's requests to send.
+ //
+ notify();
}
return false;
}
@@ -336,73 +336,73 @@ Glacier2::RequestQueue::run()
ptrdiff_t dispatchCount = 0; // The dispatch count keeps track of the number of outstanding twoway requests.
while(true)
{
- vector<RequestPtr> requests;
- vector<ResponsePtr> responses;
+ vector<RequestPtr> requests;
+ vector<ResponsePtr> responses;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
- //
- // Wait indefinitely if there's no requests/responses to
- // send. If the queue is being destroyed we still need to
- // wait until all the responses for twoway requests are
- // received.
- //
+ //
+ // Wait indefinitely if there's no requests/responses to
+ // send. If the queue is being destroyed we still need to
+ // wait until all the responses for twoway requests are
+ // received.
+ //
while((!_destroy || dispatchCount != 0) && _responses.empty() && (_requests.empty() || _sleep))
{
- if(_sleep)
- {
- IceUtil::Time now = IceUtil::Time::now();
- if(!timedWait(_sleepDuration))
- {
- _sleepDuration = IceUtil::Time();
- }
- else
- {
- _sleepDuration -= IceUtil::Time::now() - now;
- }
- if(_sleepDuration <= IceUtil::Time())
- {
- _sleep = false;
- }
- }
- else
- {
- wait();
- }
+ if(_sleep)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(!timedWait(_sleepDuration))
+ {
+ _sleepDuration = IceUtil::Time();
+ }
+ else
+ {
+ _sleepDuration -= IceUtil::Time::now() - now;
+ }
+ if(_sleepDuration <= IceUtil::Time())
+ {
+ _sleep = false;
+ }
+ }
+ else
+ {
+ wait();
+ }
}
- //
- // If the queue is being destroyed and there's no requests
- // or responses to send, we're done.
- //
+ //
+ // If the queue is being destroyed and there's no requests
+ // or responses to send, we're done.
+ //
if(_destroy && _requests.empty() && _responses.empty())
{
- assert(dispatchCount == 0); // We would have blocked in the wait() above otherwise.
+ assert(dispatchCount == 0); // We would have blocked in the wait() above otherwise.
return;
}
- //
- // If there's requests to sent and we're not sleeping,
- // send the requests. If a sleep time is configured, we
- // set the sleep duration and set the sleep flag to make
- // sure we'll sleep again once we're done sending requests
- // and responses.
- //
- if(!_requests.empty() && !_sleep)
- {
- requests.swap(_requests);
- if(_sleepTime > IceUtil::Time())
- {
- _sleep = true;
- _sleepDuration = _sleepTime;
- }
- }
- if(!_responses.empty())
- {
- responses.swap(_responses);
- }
- }
+ //
+ // If there's requests to sent and we're not sleeping,
+ // send the requests. If a sleep time is configured, we
+ // set the sleep duration and set the sleep flag to make
+ // sure we'll sleep again once we're done sending requests
+ // and responses.
+ //
+ if(!_requests.empty() && !_sleep)
+ {
+ requests.swap(_requests);
+ if(_sleepTime > IceUtil::Time())
+ {
+ _sleep = true;
+ _sleepDuration = _sleepTime;
+ }
+ }
+ if(!_responses.empty())
+ {
+ responses.swap(_responses);
+ }
+ }
//
// Send requests, flush batch requests, and sleep outside the
@@ -410,53 +410,53 @@ Glacier2::RequestQueue::run()
// while this is being done.
//
- set<ConnectionPtr> flushSet;
-
- for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
- {
- if((*p)->isBatch())
- {
- try
- {
- flushSet.insert((*p)->getConnection());
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
- }
-
- //
- // Invoke returns true if the request expects a response.
- // If that's the case we increment the dispatch count to
- // ensure that the thread won't be destroyed before the
- // response is received.
- //
- if((*p)->invoke(self)) // Exceptions are caught within invoke().
- {
- ++dispatchCount;
- }
- }
-
- for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
- {
- try
- {
- (*q)->flushBatchRequests();
- }
- catch(const LocalException&)
- {
- // Ignore.
- }
- }
-
- //
- // Send the responses and decrement the dispatch count.
- //
- for(vector<ResponsePtr>::const_iterator r = responses.begin(); r != responses.end(); ++r)
- {
- (*r)->invoke();
- }
- dispatchCount -= responses.size();
+ set<ConnectionPtr> flushSet;
+
+ for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ if((*p)->isBatch())
+ {
+ try
+ {
+ flushSet.insert((*p)->getConnection());
+ }
+ catch(const LocalException&)
+ {
+ // Ignore.
+ }
+ }
+
+ //
+ // Invoke returns true if the request expects a response.
+ // If that's the case we increment the dispatch count to
+ // ensure that the thread won't be destroyed before the
+ // response is received.
+ //
+ if((*p)->invoke(self)) // Exceptions are caught within invoke().
+ {
+ ++dispatchCount;
+ }
+ }
+
+ for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
+ {
+ try
+ {
+ (*q)->flushBatchRequests();
+ }
+ catch(const LocalException&)
+ {
+ // Ignore.
+ }
+ }
+
+ //
+ // Send the responses and decrement the dispatch count.
+ //
+ for(vector<ResponsePtr>::const_iterator r = responses.begin(); r != responses.end(); ++r)
+ {
+ (*r)->invoke();
+ }
+ dispatchCount -= responses.size();
}
}