summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2016-03-30 10:05:51 +0200
committerBenoit Foucher <benoit@zeroc.com>2016-03-30 10:05:51 +0200
commitb6dca14b73652e666b998063587a45fa44fcc27b (patch)
treeba2c180ac71df72d14220f03b70eb16269e245fa
parentDisabled binding test on Safari + WebWorkers (diff)
downloadice-b6dca14b73652e666b998063587a45fa44fcc27b.tar.bz2
ice-b6dca14b73652e666b998063587a45fa44fcc27b.tar.xz
ice-b6dca14b73652e666b998063587a45fa44fcc27b.zip
Fixed ICE-7074 - Glacier2 router bug where client to server requests forward as batch could be lost
-rw-r--r--CHANGELOG-3.6.md27
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp149
-rw-r--r--cpp/src/Glacier2/RequestQueue.h10
-rw-r--r--cpp/test/Glacier2/router/CallbackI.cpp36
-rw-r--r--cpp/test/Glacier2/router/CallbackI.h4
-rw-r--r--cpp/test/Glacier2/router/Client.cpp27
6 files changed, 141 insertions, 112 deletions
diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md
index 21b3cc7dcda..1ed274d3907 100644
--- a/CHANGELOG-3.6.md
+++ b/CHANGELOG-3.6.md
@@ -40,24 +40,27 @@ These are the changes since Ice 3.6.1.
- Added two new tools, icegriddb and icestormdb, used to import/export the
IceGrid and IceStorm databases.
-- Fixed a bug that affects Java and C# generated code. The generated patcher for
- reading class data members was bogus when the class had more than one class data
- member and derived from a class that contained class data members. The same
- issue was true for exceptions with class data members deriving from exceptions
- with class data members.
+- Fixed a bug that affects Java and C# generated code. The generated patcher
+ for reading class data members was bogus when the class had more than one
+ class data member and derived from a class that contained class data
+ members. The same issue was true for exceptions with class data members
+ deriving from exceptions with class data members.
-- Fixed a bug that prevented scripting languages (Python, Ruby, Javascript and PHP)
- from marshaling NaN or Infinity as a floating point value.
+- Fixed a bug that prevented scripting languages (Python, Ruby, Javascript and
+ PHP) from marshaling NaN or Infinity as a floating point value.
-- Fixed an IceGrid bug where resolving endpoints of dynamically registered replica
- groups would fail unless the client was using an encoding superior to the encoding
- of the dynamically registered object adapters.
+- Fixed an IceGrid bug where resolving endpoints of dynamically registered
+ replica groups would fail unless the client was using an encoding superior
+ to the encoding of the dynamically registered object adapters.
- Added missing functions Ice::identityToString and Ice::stringToIdentity
(C++, Objective-C, PHP, Python and Ruby).
-- Added support for universal character names (\uNNNN and \UNNNNNNNN) in Slice string
- constants.
+- Added support for universal character names (\uNNNN and \UNNNNNNNN) in Slice
+ string constants.
+
+- Fixed Glacier2 router bug where requests from client to server could be lost
+ if forwarded as batch requests with the _fwd=O context.
## C++ Changes
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 78eff63bce0..5acc66dbfa9 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -10,7 +10,6 @@
#include <Glacier2/RequestQueue.h>
#include <Glacier2/Instance.h>
#include <Glacier2/SessionRouterI.h>
-#include <set>
using namespace std;
using namespace Ice;
@@ -33,6 +32,19 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*,
}
}
+void
+Glacier2::Request::addBatchProxy(set<Ice::ObjectPrx>& batchProxies)
+{
+ set<Ice::ObjectPrx>::const_iterator p = batchProxies.find(_proxy);
+ if(p == batchProxies.end())
+ {
+ batchProxies.insert(_proxy);
+ }
+ else if(p->get() != _proxy.get())
+ {
+ const_cast<Ice::ObjectPrx&>(_proxy) = *p;
+ }
+}
Ice::AsyncResultPtr
Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb)
@@ -52,7 +64,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb)
{
ByteSeq outParams;
if(_forwardContext)
- {
+ {
if(_sslContext.size() > 0)
{
Ice::Context ctx = _current.ctx;
@@ -81,7 +93,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb)
{
Ice::AsyncResultPtr result;
if(_forwardContext)
- {
+ {
if(_sslContext.size() > 0)
{
Ice::Context ctx = _current.ctx;
@@ -131,7 +143,7 @@ Glacier2::Request::override(const RequestPtr& other) const
//
// Don't override if the override isn't the same.
- //
+ //
if(_override != other->_override)
{
return false;
@@ -163,13 +175,13 @@ Glacier2::Request::exception(const Ice::Exception& ex)
}
}
-void
+void
Glacier2::Request::queued()
{
if(!_proxy->ice_isTwoway())
{
#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
- _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr),
+ _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr),
static_cast<const Byte*>(nullptr)));
#else
_amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0));
@@ -177,8 +189,8 @@ Glacier2::Request::queued()
}
}
-Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread,
- const InstancePtr& instance,
+Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread,
+ const InstancePtr& instance,
const Ice::ConnectionPtr& connection) :
_requestQueueThread(requestQueueThread),
_instance(instance),
@@ -219,7 +231,17 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
}
}
}
-
+
+ if(!_connection)
+ {
+ //
+ // If it's a batch request, we make sure we use a unique batch proxy object for the queued
+ // batch requests. We want all the requests for the same batch proxy to be queued on the
+ // same proxy object.
+ //
+ request->addBatchProxy(_batchProxies);
+ }
+
//
// No override, we add the new request.
//
@@ -237,7 +259,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
}
void
-Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies)
+Glacier2::RequestQueue::flushRequests()
{
IceUtil::Mutex::Lock lock(*this);
if(_connection)
@@ -250,7 +272,34 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies)
}
else
{
- flush(batchProxies);
+ for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ try
+ {
+ if(_observer)
+ {
+ _observer->forwarded(!_connection);
+ }
+ assert(_callback);
+ (*p)->invoke(_callback);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore, this can occur for batch requests.
+ }
+ }
+ _requests.clear();
+
+ for(set<Ice::ObjectPrx>::const_iterator q = _batchProxies.begin(); q != _batchProxies.end(); ++q)
+ {
+ (*q)->begin_ice_flushBatchRequests();
+ }
+ _batchProxies.clear();
+ }
+
+ if(_destroyed && _requests.empty())
+ {
+ destroyInternal();
}
}
@@ -279,20 +328,6 @@ Glacier2::RequestQueue::updateObserver(const Glacier2::Instrumentation::SessionO
}
void
-Glacier2::RequestQueue::destroyInternal()
-{
- //
- // Must be called with the mutex locked.
- //
-
- //
- // Remove cyclic references.
- //
- const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0;
- const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0;
-}
-
-void
Glacier2::RequestQueue::flush()
{
assert(_connection);
@@ -314,7 +349,7 @@ Glacier2::RequestQueue::flush()
if(!result)
{
flushBatchRequests = true;
- }
+ }
else if(!result->sentSynchronously() && !result->isCompleted())
{
_pendingSend = true;
@@ -346,44 +381,20 @@ Glacier2::RequestQueue::flush()
_pendingSendRequest = 0;
}
}
-
- if(_destroyed && _requests.empty())
- {
- destroyInternal();
- }
}
void
-Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies)
+Glacier2::RequestQueue::destroyInternal()
{
- assert(!_connection);
-
- for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- try
- {
- if(_observer)
- {
- _observer->forwarded(!_connection);
- }
- assert(_callback);
- Ice::AsyncResultPtr result = (*p)->invoke(_callback);
- if(!result)
- {
- batchProxies.insert((*p)->getProxy());
- }
- }
- catch(const Ice::LocalException&)
- {
- // Ignore, this can occur for batch requests.
- }
- }
- _requests.clear();
+ //
+ // Must be called with the mutex locked.
+ //
- if(_destroyed)
- {
- destroyInternal();
- }
+ //
+ // Remove cyclic references.
+ //
+ const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0;
+ const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0;
}
void
@@ -427,7 +438,7 @@ Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& re
}
}
-void
+void
Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request)
{
if(_connection && !sentSynchronously)
@@ -454,12 +465,12 @@ Glacier2::RequestQueueThread::~RequestQueueThread()
assert(_queues.empty());
}
-void
+void
Glacier2::RequestQueueThread::destroy()
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
+
assert(!_destroy);
_destroy = true;
_sleep = false;
@@ -534,7 +545,7 @@ Glacier2::RequestQueueThread::run()
}
//
- // If the queue is being destroyed and there's no requests or responses
+ // If the queue is being destroyed and there's no requests or responses
// to send, we're done.
//
if(_destroy && _queues.empty())
@@ -543,7 +554,7 @@ Glacier2::RequestQueueThread::run()
}
assert(!_queues.empty() && !_sleep);
-
+
queues.swap(_queues);
if(_sleepTime > IceUtil::Time())
@@ -552,16 +563,10 @@ Glacier2::RequestQueueThread::run()
_sleepDuration = _sleepTime;
}
}
-
- set<Ice::ObjectPrx> flushProxySet;
- for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p)
- {
- (*p)->flushRequests(flushProxySet);
- }
- for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q)
+ for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p)
{
- (*q)->begin_ice_flushBatchRequests();
+ (*p)->flushRequests();
}
}
}
diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h
index d918584e1d5..94ca48b089f 100644
--- a/cpp/src/Glacier2/RequestQueue.h
+++ b/cpp/src/Glacier2/RequestQueue.h
@@ -36,10 +36,10 @@ public:
Request(const Ice::ObjectPrx&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&, bool,
const Ice::Context&, const Ice::AMD_Object_ice_invokePtr&);
-
+
Ice::AsyncResultPtr invoke(const Ice::Callback_Object_ice_invokePtr& callback);
bool override(const RequestPtr&) const;
- const Ice::ObjectPrx& getProxy() const { return _proxy; }
+ void addBatchProxy(std::set<Ice::ObjectPrx>&);
bool hasOverride() const { return !_override.empty(); }
private:
@@ -65,7 +65,7 @@ public:
RequestQueue(const RequestQueueThreadPtr&, const InstancePtr&, const Ice::ConnectionPtr&);
bool addRequest(const RequestPtr&);
- void flushRequests(std::set<Ice::ObjectPrx>&);
+ void flushRequests();
void destroy();
@@ -76,12 +76,11 @@ private:
void destroyInternal();
void flush();
- void flush(std::set<Ice::ObjectPrx>&);
void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const RequestPtr&);
void exception(const Ice::Exception&, const RequestPtr&);
void sent(bool, const RequestPtr&);
-
+
const RequestQueueThreadPtr _requestQueueThread;
const InstancePtr _instance;
const Ice::ConnectionPtr _connection;
@@ -89,6 +88,7 @@ private:
const Ice::Callback_Connection_flushBatchRequestsPtr _flushCallback;
std::deque<RequestPtr> _requests;
+ std::set<Ice::ObjectPrx> _batchProxies;
bool _pendingSend;
RequestPtr _pendingSendRequest;
bool _destroyed;
diff --git a/cpp/test/Glacier2/router/CallbackI.cpp b/cpp/test/Glacier2/router/CallbackI.cpp
index ccf22fe5a15..365a7b3d896 100644
--- a/cpp/test/Glacier2/router/CallbackI.cpp
+++ b/cpp/test/Glacier2/router/CallbackI.cpp
@@ -109,7 +109,7 @@ public:
typedef IceUtil::Handle<AsyncCB> AsyncCBPtr;
CallbackReceiverI::CallbackReceiverI() :
- _callback(false),
+ _callback(0),
_waitCallback(false),
_callbackWithPayload(false),
_finishWaitCallback(false)
@@ -120,8 +120,7 @@ void
CallbackReceiverI::callback(const Current&)
{
Lock sync(*this);
- assert(!_callback);
- _callback = true;
+ ++_callback;
notifyAll();
}
@@ -179,16 +178,15 @@ CallbackReceiverI::callbackWithPayload(const Ice::ByteSeq&, const Current&)
}
void
-CallbackReceiverI::callbackOK()
+CallbackReceiverI::callbackOK(int expected)
{
Lock sync(*this);
- while(!_callback)
+ while(_callback != expected)
{
wait();
}
-
- _callback = false;
+ _callback = 0;
}
void
@@ -254,8 +252,8 @@ CallbackI::initiateCallback_async(const AMD_Callback_initiateCallbackPtr& cb,
if(proxy->ice_isTwoway())
{
AsyncCBPtr acb = new AsyncCB();
- proxy->begin_callback(current.ctx,
- newCallback_CallbackReceiver_callback(acb, &AsyncCB::responseCallback, &AsyncCB::exceptionCallback),
+ proxy->begin_callback(current.ctx,
+ newCallback_CallbackReceiver_callback(acb, &AsyncCB::responseCallback, &AsyncCB::exceptionCallback),
newCookie(cb));
}
else
@@ -272,8 +270,8 @@ CallbackI::initiateCallbackEx_async(const AMD_Callback_initiateCallbackExPtr& cb
if(proxy->ice_isTwoway())
{
AsyncCBPtr acb = new AsyncCB();
- proxy->begin_callbackEx(current.ctx,
- newCallback_CallbackReceiver_callbackEx(acb, &AsyncCB::responseCallbackEx, &AsyncCB::exceptionCallbackEx),
+ proxy->begin_callbackEx(current.ctx,
+ newCallback_CallbackReceiver_callbackEx(acb, &AsyncCB::responseCallbackEx, &AsyncCB::exceptionCallbackEx),
newCookie(cb));
}
else
@@ -290,32 +288,32 @@ CallbackI::initiateConcurrentCallback_async(const AMD_Callback_initiateConcurren
const Current& current)
{
AsyncCBPtr acb = new AsyncCB();
- proxy->begin_concurrentCallback(number, current.ctx,
+ proxy->begin_concurrentCallback(number, current.ctx,
newCallback_CallbackReceiver_concurrentCallback(acb, &AsyncCB::responseConcurrentCallback,
- &AsyncCB::exceptionConcurrentCallback),
+ &AsyncCB::exceptionConcurrentCallback),
newCookie(cb));
}
void
CallbackI::initiateWaitCallback_async(const AMD_Callback_initiateWaitCallbackPtr& cb,
- const CallbackReceiverPrx& proxy,
+ const CallbackReceiverPrx& proxy,
const Current& current)
{
AsyncCBPtr acb = new AsyncCB();
- proxy->begin_waitCallback(current.ctx,
+ proxy->begin_waitCallback(current.ctx,
newCallback_CallbackReceiver_waitCallback(acb, &AsyncCB::responseWaitCallback, &AsyncCB::exceptionWaitCallback),
newCookie(cb));
}
void
-CallbackI::initiateCallbackWithPayload_async(const AMD_Callback_initiateCallbackWithPayloadPtr& cb,
- const CallbackReceiverPrx& proxy,
+CallbackI::initiateCallbackWithPayload_async(const AMD_Callback_initiateCallbackWithPayloadPtr& cb,
+ const CallbackReceiverPrx& proxy,
const Current& current)
{
Ice::ByteSeq seq(1000 * 1024, 0);
AsyncCBPtr acb = new AsyncCB();
- proxy->begin_callbackWithPayload(seq, current.ctx,
- newCallback_CallbackReceiver_callbackWithPayload(acb, &AsyncCB::responseCallbackWithPayload,
+ proxy->begin_callbackWithPayload(seq, current.ctx,
+ newCallback_CallbackReceiver_callbackWithPayload(acb, &AsyncCB::responseCallbackWithPayload,
&AsyncCB::exceptionCallbackWithPayload),
newCookie(cb));
}
diff --git a/cpp/test/Glacier2/router/CallbackI.h b/cpp/test/Glacier2/router/CallbackI.h
index 548c8f64239..2d9841f660c 100644
--- a/cpp/test/Glacier2/router/CallbackI.h
+++ b/cpp/test/Glacier2/router/CallbackI.h
@@ -30,7 +30,7 @@ public:
virtual void waitCallback(const ::Ice::Current&);
virtual void callbackWithPayload(const Ice::ByteSeq&, const ::Ice::Current&);
- void callbackOK();
+ void callbackOK(int = 1);
void waitCallbackOK();
void callbackWithPayloadOK();
void notifyWaitCallback();
@@ -38,7 +38,7 @@ public:
private:
- bool _callback;
+ int _callback;
bool _waitCallback;
bool _callbackWithPayload;
bool _finishWaitCallback;
diff --git a/cpp/test/Glacier2/router/Client.cpp b/cpp/test/Glacier2/router/Client.cpp
index 5f7d4152cb7..1987e693dbc 100644
--- a/cpp/test/Glacier2/router/Client.cpp
+++ b/cpp/test/Glacier2/router/Client.cpp
@@ -612,7 +612,10 @@ CallbackClient::run(int argc, char* argv[])
Context context;
context["_fwd"] = "o";
oneway->initiateCallback(onewayR, context);
- callbackReceiverImpl->callbackOK();
+ oneway->initiateCallback(onewayR, context);
+ oneway->initiateCallback(onewayR, context);
+ oneway->initiateCallback(onewayR, context);
+ callbackReceiverImpl->callbackOK(4);
cout << "ok" << endl;
}
@@ -621,7 +624,27 @@ CallbackClient::run(int argc, char* argv[])
Context context;
context["_fwd"] = "t";
twoway->initiateCallback(twowayR, context);
- callbackReceiverImpl->callbackOK();
+ twoway->initiateCallback(twowayR, context);
+ twoway->initiateCallback(twowayR, context);
+ twoway->initiateCallback(twowayR, context);
+ callbackReceiverImpl->callbackOK(4);
+ cout << "ok" << endl;
+ }
+
+ {
+ cout << "testing batch oneway callback... " << flush;
+ Context context;
+ context["_fwd"] = "O";
+ CallbackPrx batchOneway = CallbackPrx::uncheckedCast(twoway->ice_batchOneway());
+ CallbackReceiverPrx onewayR = CallbackReceiverPrx::uncheckedCast(twowayR->ice_oneway());
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->initiateCallback(onewayR, context);
+ batchOneway->ice_flushBatchRequests();
+ callbackReceiverImpl->callbackOK(6);
cout << "ok" << endl;
}