diff options
author | Benoit Foucher <benoit@zeroc.com> | 2011-04-12 12:12:55 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2011-04-12 12:12:55 +0200 |
commit | 1d4c8cf9f9a75394da3c0383fbcd787e2545a58c (patch) | |
tree | 0b50055f2506b099752b13ffb48a18f6d7fb5454 /cpp/src | |
parent | Updated MSDN menu names for SDK (diff) | |
download | ice-1d4c8cf9f9a75394da3c0383fbcd787e2545a58c.tar.bz2 ice-1d4c8cf9f9a75394da3c0383fbcd787e2545a58c.tar.xz ice-1d4c8cf9f9a75394da3c0383fbcd787e2545a58c.zip |
Fixed bug 4993 & 5001, addition fix for bug 4914
Diffstat (limited to 'cpp/src')
-rwxr-xr-x | cpp/src/Glacier2/RequestQueue.cpp | 355 | ||||
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.h | 25 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 13 | ||||
-rw-r--r-- | cpp/src/Ice/ProxyFactory.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/RouterInfo.cpp | 7 | ||||
-rw-r--r-- | cpp/src/Ice/RouterInfo.h | 2 |
6 files changed, 236 insertions, 170 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index a901b6208a1..85cb55c5c63 100755 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -16,102 +16,6 @@ using namespace std; using namespace Ice; using namespace Glacier2; -namespace -{ - -// -// AMI base callback class for twoway/oneway requests -// -class IceInvokeI : public AMI_Array_Object_ice_invoke -{ -public: - - IceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& connection) : - _amdCB(amdCB), - _instance(instance), - _connection(connection) - { - } - - virtual void - ice_exception(const Exception& ex) - { - // - // If the connection has been lost, destroy the session. - // - if(_connection) - { - if(dynamic_cast<const Ice::SocketException*>(&ex) || - dynamic_cast<const Ice::TimeoutException*>(&ex) || - dynamic_cast<const Ice::ProtocolException*>(&ex)) - { - try - { - _instance->sessionRouter()->destroySession(_connection); - } - catch(const Exception&) - { - } - } - } - - if(_amdCB) - { - _amdCB->ice_exception(ex); - } - } - -protected: - - const AMD_Object_ice_invokePtr _amdCB; - const InstancePtr _instance; - const ConnectionPtr _connection; -}; - -class TwowayIceInvokeI : public IceInvokeI -{ -public: - - TwowayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) : - IceInvokeI(amdCB, instance, con) - { - } - - virtual void - ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) - { - _amdCB->ice_response(ok, outParams); - } -}; - -class OnewayIceInvokeI : public IceInvokeI, public Ice::AMISentCallback -{ -public: - - OnewayIceInvokeI(const AMD_Object_ice_invokePtr& amdCB, const InstancePtr& instance, const ConnectionPtr& con) : - IceInvokeI(amdCB, instance, con) - { - } - - virtual void - ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) - { - assert(false); - } - - virtual void - ice_sent() - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } -}; - -} - Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, const Byte*>& inParams, const Current& current, bool forwardContext, const Ice::Context& sslContext, const AMD_Object_ice_invokePtr& amdCB) : @@ -122,18 +26,6 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, _sslContext(sslContext), _amdCB(amdCB) { - // - // If this is a batch call, we can finish the AMD call right away. - // - if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } - Context::const_iterator p = current.ctx.find("_ovrd"); if(p != current.ctx.end()) { @@ -142,8 +34,8 @@ Glacier2::Request::Request(const ObjectPrx& proxy, const std::pair<const Byte*, } -bool -Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr& connection) +Ice::AsyncResultPtr +Glacier2::Request::invoke(const Callback_Object_ice_invokePtr& cb) { pair<const Byte*, const Byte*> inPair; if(_inParams.size() == 0) @@ -176,61 +68,44 @@ Glacier2::Request::invoke(const InstancePtr& instance, const Ice::ConnectionPtr& { if(_sslContext.size() > 0) { - _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); + _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams, _sslContext); } else { _proxy->ice_invoke(_current.operation, _current.mode, inPair, outParams); } } - return true; // Batch invocation. + return 0; } else { - AMI_Array_Object_ice_invokePtr amiCB; - if(_proxy->ice_isTwoway()) - { - amiCB = new TwowayIceInvokeI(_amdCB, instance, connection); - } - else - { - amiCB = new OnewayIceInvokeI(_amdCB, instance, connection); - } - - bool sent; + Ice::AsyncResultPtr result; if(_forwardContext) { if(_sslContext.size() > 0) { Ice::Context ctx = _current.ctx; ctx.insert(_sslContext.begin(), _sslContext.end()); - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, ctx); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, ctx, cb, this); } else { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _current.ctx); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _current.ctx, cb, this); } } else { if(_sslContext.size() > 0) { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair, _sslContext); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, _sslContext, cb, this); } else { - sent = _proxy->ice_invoke_async(amiCB, _current.operation, _current.mode, inPair); + result = _proxy->begin_ice_invoke(_current.operation, _current.mode, inPair, cb, this); } } - if(sent && !_proxy->ice_isTwoway()) - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } - return false; // Not a batch invocation. + + return result; } } @@ -255,14 +130,50 @@ Glacier2::Request::override(const RequestPtr& other) const } // + // Don't override if the override isn't the same. + // + if(_override != other->_override) + { + return false; + } + + // // We cannot override if the proxies differ. // - if(_proxy != other->_proxy) + return _proxy == other->_proxy; +} + +void +Glacier2::Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams) +{ + assert(_proxy->ice_isTwoway()); + _amdCB->ice_response(ok, outParams); +} + +void +Glacier2::Request::exception(const Ice::Exception& ex) +{ + // + // Only for twoways, oneway or batch oneway dispatches are finished + // when queued, see queued(). + // + if(_proxy->ice_isTwoway()) { - return false; + _amdCB->ice_exception(ex); } +} - return _override == other->_override; +void +Glacier2::Request::queued() +{ + if(!_proxy->ice_isTwoway()) + { +#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); +#else + _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); +#endif + } } Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueThread, @@ -270,7 +181,11 @@ Glacier2::RequestQueue::RequestQueue(const RequestQueueThreadPtr& requestQueueTh const Ice::ConnectionPtr& connection) : _requestQueueThread(requestQueueThread), _instance(instance), - _connection(connection) + _connection(connection), + _callback(newCallback_Object_ice_invoke(this, &RequestQueue::response, &RequestQueue::exception, + &RequestQueue::sent)), + _flushCallback(newCallback_Connection_flushBatchRequests(this, &RequestQueue::exception, &RequestQueue::sent)), + _pendingSend(false) { } @@ -280,7 +195,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) IceUtil::Mutex::Lock lock(*this); if(request->hasOverride()) { - for(vector<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(deque<RequestPtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) { // // If the new request overrides an old one, then abort the old @@ -288,6 +203,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) // if(request->override(*p)) { + request->queued(); *p = request; return true; } @@ -297,11 +213,12 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) // // No override, we add the new request. // - if(_requests.empty()) + if(_requests.empty() && (!_connection || !_pendingSend)) { _requestQueueThread->flushRequestQueue(this); // This might throw if the thread is destroyed. } _requests.push_back(request); + request->queued(); return false; } @@ -309,13 +226,43 @@ void Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) { IceUtil::Mutex::Lock lock(*this); - for(vector<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + if(_connection) + { + if(_pendingSend) + { + return; + } + flush(); + } + else + { + flush(batchProxies); + } +} + +void +Glacier2::RequestQueue::flush() +{ + assert(_connection); + _pendingSend = false; + _pendingSendRequest = 0; + + bool flushBatchRequests = false; + deque<RequestPtr>::iterator p; + for(p = _requests.begin(); p != _requests.end(); ++p) { try { - if((*p)->invoke(_instance, _connection)) // If batch invocation, add the proxy to the batch proxy set. + Ice::AsyncResultPtr result = (*p)->invoke(_callback); + if(!result) { - batchProxies.insert((*p)->getProxy()); + flushBatchRequests = true; + } + else if(!result->sentSynchronously() && !result->isCompleted()) + { + _pendingSend = true; + _pendingSendRequest = *p++; + break; } } catch(const Ice::LocalException&) @@ -323,9 +270,104 @@ Glacier2::RequestQueue::flushRequests(set<Ice::ObjectPrx>& batchProxies) // Ignore, this can occur for batch requests. } } + + if(p == _requests.end()) + { + _requests.clear(); + } + else + { + _requests.erase(_requests.begin(), p); + } + + if(flushBatchRequests) + { + Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback); + if(!result->sentSynchronously() && !result->isCompleted()) + { + _pendingSend = true; + _pendingSendRequest = 0; + } + } +} + +void +Glacier2::RequestQueue::flush(set<Ice::ObjectPrx>& batchProxies) +{ + assert(!_connection); + + for(deque<RequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + try + { + Ice::AsyncResultPtr result = (*p)->invoke(_callback); + if(!result) + { + batchProxies.insert((*p)->getProxy()); + } + } + catch(const Ice::LocalException&) + { + // Ignore, this can occur for batch requests. + } + } _requests.clear(); } +void +Glacier2::RequestQueue::response(bool ok, const pair<const Byte*, const Byte*>& outParams, const RequestPtr& request) +{ + assert(request); + request->response(ok, outParams); +} + +void +Glacier2::RequestQueue::exception(const Ice::Exception& ex, const RequestPtr& request) +{ + // + // If the connection has been lost, destroy the session. + // + if(_connection) + { + if(dynamic_cast<const Ice::SocketException*>(&ex) || + dynamic_cast<const Ice::TimeoutException*>(&ex) || + dynamic_cast<const Ice::ProtocolException*>(&ex)) + { + try + { + _instance->sessionRouter()->destroySession(_connection); + } + catch(const Exception&) + { + } + } + + IceUtil::Mutex::Lock lock(*this); + if(request == _pendingSendRequest) + { + flush(); + } + } + + if(request) + { + request->exception(ex); + } +} + +void +Glacier2::RequestQueue::sent(bool sentSynchronously, const RequestPtr& request) +{ + if(_connection && !sentSynchronously) + { + IceUtil::Mutex::Lock lock(*this); + if(request == _pendingSendRequest) + { + flush(); + } + } +} + Glacier2::RequestQueueThread::RequestQueueThread(const IceUtil::Time& sleepTime) : IceUtil::Thread("Glacier2 request queue thread"), _sleepTime(sleepTime), @@ -445,29 +487,10 @@ Glacier2::RequestQueueThread::run() (*p)->flushRequests(flushProxySet); } - set<Ice::ConnectionPtr> flushConnectionSet; for(set<Ice::ObjectPrx>::const_iterator q = flushProxySet.begin(); q != flushProxySet.end(); ++q) { - // - // As an optimization, we only flush the proxy batch requests if we didn't - // already flush the requests of a proxy which is using the same connection. - // - Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection(); - if(!connection || flushConnectionSet.find(connection) == flushConnectionSet.end()) - { - class FlushCB : public AMI_Object_ice_flushBatchRequests - { - public: - - virtual void ice_exception(const Ice::Exception&) { } // Ignore. - }; - (*q)->ice_flushBatchRequests_async(new FlushCB()); - - if(connection) - { - flushConnectionSet.insert(connection); - } - } + (*q)->begin_ice_flushBatchRequests(); } } } + diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h index d117c0c0b96..ad13383320d 100644 --- a/cpp/src/Glacier2/RequestQueue.h +++ b/cpp/src/Glacier2/RequestQueue.h @@ -14,6 +14,8 @@ #include <IceUtil/Monitor.h> #include <Ice/Ice.h> +#include <deque> + namespace Glacier2 { @@ -26,20 +28,25 @@ typedef IceUtil::Handle<Request> RequestPtr; class RequestQueueThread; typedef IceUtil::Handle<RequestQueueThread> RequestQueueThreadPtr; -class Request : public IceUtil::Shared +class Request : public Ice::LocalObject { public: Request(const Ice::ObjectPrx&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&, bool, const Ice::Context&, const Ice::AMD_Object_ice_invokePtr&); - bool invoke(const InstancePtr&, const Ice::ConnectionPtr&); + Ice::AsyncResultPtr invoke(const Ice::Callback_Object_ice_invokePtr& callback); bool override(const RequestPtr&) const; const Ice::ObjectPrx& getProxy() const { return _proxy; } bool hasOverride() const { return !_override.empty(); } private: + friend class RequestQueue; + void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&); + void exception(const Ice::Exception&); + void queued(); + const Ice::ObjectPrx _proxy; const Ice::ByteSeq _inParams; const Ice::Current _current; @@ -59,11 +66,23 @@ public: void flushRequests(std::set<Ice::ObjectPrx>&); private: + + void flush(); + void flush(std::set<Ice::ObjectPrx>&); + + void response(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, const RequestPtr&); + void exception(const Ice::Exception&, const RequestPtr&); + void sent(bool, const RequestPtr&); const RequestQueueThreadPtr _requestQueueThread; const InstancePtr _instance; const Ice::ConnectionPtr _connection; - std::vector<RequestPtr> _requests; + const Ice::Callback_Object_ice_invokePtr _callback; + const Ice::Callback_Connection_flushBatchRequestsPtr _flushCallback; + + std::deque<RequestPtr> _requests; + bool _pendingSend; + RequestPtr _pendingSendRequest; }; typedef IceUtil::Handle<RequestQueue> RequestQueuePtr; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index e90848d7b1e..722c6a6c81b 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1487,7 +1487,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(--_dispatchCount == 0) { - if(_state == StateFinished) + if(_state == StateClosing) + { + try + { + initiateShutdown(); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } + } + else if(_state == StateFinished) { _reaper->add(this); } diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index 0e5cbbef8f7..431994dc34a 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -14,6 +14,7 @@ #include <Ice/Proxy.h> #include <Ice/ReferenceFactory.h> #include <Ice/LocatorInfo.h> +#include <Ice/RouterInfo.h> #include <Ice/BasicStream.h> #include <Ice/Properties.h> #include <Ice/LoggerUtil.h> @@ -141,6 +142,9 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, // must *always* retry, so that the missing proxy is added // to the router. // + + ref->getRouterInfo()->clearCache(ref); + if(traceLevels->retry >= 1) { Trace out(logger, traceLevels->retryCat); diff --git a/cpp/src/Ice/RouterInfo.cpp b/cpp/src/Ice/RouterInfo.cpp index 911e553363c..f648cf7c8c3 100644 --- a/cpp/src/Ice/RouterInfo.cpp +++ b/cpp/src/Ice/RouterInfo.cpp @@ -338,6 +338,13 @@ IceInternal::RouterInfo::getAdapter() const return _adapter; } +void +IceInternal::RouterInfo::clearCache(const ReferencePtr& ref) +{ + IceUtil::Mutex::Lock sync(*this); + _identities.erase(ref->getIdentity()); +} + vector<EndpointIPtr> IceInternal::RouterInfo::setClientEndpoints(const Ice::ObjectPrx& proxy) { diff --git a/cpp/src/Ice/RouterInfo.h b/cpp/src/Ice/RouterInfo.h index e91f4d4bf98..679ee536691 100644 --- a/cpp/src/Ice/RouterInfo.h +++ b/cpp/src/Ice/RouterInfo.h @@ -84,6 +84,8 @@ public: void setAdapter(const Ice::ObjectAdapterPtr&); Ice::ObjectAdapterPtr getAdapter() const; + void clearCache(const ReferencePtr&); + // // The following methods need to be public for access by AMI callbacks. // |