diff options
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 151 |
1 files changed, 85 insertions, 66 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index f7912fa3414..1b2dceac9e2 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -14,53 +14,84 @@ using namespace std; using namespace Ice; using namespace Glacier2; +class Glacier2CB : public AMI_Object_ice_invoke +{ +public: + + Glacier2CB(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + virtual void + ice_response(bool ok, const vector<Byte>& outParams) + { + _cb->ice_response(ok, outParams); + } + + virtual void + ice_exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; + Glacier2::Request::Request(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current, - const AMI_Object_ice_invokePtr& amiCB) : + bool forwardContext, const AMD_Object_ice_invokePtr& amdCB) : _proxy(proxy), _inParams(inParams), _current(current), - _amiCB(amiCB) + _forwardContext(forwardContext), + _amiCB(new Glacier2CB(amdCB)) { Context::const_iterator p = current.ctx.find("_ovrd"); if(p != current.ctx.end()) { - _override = p->second; + const_cast<string&>(_override) = p->second; } } void -Glacier2::Request::invoke(bool forwardContext) +Glacier2::Request::invoke() { if(_proxy->ice_isTwoway()) { - assert(_amiCB); - try + if(_forwardContext) { - if(forwardContext) - { - _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx); - } - else - { - _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams); - } + _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx); } - catch(const Ice::Exception& ex) + else { - _amiCB->ice_exception(ex); + _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams); } } else { - vector<Byte> dummy; - if(forwardContext) + bool ok; + vector<Byte> outParams; + + try { - _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy, _current.ctx); + if(_forwardContext) + { + ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams, _current.ctx); + } + else + { + ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams); + } } - else + catch(const LocalException& ex) { - _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy); + _amiCB->ice_exception(ex); + return; } + + _amiCB->ice_response(ok, outParams); } } @@ -107,30 +138,25 @@ Glacier2::Request::getCurrent() const return _current; } +static const string serverSleepTime = "Glacier2.Server.SleepTime"; +static const string clientSleepTime = "Glacier2.Client.SleepTime"; static const string serverTraceRequest = "Glacier2.Server.Trace.Request"; static const string clientTraceRequest = "Glacier2.Client.Trace.Request"; static const string serverTraceOverride = "Glacier2.Server.Trace.Override"; static const string clientTraceOverride = "Glacier2.Client.Trace.Override"; -static const string serverForwardContext = "Glacier2.Server.ForwardContext"; -static const string clientForwardContext = "Glacier2.Client.ForwardContext"; -static const string serverSleepTime = "Glacier2.Server.SleepTime"; -static const string clientSleepTime = "Glacier2.Client.SleepTime"; Glacier2::RequestQueue::RequestQueue(const Ice::CommunicatorPtr& communicator, bool reverse) : _logger(communicator->getLogger()), _reverse(reverse), - _traceLevelRequest(reverse ? + _sleepTime(reverse ? + IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) : + IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))), + _requestTraceLevel(_reverse ? communicator->getProperties()->getPropertyAsInt(serverTraceRequest) : communicator->getProperties()->getPropertyAsInt(clientTraceRequest)), - _traceLevelOverride(reverse ? + _overrideTraceLevel(reverse ? communicator->getProperties()->getPropertyAsInt(serverTraceOverride) : communicator->getProperties()->getPropertyAsInt(clientTraceOverride)), - _forwardContext(reverse ? - communicator->getProperties()->getPropertyAsInt(serverForwardContext) > 0 : - communicator->getProperties()->getPropertyAsInt(clientForwardContext) > 0), - _sleepTime(reverse ? - IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) : - IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))), _destroy(false) { } @@ -179,7 +205,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) { *p = request; // Replace old request if this is an override. - if(_traceLevelOverride >= 1) + if(_overrideTraceLevel >= 1) { traceRequest(request, "override"); } @@ -225,43 +251,36 @@ Glacier2::RequestQueue::run() // while this is being done. // - try - { - set<ConnectionPtr> flushSet; - - for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p) + set<ConnectionPtr> flushSet; + + for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p) + { + const ObjectPrx& proxy = (*p)->getProxy(); + + if(proxy->ice_batchOneway() || proxy->ice_batchDatagram()) { - const ObjectPrx& proxy = (*p)->getProxy(); - - if(proxy->ice_batchOneway() || proxy->ice_batchDatagram()) - { - flushSet.insert(proxy->ice_connection()); - } - - if(_traceLevelRequest >= 1) - { - traceRequest(*p, ""); - } - - (*p)->invoke(_forwardContext); + flushSet.insert(proxy->ice_connection()); } - - for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests)); + + if(_requestTraceLevel >= 1) + { + traceRequest(*p, ""); + } + + (*p)->invoke(); } - catch(const Ice::Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - - if(_traceLevelRequest >= 1) + + for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q) + { + try { - Trace out(_logger, "Glacier2"); - if(_reverse) - { - out << "reverse "; - } - out << "routing exception:\n" << ex; + for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests)); } - } + catch(const LocalException& ex) + { + // Ignore. + } + } // // In order to avoid flooding, we add a delay, if so |