summaryrefslogtreecommitdiff
path: root/cpp/src/Glacier2/RequestQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Glacier2/RequestQueue.cpp')
-rwxr-xr-xcpp/src/Glacier2/RequestQueue.cpp355
1 files changed, 189 insertions, 166 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index a901b6208a1..85cb55c5c63 100755
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -16,102 +16,6 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
-namespace
-{
-
-//
-// AMI base callback class for twoway/oneway requests
-//
-class IceInvokeI : public AMI_Array_Object_ice_invoke
-{
-public:
-
- IceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& connection) :
- _amdCB(amdCB),
- _instance(instance),
- _connection(connection)
- {
- }
-
- virtual void
- ice_exception(const Exception& ex)
- {
- //
- // If the connection has been lost, destroy the session.
- //
- if(_connection)
- {
- if(dynamic_cast<const Ice::SocketException*>(&ex) ||
- dynamic_cast<const Ice::TimeoutException*>(&ex) ||
- dynamic_cast<const Ice::ProtocolException*>(&ex))
- {
- try
- {
- _instance->sessionRouter()->destroySession(_connection);
- }
- catch(const Exception&)
- {
- }
- }
- }
-
- if(_amdCB)
- {
- _amdCB->ice_exception(ex);
- }
- }
-
-protected:
-
- const AMD_Object_ice_invokePtr _amdCB;
- const InstancePtr _instance;
- const ConnectionPtr _connection;
-};
-
-class TwowayIceInvokeI : public IceInvokeI
-{
-public:
-
- TwowayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) :
- IceInvokeI(amdCB, instance, con)
- {
- }
-
- virtual void
- ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams)
- {
- _amdCB->ice_response(ok, outParams);
- }
-};
-
-class OnewayIceInvokeI : public IceInvokeI, public Ice::AMISentCallback
-{
-public:
-
- OnewayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) :
- IceInvokeI(amdCB, instance, con)
- {
- }
-
- virtual void
- ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams)
- {
- assert(false);
- }
-
- virtual void
- ice_sent()
- {
-#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr));
-#else
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
-#endif
- }
-};
-
-}
-
Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams,
const Current& current, bool forwardContext, const Ice::Context& sslContext,
const AMD_Object_ice_invokePtr& amdCB) :
@@ -122,18 +26,6 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
_sslContext(sslContext),
_amdCB(amdCB)
{
- //
- // If this is a batch call, we can finish the AMD call right away.
- //
- if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
- {
-#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr));
-#else
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
-#endif
- }
-
Context::const_iterator p = current.ctx.find("_ovrd");
if(p != current.ctx.end())
{
@@ -142,8 +34,8 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
}
-bool
-Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr& connection)
+Ice::AsyncResultPtr
+Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb)
{
pair<const Byte*, const Byte*> inPair;
if(_inParams.size() == 0)
@@ -176,61 +68,44 @@ Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr&
{
if(_sslContext.size() > 0)
{
- _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
+ _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext);
}
else
{
_proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams);
}
}
- return true; // Batch invocation.
+ return 0;
}
else
{
- AMI_Array_Object_ice_invokePtr amiCB;
- if(_proxy->ice_isTwoway())
- {
- amiCB = new TwowayIceInvokeI(_amdCB, instance, connection);
- }
- else
- {
- amiCB = new OnewayIceInvokeI(_amdCB, instance, connection);
- }
-
- bool sent;
+ Ice::AsyncResultPtr result;
if(_forwardContext)
{
if(_sslContext.size() > 0)
{
Ice::Context ctx = _current.ctx;
ctx.insert(_sslContext.begin(), _sslContext.end());
- sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, ctx);
+ result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, ctx, cb, this);
}
else
{
- sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _current.ctx);
+ result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _current.ctx, cb, this);
}
}
else
{
if(_sslContext.size() > 0)
{
- sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _sslContext);
+ result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _sslContext, cb, this);
}
else
{
- sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair);
+ result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, cb, this);
}
}
- if(sent && !_proxy->ice_isTwoway())
- {
-#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr));
-#else
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
-#endif
- }
- return false; // Not a batch invocation.
+
+ return result;
}
}
@@ -255,14 +130,50 @@ Glacier2::Request::override(const RequestPtr& other) const
}
//
+ // Don't override if the override isn't the same.
+ //
+ if(_override != other->_override)
+ {
+ return false;
+ }
+
+ //
// We cannot override if the proxies differ.
//
- if(_proxy != other->_proxy)
+ return _proxy == other->_proxy;
+}
+
+void
+Glacier2::Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams)
+{
+ assert(_proxy->ice_isTwoway());
+ _amdCB->ice_response(ok, outParams);
+}
+
+void
+Glacier2::Request::exception(const Ice::Exception& ex)
+{
+ //
+ // Only for twoways, oneway or batch oneway dispatches are finished
+ // when queued, see queued().
+ //
+ if(_proxy->ice_isTwoway())
{
- return false;
+ _amdCB->ice_exception(ex);
}
+}
- return _override == other->_override;
+void
+Glacier2::Request::queued()
+{
+ if(!_proxy->ice_isTwoway())
+ {
+#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
+ _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr));
+#else
+ _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
+#endif
+ }
}
Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread,
@@ -270,7 +181,11 @@ Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueTh
const Ice::ConnectionPtr& connection) :
_requestQueueThread(requestQueueThread),
_instance(instance),
- _connection(connection)
+ _connection(connection),
+ _callback(newCallback_Object_ice_invoke(this, &RequestQueue::response, &RequestQueue::exception,
+ &RequestQueue::sent)),
+ _flushCallback(newCallback_Connection_flushBatchRequests(this, &RequestQueue::exception, &RequestQueue::sent)),
+ _pendingSend(false)
{
}
@@ -280,7 +195,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
IceUtil::Mutex::Lock lock(*this);
if(request->hasOverride())
{
- for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(deque<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
//
// If the new request overrides an old one, then abort the old
@@ -288,6 +203,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
//
if(request->override(*p))
{
+ request->queued();
*p = request;
return true;
}
@@ -297,11 +213,12 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
//
// No override, we add the new request.
//
- if(_requests.empty())
+ if(_requests.empty() && (!_connection || !_pendingSend))
{
_requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed.
}
_requests.push_back(request);
+ request->queued();
return false;
}
@@ -309,13 +226,43 @@ void
Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies)
{
IceUtil::Mutex::Lock lock(*this);
- for(vector<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ if(_connection)
+ {
+ if(_pendingSend)
+ {
+ return;
+ }
+ flush();
+ }
+ else
+ {
+ flush(batchProxies);
+ }
+}
+
+void
+Glacier2::RequestQueue::flush()
+{
+ assert(_connection);
+ _pendingSend = false;
+ _pendingSendRequest = 0;
+
+ bool flushBatchRequests = false;
+ deque<RequestPtr>::iterator p;
+ for(p = _requests.begin(); p != _requests.end(); ++p)
{
try
{
- if((*p)->invoke(_instance, _connection)) // If batch invocation, add the proxy to the batch proxy set.
+ Ice::AsyncResultPtr result = (*p)->invoke(_callback);
+ if(!result)
{
- batchProxies.insert((*p)->getProxy());
+ flushBatchRequests = true;
+ }
+ else if(!result->sentSynchronously() && !result->isCompleted())
+ {
+ _pendingSend = true;
+ _pendingSendRequest = *p++;
+ break;
}
}
catch(const Ice::LocalException&)
@@ -323,9 +270,104 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies)
// Ignore, this can occur for batch requests.
}
}
+
+ if(p == _requests.end())
+ {
+ _requests.clear();
+ }
+ else
+ {
+ _requests.erase(_requests.begin(), p);
+ }
+
+ if(flushBatchRequests)
+ {
+ Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback);
+ if(!result->sentSynchronously() && !result->isCompleted())
+ {
+ _pendingSend = true;
+ _pendingSendRequest = 0;
+ }
+ }
+}
+
+void
+Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies)
+{
+ assert(!_connection);
+
+ for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ try
+ {
+ Ice::AsyncResultPtr result = (*p)->invoke(_callback);
+ if(!result)
+ {
+ batchProxies.insert((*p)->getProxy());
+ }
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore, this can occur for batch requests.
+ }
+ }
_requests.clear();
}
+void
+Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const RequestPtr& request)
+{
+ assert(request);
+ request->response(ok, outParams);
+}
+
+void
+Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& request)
+{
+ //
+ // If the connection has been lost, destroy the session.
+ //
+ if(_connection)
+ {
+ if(dynamic_cast<const Ice::SocketException*>(&ex) ||
+ dynamic_cast<const Ice::TimeoutException*>(&ex) ||
+ dynamic_cast<const Ice::ProtocolException*>(&ex))
+ {
+ try
+ {
+ _instance->sessionRouter()->destroySession(_connection);
+ }
+ catch(const Exception&)
+ {
+ }
+ }
+
+ IceUtil::Mutex::Lock lock(*this);
+ if(request == _pendingSendRequest)
+ {
+ flush();
+ }
+ }
+
+ if(request)
+ {
+ request->exception(ex);
+ }
+}
+
+void
+Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request)
+{
+ if(_connection && !sentSynchronously)
+ {
+ IceUtil::Mutex::Lock lock(*this);
+ if(request == _pendingSendRequest)
+ {
+ flush();
+ }
+ }
+}
+
Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) :
IceUtil::Thread("Glacier2 request queue thread"),
_sleepTime(sleepTime),
@@ -445,29 +487,10 @@ Glacier2::RequestQueueThread::run()
(*p)->flushRequests(flushProxySet);
}
- set<Ice::ConnectionPtr> flushConnectionSet;
for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q)
{
- //
- // As an optimization, we only flush the proxy batch requests if we didn't
- // already flush the requests of a proxy which is using the same connection.
- //
- Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection();
- if(!connection || flushConnectionSet.find(connection) == flushConnectionSet.end())
- {
- class FlushCB : public AMI_Object_ice_flushBatchRequests
- {
- public:
-
- virtual void ice_exception(const Ice::Exception&) { } // Ignore.
- };
- (*q)->ice_flushBatchRequests_async(new FlushCB());
-
- if(connection)
- {
- flushConnectionSet.insert(connection);
- }
- }
+ (*q)->begin_ice_flushBatchRequests();
}
}
}
+