summaryrefslogtreecommitdiff
path: root/cpp/src/Glacier2/RequestQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp151
1 files changed, 85 insertions, 66 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index f7912fa3414..1b2dceac9e2 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -14,53 +14,84 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
+class Glacier2CB : public AMI_Object_ice_invoke
+{
+public:
+
+ Glacier2CB(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ virtual void
+ ice_response(bool ok, const vector<Byte>& outParams)
+ {
+ _cb->ice_response(ok, outParams);
+ }
+
+ virtual void
+ ice_exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+
Glacier2::Request::Request(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current,
- const AMI_Object_ice_invokePtr& amiCB) :
+ bool forwardContext, const AMD_Object_ice_invokePtr& amdCB) :
_proxy(proxy),
_inParams(inParams),
_current(current),
- _amiCB(amiCB)
+ _forwardContext(forwardContext),
+ _amiCB(new Glacier2CB(amdCB))
{
Context::const_iterator p = current.ctx.find("_ovrd");
if(p != current.ctx.end())
{
- _override = p->second;
+ const_cast<string&>(_override) = p->second;
}
}
void
-Glacier2::Request::invoke(bool forwardContext)
+Glacier2::Request::invoke()
{
if(_proxy->ice_isTwoway())
{
- assert(_amiCB);
- try
+ if(_forwardContext)
{
- if(forwardContext)
- {
- _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx);
- }
- else
- {
- _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams);
- }
+ _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx);
}
- catch(const Ice::Exception& ex)
+ else
{
- _amiCB->ice_exception(ex);
+ _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams);
}
}
else
{
- vector<Byte> dummy;
- if(forwardContext)
+ bool ok;
+ vector<Byte> outParams;
+
+ try
{
- _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy, _current.ctx);
+ if(_forwardContext)
+ {
+ ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams, _current.ctx);
+ }
+ else
+ {
+ ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams);
+ }
}
- else
+ catch(const LocalException& ex)
{
- _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy);
+ _amiCB->ice_exception(ex);
+ return;
}
+
+ _amiCB->ice_response(ok, outParams);
}
}
@@ -107,30 +138,25 @@ Glacier2::Request::getCurrent() const
return _current;
}
+static const string serverSleepTime = "Glacier2.Server.SleepTime";
+static const string clientSleepTime = "Glacier2.Client.SleepTime";
static const string serverTraceRequest = "Glacier2.Server.Trace.Request";
static const string clientTraceRequest = "Glacier2.Client.Trace.Request";
static const string serverTraceOverride = "Glacier2.Server.Trace.Override";
static const string clientTraceOverride = "Glacier2.Client.Trace.Override";
-static const string serverForwardContext = "Glacier2.Server.ForwardContext";
-static const string clientForwardContext = "Glacier2.Client.ForwardContext";
-static const string serverSleepTime = "Glacier2.Server.SleepTime";
-static const string clientSleepTime = "Glacier2.Client.SleepTime";
Glacier2::RequestQueue::RequestQueue(const Ice::CommunicatorPtr& communicator, bool reverse) :
_logger(communicator->getLogger()),
_reverse(reverse),
- _traceLevelRequest(reverse ?
+ _sleepTime(reverse ?
+ IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) :
+ IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))),
+ _requestTraceLevel(_reverse ?
communicator->getProperties()->getPropertyAsInt(serverTraceRequest) :
communicator->getProperties()->getPropertyAsInt(clientTraceRequest)),
- _traceLevelOverride(reverse ?
+ _overrideTraceLevel(reverse ?
communicator->getProperties()->getPropertyAsInt(serverTraceOverride) :
communicator->getProperties()->getPropertyAsInt(clientTraceOverride)),
- _forwardContext(reverse ?
- communicator->getProperties()->getPropertyAsInt(serverForwardContext) > 0 :
- communicator->getProperties()->getPropertyAsInt(clientForwardContext) > 0),
- _sleepTime(reverse ?
- IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) :
- IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))),
_destroy(false)
{
}
@@ -179,7 +205,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
{
*p = request; // Replace old request if this is an override.
- if(_traceLevelOverride >= 1)
+ if(_overrideTraceLevel >= 1)
{
traceRequest(request, "override");
}
@@ -225,43 +251,36 @@ Glacier2::RequestQueue::run()
// while this is being done.
//
- try
- {
- set<ConnectionPtr> flushSet;
-
- for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ set<ConnectionPtr> flushSet;
+
+ for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ const ObjectPrx& proxy = (*p)->getProxy();
+
+ if(proxy->ice_batchOneway() || proxy->ice_batchDatagram())
{
- const ObjectPrx& proxy = (*p)->getProxy();
-
- if(proxy->ice_batchOneway() || proxy->ice_batchDatagram())
- {
- flushSet.insert(proxy->ice_connection());
- }
-
- if(_traceLevelRequest >= 1)
- {
- traceRequest(*p, "");
- }
-
- (*p)->invoke(_forwardContext);
+ flushSet.insert(proxy->ice_connection());
}
-
- for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests));
+
+ if(_requestTraceLevel >= 1)
+ {
+ traceRequest(*p, "");
+ }
+
+ (*p)->invoke();
}
- catch(const Ice::Exception& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
- if(_traceLevelRequest >= 1)
+
+ for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
+ {
+ try
{
- Trace out(_logger, "Glacier2");
- if(_reverse)
- {
- out << "reverse ";
- }
- out << "routing exception:\n" << ex;
+ for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests));
}
- }
+ catch(const LocalException& ex)
+ {
+ // Ignore.
+ }
+ }
//
// In order to avoid flooding, we add a delay, if so