diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 297 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.h | 19 |
2 files changed, 152 insertions, 164 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 7cbcec60d68..d8162b85709 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -26,67 +26,51 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy { public: - AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) : - _request(request), _id(id) + AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + _locator(locator), _adapter(adapter) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { assert(obj); - _request->response(_id, obj); + _locator->getDirectProxyResponse(_adapter, obj); } virtual void ice_exception(const ::Ice::Exception& e) { - try - { - e.ice_throw(); - } - catch(const AdapterNotActiveException& ex) - { - if(ex.activatable) - { - _request->activate(_id); - return; - } - } - catch(const Ice::Exception&) - { - } - - _request->exception(_id, e); + _locator->getDirectProxyException(_adapter, e); } private: - const LocatorI::RequestPtr _request; - const string _id; + const LocatorIPtr _locator; + const LocatorAdapterInfo _adapter; }; class AMI_Adapter_activateI : public AMI_Adapter_activate { public: - AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) : - _locator(locator), _id(id) + AMI_Adapter_activateI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + _locator(locator), _adapter(adapter) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { - _locator->activateFinished(_id, obj); + _locator->getDirectProxyResponse(_adapter, obj); } virtual void ice_exception(const ::Ice::Exception& ex) { - _locator->activateException(_id, ex); + _locator->getDirectProxyException(_adapter, ex); } private: const LocatorIPtr _locator; - const string _id; + const LocatorAdapterInfo _adapter; }; // @@ -223,50 +207,35 @@ LocatorI::Request::execute() for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id)); + if(_locator->getDirectProxy(*p, this)) + { + activating(); + } } } void -LocatorI::Request::activate(const string& id) +LocatorI::Request::activating() { // - // Activate the adapter - // - // NOTE: we use a timeout large enough to ensure that the activate() call won't - // timeout if the server hangs in deactivation and/or activation. - // - { - Lock sync(*this); - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) - { - if(p->id == id) - { - _locator->activate(*p, this); - _activating.insert(id); - } - } - } - - // - // If this is a request for a replica group, don't wait for the activation to - // complete. Instead, we query the next adapter which might be already active. + // An adapter is being activated. If this is a request for a replica group, don't + // wait for the activation to complete. Instead, we query the next adapter which + // might be already active. // if(_replicaGroup) { LocatorAdapterInfo adapter; + do { Lock sync(*this); - if(_lastAdapter != _adapters.end()) + if(_lastAdapter == _adapters.end()) { - adapter = *_lastAdapter; - ++_lastAdapter; + break; } + adapter = *_lastAdapter; + ++_lastAdapter; } - if(adapter.proxy) - { - adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); - } + while(_locator->getDirectProxy(adapter, this)); } } @@ -276,13 +245,15 @@ LocatorI::Request::exception(const string& id, const Ice::Exception& ex) LocatorAdapterInfo adapter; { Lock sync(*this); + if(_proxies.size() == _count) // Nothing to do if we already sent the response. + { + return; + } if(!_exception.get()) { _exception.reset(ex.ice_clone()); } - _activating.erase(id); - if(_lastAdapter == _adapters.end()) { --_count; // Expect one less adapter proxy if there's no more adapters to query. @@ -305,7 +276,10 @@ LocatorI::Request::exception(const string& id, const Ice::Exception& ex) if(adapter.proxy) { - adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + if(_locator->getDirectProxy(adapter, this)) + { + activating(); + } } } @@ -319,10 +293,11 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) } Lock sync(*this); - assert(proxy); - - _activating.erase(id); - + if(_proxies.size() == _count) // Nothing to do if we already sent the response. + { + return; + } + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); // @@ -397,18 +372,13 @@ LocatorI::Request::sendResponse() } } - for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) - { - _locator->cancelActivate(*q, this); - } - Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } if(_roundRobin) { - _locator->removePendingResolve(_id, roundRobinCount); + _locator->removePendingRoundRobinRequest(_id, roundRobinCount); } } @@ -475,7 +445,8 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { LocatorIPtr self = const_cast<LocatorI*>(this); bool pending = false; - if(self->addPendingResolve(id, cb, true, pending)) // Add only if there's already requests pending. + if(self->addPendingRoundRobinRequest(id, cb, true, pending)) // Add only if there's already round robin requests + // pending. { // // Another request is currently resolving the adapter endpoints. We'll @@ -511,14 +482,14 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, // if(roundRobin) { - if(self->addPendingResolve(id, cb, false, pending)) + if(self->addPendingRoundRobinRequest(id, cb, false, pending)) { return; } } else if(pending) { - self->removePendingResolve(id, 0); + self->removePendingRoundRobinRequest(id, 0); } RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); @@ -528,7 +499,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { if(pending) { - self->removePendingResolve(id, 0); + self->removePendingRoundRobinRequest(id, 0); } try @@ -545,7 +516,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { if(pending) { - self->removePendingResolve(id, 0); + self->removePendingRoundRobinRequest(id, 0); } const TraceLevelsPtr traceLevels = _database->getTraceLevels(); @@ -596,85 +567,11 @@ LocatorI::getTraceLevels() const return _database->getTraceLevels(); } -void -LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) -{ - { - Lock sync(*this); - - // - // Check if there's already pending requests for this adapter. If that's the case, - // we just add this one to the queue. If not, we add it to the queue and initiate - // a call on the adapter to get its direct proxy. - // - PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); - if(p != _pendingRequests.end()) - { - p->second.insert(request); - return; - } - - p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first; - p->second.insert(request); - } - - AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id); - int timeout = adapter.activationTimeout + adapter.deactivationTimeout; - AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); -} - -void -LocatorI::cancelActivate(const string& id, const RequestPtr& request) -{ - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); - if(p != _pendingRequests.end()) - { - p->second.erase(request); - } -} - -void -LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy) -{ - PendingRequests requests; - { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); - assert(p != _pendingRequests.end()); - requests.swap(p->second); - _pendingRequests.erase(p); - } - - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->response(id, proxy); - } -} - -void -LocatorI::activateException(const string& id, const Ice::Exception& ex) -{ - PendingRequests requests; - { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); - assert(p != _pendingRequests.end()); - requests.swap(p->second); - _pendingRequests.erase(p); - } - - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->exception(id, ex); - } -} - bool -LocatorI::addPendingResolve(const string& adapterId, - const Ice::AMD_Locator_findAdapterByIdPtr& cb, - bool addIfExists, - bool& pending) +LocatorI::addPendingRoundRobinRequest(const string& adapterId, + const Ice::AMD_Locator_findAdapterByIdPtr& cb, + bool addIfExists, + bool& pending) { Lock sync(*this); pending = false; @@ -698,7 +595,7 @@ LocatorI::addPendingResolve(const string& adapterId, } void -LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount) +LocatorI::removePendingRoundRobinRequest(const string& adapterId, int roundRobinCount) { Ice::AMD_Locator_findAdapterByIdPtr cb; { @@ -741,3 +638,97 @@ LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount) findAdapterById_async(cb, adapterId); } } + +bool +LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request) +{ + { + Lock sync(*this); + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + if(p != _pendingRequests.end()) + { + p->second.push_back(request); + return _activating.find(adapter.id) != _activating.end(); + } + + PendingRequests requests; + requests.push_back(request); + _pendingRequests.insert(make_pair(adapter.id, requests)); + } + + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter)); + return false; +} + +void +LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy) +{ + PendingRequests requests; + { + Lock sync(*this); + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + assert(p != _pendingRequests.end()); + requests.swap(p->second); + _pendingRequests.erase(p); + _activating.erase(adapter.id); + } + + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->response(adapter.id, proxy); + } +} + +void +LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex) +{ + bool activate = false; + try + { + ex.ice_throw(); + } + catch(const AdapterNotActiveException& e) + { + activate = e.activatable; + } + catch(const Ice::Exception&) + { + } + + PendingRequests requests; + { + Lock sync(*this); + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + assert(p != _pendingRequests.end()); + if(activate) + { + _activating.insert(adapter.id); + requests = p->second; + } + else + { + requests.swap(p->second); + _pendingRequests.erase(p); + _activating.erase(adapter.id); + } + } + + if(activate) + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->activating(); + } + + AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter); + int timeout = adapter.activationTimeout + adapter.deactivationTimeout; + AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); + } + else + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->exception(adapter.id, ex); + } + } +} diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h index 9b66c51ac99..452afd2c03b 100644 --- a/cpp/src/IceGrid/LocatorI.h +++ b/cpp/src/IceGrid/LocatorI.h @@ -43,7 +43,7 @@ public: void execute(); void response(const std::string&, const Ice::ObjectPrx&); - void activate(const std::string&); + void activating(); void exception(const std::string&, const Ice::Exception&); virtual bool @@ -68,7 +68,6 @@ public: LocatorAdapterInfoSeq::const_iterator _lastAdapter; std::map<std::string, Ice::ObjectPrx> _proxies; std::auto_ptr<Ice::Exception> _exception; - std::set<std::string> _activating; }; typedef IceUtil::Handle<Request> RequestPtr; @@ -88,14 +87,12 @@ public: const Ice::CommunicatorPtr& getCommunicator() const; const TraceLevelsPtr& getTraceLevels() const; - void activate(const LocatorAdapterInfo&, const RequestPtr&); - void cancelActivate(const std::string&, const RequestPtr&); + bool addPendingRoundRobinRequest(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&); + void removePendingRoundRobinRequest(const std::string&, int); - void activateFinished(const std::string&, const Ice::ObjectPrx&); - void activateException(const std::string&, const Ice::Exception&); - - bool addPendingResolve(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&); - void removePendingResolve(const std::string&, int); + bool getDirectProxy(const LocatorAdapterInfo&, const RequestPtr&); + void getDirectProxyResponse(const LocatorAdapterInfo&, const Ice::ObjectPrx&); + void getDirectProxyException(const LocatorAdapterInfo&, const Ice::Exception&); protected: @@ -105,10 +102,10 @@ protected: const RegistryPrx _localRegistry; const QueryPrx _localQuery; - typedef std::set<RequestPtr> PendingRequests; + typedef std::vector<RequestPtr> PendingRequests; typedef std::map<std::string, PendingRequests> PendingRequestsMap; - PendingRequestsMap _pendingRequests; + std::set<std::string> _activating; std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves; }; |