diff options
-rw-r--r-- | CHANGELOG-3.6.md | 27 | ||||
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 149 | ||||
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.h | 10 | ||||
-rw-r--r-- | cpp/test/Glacier2/router/CallbackI.cpp | 36 | ||||
-rw-r--r-- | cpp/test/Glacier2/router/CallbackI.h | 4 | ||||
-rw-r--r-- | cpp/test/Glacier2/router/Client.cpp | 27 |
6 files changed, 141 insertions, 112 deletions
diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md index 21b3cc7dcda..1ed274d3907 100644 --- a/CHANGELOG-3.6.md +++ b/CHANGELOG-3.6.md @@ -40,24 +40,27 @@ These are the changes since Ice 3.6.1. - Added two new tools, icegriddb and icestormdb, used to import/export the IceGrid and IceStorm databases. -- Fixed a bug that affects Java and C# generated code. The generated patcher for - reading class data members was bogus when the class had more than one class data - member and derived from a class that contained class data members. The same - issue was true for exceptions with class data members deriving from exceptions - with class data members. +- Fixed a bug that affects Java and C# generated code. The generated patcher + for reading class data members was bogus when the class had more than one + class data member and derived from a class that contained class data + members. The same issue was true for exceptions with class data members + deriving from exceptions with class data members. -- Fixed a bug that prevented scripting languages (Python, Ruby, Javascript and PHP) - from marshaling NaN or Infinity as a floating point value. +- Fixed a bug that prevented scripting languages (Python, Ruby, Javascript and + PHP) from marshaling NaN or Infinity as a floating point value. -- Fixed an IceGrid bug where resolving endpoints of dynamically registered replica - groups would fail unless the client was using an encoding superior to the encoding - of the dynamically registered object adapters. +- Fixed an IceGrid bug where resolving endpoints of dynamically registered + replica groups would fail unless the client was using an encoding superior + to the encoding of the dynamically registered object adapters. - Added missing functions Ice::identityToString and Ice::stringToIdentity (C++, Objective-C, PHP, Python and Ruby). -- Added support for universal character names (\uNNNN and \UNNNNNNNN) in Slice string - constants. +- Added support for universal character names (\uNNNN and \UNNNNNNNN) in Slice + string constants. + +- Fixed Glacier2 router bug where requests from client to server could be lost + if forwarded as batch requests with the _fwd=O context. ## C++ Changes diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index 78eff63bce0..5acc66dbfa9 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -10,7 +10,6 @@ #include <Glacier2/RequestQueue.h> #include <Glacier2/Instance.h> #include <Glacier2/SessionRouterI.h> -#include <set> using namespace std; using namespace Ice; @@ -33,6 +32,19 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, } } +void +Glacier2::Request::addBatchProxy(set<Ice::ObjectPrx>& batchProxies) +{ + set<Ice::ObjectPrx>::const_iterator p = batchProxies.find(_proxy); + if(p == batchProxies.end()) + { + batchProxies.insert(_proxy); + } + else if(p->get() != _proxy.get()) + { + const_cast<Ice::ObjectPrx&>(_proxy) = *p; + } +} Ice::AsyncResultPtr Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) @@ -52,7 +64,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { ByteSeq outParams; if(_forwardContext) - { + { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; @@ -81,7 +93,7 @@ Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { Ice::AsyncResultPtr result; if(_forwardContext) - { + { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; @@ -131,7 +143,7 @@ Glacier2::Request::override(const RequestPtr& other) const // // Don't override if the override isn't the same. - // + // if(_override != other->_override) { return false; @@ -163,13 +175,13 @@ Glacier2::Request::exception(const Ice::Exception& ex) } } -void +void Glacier2::Request::queued() { if(!_proxy->ice_isTwoway()) { #if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr), + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(static_cast<const Byte*>(nullptr), static_cast<const Byte*>(nullptr))); #else _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); @@ -177,8 +189,8 @@ Glacier2::Request::queued() } } -Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, - const InstancePtr& instance, +Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, + const InstancePtr& instance, const Ice::ConnectionPtr& connection) : _requestQueueThread(requestQueueThread), _instance(instance), @@ -219,7 +231,17 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) } } } - + + if(!_connection) + { + // + // If it's a batch request, we make sure we use a unique batch proxy object for the queued + // batch requests. We want all the requests for the same batch proxy to be queued on the + // same proxy object. + // + request->addBatchProxy(_batchProxies); + } + // // No override, we add the new request. // @@ -237,7 +259,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) } void -Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) +Glacier2::RequestQueue::flushRequests() { IceUtil::Mutex::Lock lock(*this); if(_connection) @@ -250,7 +272,34 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) } else { - flush(batchProxies); + for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + try + { + if(_observer) + { + _observer->forwarded(!_connection); + } + assert(_callback); + (*p)->invoke(_callback); + } + catch(const Ice::LocalException&) + { + // Ignore, this can occur for batch requests. + } + } + _requests.clear(); + + for(set<Ice::ObjectPrx>::const_iterator q = _batchProxies.begin(); q != _batchProxies.end(); ++q) + { + (*q)->begin_ice_flushBatchRequests(); + } + _batchProxies.clear(); + } + + if(_destroyed && _requests.empty()) + { + destroyInternal(); } } @@ -279,20 +328,6 @@ Glacier2::RequestQueue::updateObserver(const Glacier2::Instrumentation::SessionO } void -Glacier2::RequestQueue::destroyInternal() -{ - // - // Must be called with the mutex locked. - // - - // - // Remove cyclic references. - // - const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0; - const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0; -} - -void Glacier2::RequestQueue::flush() { assert(_connection); @@ -314,7 +349,7 @@ Glacier2::RequestQueue::flush() if(!result) { flushBatchRequests = true; - } + } else if(!result->sentSynchronously() && !result->isCompleted()) { _pendingSend = true; @@ -346,44 +381,20 @@ Glacier2::RequestQueue::flush() _pendingSendRequest = 0; } } - - if(_destroyed && _requests.empty()) - { - destroyInternal(); - } } void -Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies) +Glacier2::RequestQueue::destroyInternal() { - assert(!_connection); - - for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - try - { - if(_observer) - { - _observer->forwarded(!_connection); - } - assert(_callback); - Ice::AsyncResultPtr result = (*p)->invoke(_callback); - if(!result) - { - batchProxies.insert((*p)->getProxy()); - } - } - catch(const Ice::LocalException&) - { - // Ignore, this can occur for batch requests. - } - } - _requests.clear(); + // + // Must be called with the mutex locked. + // - if(_destroyed) - { - destroyInternal(); - } + // + // Remove cyclic references. + // + const_cast<Ice::Callback_Object_ice_invokePtr&>(_callback) = 0; + const_cast<Ice::Callback_Connection_flushBatchRequestsPtr&>(_flushCallback) = 0; } void @@ -427,7 +438,7 @@ Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& re } } -void +void Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) { if(_connection && !sentSynchronously) @@ -454,12 +465,12 @@ Glacier2::RequestQueueThread::~RequestQueueThread() assert(_queues.empty()); } -void +void Glacier2::RequestQueueThread::destroy() { { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + assert(!_destroy); _destroy = true; _sleep = false; @@ -534,7 +545,7 @@ Glacier2::RequestQueueThread::run() } // - // If the queue is being destroyed and there's no requests or responses + // If the queue is being destroyed and there's no requests or responses // to send, we're done. // if(_destroy && _queues.empty()) @@ -543,7 +554,7 @@ Glacier2::RequestQueueThread::run() } assert(!_queues.empty() && !_sleep); - + queues.swap(_queues); if(_sleepTime > IceUtil::Time()) @@ -552,16 +563,10 @@ Glacier2::RequestQueueThread::run() _sleepDuration = _sleepTime; } } - - set<Ice::ObjectPrx> flushProxySet; - for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) - { - (*p)->flushRequests(flushProxySet); - } - for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q) + for(vector<RequestQueuePtr>::const_iterator p = queues.begin(); p != queues.end(); ++p) { - (*q)->begin_ice_flushBatchRequests(); + (*p)->flushRequests(); } } } diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h index d918584e1d5..94ca48b089f 100644 --- a/cpp/src/Glacier2/RequestQueue.h +++ b/cpp/src/Glacier2/RequestQueue.h @@ -36,10 +36,10 @@ public: Request(const Ice::ObjectPrx&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&, bool, const Ice::Context&, const Ice::AMD_Object_ice_invokePtr&); - + Ice::AsyncResultPtr invoke(const Ice::Callback_Object_ice_invokePtr& callback); bool override(const RequestPtr&) const; - const Ice::ObjectPrx& getProxy() const { return _proxy; } + void addBatchProxy(std::set<Ice::ObjectPrx>&); bool hasOverride() const { return !_override.empty(); } private: @@ -65,7 +65,7 @@ public: RequestQueue(const RequestQueueThreadPtr&, const InstancePtr&, const Ice::ConnectionPtr&); bool addRequest(const RequestPtr&); - void flushRequests(std::set<Ice::ObjectPrx>&); + void flushRequests(); void destroy(); @@ -76,12 +76,11 @@ private: void destroyInternal(); void flush(); - void flush(std::set<Ice::ObjectPrx>&); void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const RequestPtr&); void exception(const Ice::Exception&, const RequestPtr&); void sent(bool, const RequestPtr&); - + const RequestQueueThreadPtr _requestQueueThread; const InstancePtr _instance; const Ice::ConnectionPtr _connection; @@ -89,6 +88,7 @@ private: const Ice::Callback_Connection_flushBatchRequestsPtr _flushCallback; std::deque<RequestPtr> _requests; + std::set<Ice::ObjectPrx> _batchProxies; bool _pendingSend; RequestPtr _pendingSendRequest; bool _destroyed; diff --git a/cpp/test/Glacier2/router/CallbackI.cpp b/cpp/test/Glacier2/router/CallbackI.cpp index ccf22fe5a15..365a7b3d896 100644 --- a/cpp/test/Glacier2/router/CallbackI.cpp +++ b/cpp/test/Glacier2/router/CallbackI.cpp @@ -109,7 +109,7 @@ public: typedef IceUtil::Handle<AsyncCB> AsyncCBPtr; CallbackReceiverI::CallbackReceiverI() : - _callback(false), + _callback(0), _waitCallback(false), _callbackWithPayload(false), _finishWaitCallback(false) @@ -120,8 +120,7 @@ void CallbackReceiverI::callback(const Current&) { Lock sync(*this); - assert(!_callback); - _callback = true; + ++_callback; notifyAll(); } @@ -179,16 +178,15 @@ CallbackReceiverI::callbackWithPayload(const Ice::ByteSeq&, const Current&) } void -CallbackReceiverI::callbackOK() +CallbackReceiverI::callbackOK(int expected) { Lock sync(*this); - while(!_callback) + while(_callback != expected) { wait(); } - - _callback = false; + _callback = 0; } void @@ -254,8 +252,8 @@ CallbackI::initiateCallback_async(const AMD_Callback_initiateCallbackPtr& cb, if(proxy->ice_isTwoway()) { AsyncCBPtr acb = new AsyncCB(); - proxy->begin_callback(current.ctx, - newCallback_CallbackReceiver_callback(acb, &AsyncCB::responseCallback, &AsyncCB::exceptionCallback), + proxy->begin_callback(current.ctx, + newCallback_CallbackReceiver_callback(acb, &AsyncCB::responseCallback, &AsyncCB::exceptionCallback), newCookie(cb)); } else @@ -272,8 +270,8 @@ CallbackI::initiateCallbackEx_async(const AMD_Callback_initiateCallbackExPtr& cb if(proxy->ice_isTwoway()) { AsyncCBPtr acb = new AsyncCB(); - proxy->begin_callbackEx(current.ctx, - newCallback_CallbackReceiver_callbackEx(acb, &AsyncCB::responseCallbackEx, &AsyncCB::exceptionCallbackEx), + proxy->begin_callbackEx(current.ctx, + newCallback_CallbackReceiver_callbackEx(acb, &AsyncCB::responseCallbackEx, &AsyncCB::exceptionCallbackEx), newCookie(cb)); } else @@ -290,32 +288,32 @@ CallbackI::initiateConcurrentCallback_async(const AMD_Callback_initiateConcurren const Current& current) { AsyncCBPtr acb = new AsyncCB(); - proxy->begin_concurrentCallback(number, current.ctx, + proxy->begin_concurrentCallback(number, current.ctx, newCallback_CallbackReceiver_concurrentCallback(acb, &AsyncCB::responseConcurrentCallback, - &AsyncCB::exceptionConcurrentCallback), + &AsyncCB::exceptionConcurrentCallback), newCookie(cb)); } void CallbackI::initiateWaitCallback_async(const AMD_Callback_initiateWaitCallbackPtr& cb, - const CallbackReceiverPrx& proxy, + const CallbackReceiverPrx& proxy, const Current& current) { AsyncCBPtr acb = new AsyncCB(); - proxy->begin_waitCallback(current.ctx, + proxy->begin_waitCallback(current.ctx, newCallback_CallbackReceiver_waitCallback(acb, &AsyncCB::responseWaitCallback, &AsyncCB::exceptionWaitCallback), newCookie(cb)); } void -CallbackI::initiateCallbackWithPayload_async(const AMD_Callback_initiateCallbackWithPayloadPtr& cb, - const CallbackReceiverPrx& proxy, +CallbackI::initiateCallbackWithPayload_async(const AMD_Callback_initiateCallbackWithPayloadPtr& cb, + const CallbackReceiverPrx& proxy, const Current& current) { Ice::ByteSeq seq(1000 * 1024, 0); AsyncCBPtr acb = new AsyncCB(); - proxy->begin_callbackWithPayload(seq, current.ctx, - newCallback_CallbackReceiver_callbackWithPayload(acb, &AsyncCB::responseCallbackWithPayload, + proxy->begin_callbackWithPayload(seq, current.ctx, + newCallback_CallbackReceiver_callbackWithPayload(acb, &AsyncCB::responseCallbackWithPayload, &AsyncCB::exceptionCallbackWithPayload), newCookie(cb)); } diff --git a/cpp/test/Glacier2/router/CallbackI.h b/cpp/test/Glacier2/router/CallbackI.h index 548c8f64239..2d9841f660c 100644 --- a/cpp/test/Glacier2/router/CallbackI.h +++ b/cpp/test/Glacier2/router/CallbackI.h @@ -30,7 +30,7 @@ public: virtual void waitCallback(const ::Ice::Current&); virtual void callbackWithPayload(const Ice::ByteSeq&, const ::Ice::Current&); - void callbackOK(); + void callbackOK(int = 1); void waitCallbackOK(); void callbackWithPayloadOK(); void notifyWaitCallback(); @@ -38,7 +38,7 @@ public: private: - bool _callback; + int _callback; bool _waitCallback; bool _callbackWithPayload; bool _finishWaitCallback; diff --git a/cpp/test/Glacier2/router/Client.cpp b/cpp/test/Glacier2/router/Client.cpp index 5f7d4152cb7..1987e693dbc 100644 --- a/cpp/test/Glacier2/router/Client.cpp +++ b/cpp/test/Glacier2/router/Client.cpp @@ -612,7 +612,10 @@ CallbackClient::run(int argc, char* argv[]) Context context; context["_fwd"] = "o"; oneway->initiateCallback(onewayR, context); - callbackReceiverImpl->callbackOK(); + oneway->initiateCallback(onewayR, context); + oneway->initiateCallback(onewayR, context); + oneway->initiateCallback(onewayR, context); + callbackReceiverImpl->callbackOK(4); cout << "ok" << endl; } @@ -621,7 +624,27 @@ CallbackClient::run(int argc, char* argv[]) Context context; context["_fwd"] = "t"; twoway->initiateCallback(twowayR, context); - callbackReceiverImpl->callbackOK(); + twoway->initiateCallback(twowayR, context); + twoway->initiateCallback(twowayR, context); + twoway->initiateCallback(twowayR, context); + callbackReceiverImpl->callbackOK(4); + cout << "ok" << endl; + } + + { + cout << "testing batch oneway callback... " << flush; + Context context; + context["_fwd"] = "O"; + CallbackPrx batchOneway = CallbackPrx::uncheckedCast(twoway->ice_batchOneway()); + CallbackReceiverPrx onewayR = CallbackReceiverPrx::uncheckedCast(twowayR->ice_oneway()); + batchOneway->initiateCallback(onewayR, context); + batchOneway->initiateCallback(onewayR, context); + batchOneway->initiateCallback(onewayR, context); + batchOneway->initiateCallback(onewayR, context); + batchOneway->initiateCallback(onewayR, context); + batchOneway->initiateCallback(onewayR, context); + batchOneway->ice_flushBatchRequests(); + callbackReceiverImpl->callbackOK(6); cout << "ok" << endl; } |