diff options
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 229 |
1 files changed, 139 insertions, 90 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 9158def55ff..702eded7cd7 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -26,53 +26,67 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy { public: - AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) : - _locator(locator), _id(id), _adapter(adapter) + AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) : + _request(request), _id(id) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { assert(obj); - _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj); + _request->response(_id, obj); } - virtual void ice_exception(const ::Ice::Exception& ex) - { - _locator->getDirectProxyException(_adapter, _id, ex); + virtual void ice_exception(const ::Ice::Exception& e) + { + try + { + e.ice_throw(); + } + catch(const AdapterNotActiveException& ex) + { + if(ex.activatable) + { + _request->activate(_id); + return; + } + } + catch(const Ice::Exception& ex) + { + } + + _request->exception(_id, e); } private: - const LocatorIPtr _locator; + const LocatorI::RequestPtr _request; const string _id; - const LocatorAdapterInfo _adapter; }; class AMI_Adapter_activateI : public AMI_Adapter_activate { public: - AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) : - _locator(locator), _id(id), _adapter(adapter) + AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) : + _locator(locator), _id(id) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { - _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj); + _locator->activateFinished(_id, obj); } virtual void ice_exception(const ::Ice::Exception& ex) { - _locator->getDirectProxyException(_adapter, _id, ex); + _locator->activateException(_id, ex); } private: const LocatorIPtr _locator; const string _id; - const LocatorAdapterInfo _adapter; }; // @@ -205,29 +219,70 @@ LocatorI::Request::execute() ++_lastAdapter; } } - assert(!adapters.empty()); + for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - requestAdapter(*p); + p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id)); + } +} + +void +LocatorI::Request::activate(const string& id) +{ + // + // Activate the adapter + // + // NOTE: we use a timeout large enough to ensure that the activate() call won't + // timeout if the server hangs in deactivation and/or activation. + // + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + if(p->id == id) + { + _locator->activate(*p, this); + _activating.insert(id); + } + } + + // + // If this is a request for a replica group, don't wait for the activation to + // complete. Instead, we query the next adapter which might be already active. + // + if(_replicaGroup) + { + LocatorAdapterInfo adapter; + { + Lock sync(*this); + if(_lastAdapter != _adapters.end()) + { + adapter = *_lastAdapter; + ++_lastAdapter; + } + } + if(adapter.proxy) + { + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + } } } void -LocatorI::Request::exception(const Ice::Exception& ex) +LocatorI::Request::exception(const string& id, const Ice::Exception& ex) { LocatorAdapterInfo adapter; { Lock sync(*this); - if(!_exception.get()) { _exception.reset(ex.ice_clone()); } + + _activating.erase(id); if(_lastAdapter == _adapters.end()) { --_count; // Expect one less adapter proxy if there's no more adapters to query. - + // // If we received all the required proxies, it's time to send the // answer back to the client. @@ -236,7 +291,6 @@ LocatorI::Request::exception(const Ice::Exception& ex) { sendResponse(); } - return; } else { @@ -244,16 +298,28 @@ LocatorI::Request::exception(const Ice::Exception& ex) ++_lastAdapter; } } - requestAdapter(adapter); + + if(adapter.proxy) + { + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + } } void -LocatorI::Request::response(const Ice::ObjectPrx& proxy) +LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) { + if(!proxy) + { + exception(id, AdapterNotActiveException()); + return; + } + Lock sync(*this); assert(proxy); - _proxies.push_back(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + _activating.erase(id); + + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); // // If we received all the required proxies, it's time to send the @@ -266,22 +332,11 @@ LocatorI::Request::response(const Ice::ObjectPrx& proxy) } void -LocatorI::Request::requestAdapter(const LocatorAdapterInfo& adapter) -{ - assert(adapter.proxy); - if(_locator->getDirectProxyRequest(this, adapter)) - { - AMI_Adapter_getDirectProxyPtr amiCB = new AMI_Adapter_getDirectProxyI(_locator, _id, adapter); - adapter.proxy->getDirectProxy_async(amiCB); - } -} - -void LocatorI::Request::sendResponse() { if(_proxies.size() == 1) { - _amdCB->ice_response(_proxies.back()); + _amdCB->ice_response(_proxies.begin()->second); } else if(_proxies.empty()) { @@ -304,10 +359,19 @@ LocatorI::Request::sendResponse() { Ice::EndpointSeq endpoints; endpoints.reserve(_proxies.size()); - for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { - Ice::EndpointSeq edpts = (*p)->ice_getEndpoints(); - endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + if(q != _proxies.end()) + { + Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } + } + + for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) + { + _locator->cancelActivate(*q, this); } Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); @@ -442,54 +506,55 @@ LocatorI::getLocalQuery(const Ice::Current&) const return _localQuery; } -bool -LocatorI::getDirectProxyRequest(const RequestPtr& request, const LocatorAdapterInfo& adapter) +const Ice::CommunicatorPtr& +LocatorI::getCommunicator() const { - Lock sync(*this); - - // - // Check if there's already pending requests for this adapter. If that's the case, - // we just add this one to the queue. If not, we add it to the queue and initiate - // a call on the adapter to get its direct proxy. - // - PendingRequestsMap::iterator p; - p = _pendingRequests.insert(make_pair(adapter.proxy->ice_getIdentity(), PendingRequests())).first; - p->second.push_back(request); - return p->second.size() == 1; + return _communicator; } void -LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& id, const Ice::Exception& ex) +LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) { - try - { - ex.ice_throw(); - } - catch(const AdapterNotActiveException& ex) { - if(ex.activatable) + Lock sync(*this); + + // + // Check if there's already pending requests for this adapter. If that's the case, + // we just add this one to the queue. If not, we add it to the queue and initiate + // a call on the adapter to get its direct proxy. + // + PendingRequestsMap::iterator p; + p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first; + p->second.insert(request); + if(p->second.size() != 1) { - // - // Activate the adapter if it can be activated on demand. - // - // NOTE: we use a timeout large enough to ensure that the - // activate() call won't timeout if the server hangs in - // deactivation and/or activation. - // - AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, id, adpt); - int timeout = adpt.activationTimeout + adpt.deactivationTimeout; - AdapterPrx::uncheckedCast(adpt.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); return; } } - catch(const Ice::Exception&) + + AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id); + int timeout = adapter.activationTimeout + adapter.deactivationTimeout; + AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); +} + +void +LocatorI::cancelActivate(const string& id, const RequestPtr& request) +{ + Lock sync(*this); + PendingRequestsMap::iterator p = _pendingRequests.find(id); + if(p != _pendingRequests.end()) { + p->second.erase(request); } +} +void +LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy) +{ PendingRequests requests; { Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adpt.proxy->ice_getIdentity()); + PendingRequestsMap::iterator p = _pendingRequests.find(id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); @@ -497,40 +562,24 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - (*q)->exception(ex); + (*q)->response(id, proxy); } } void -LocatorI::getDirectProxyCallback(const Ice::Identity& adapterId, const Ice::ObjectPrx& proxy) +LocatorI::activateException(const string& id, const Ice::Exception& ex) { PendingRequests requests; { Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adapterId); + PendingRequestsMap::iterator p = _pendingRequests.find(id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); } - if(proxy) - { - for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->response(proxy); - } - } - else + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->exception(AdapterNotActiveException()); - } + (*q)->exception(id, ex); } } - -const Ice::CommunicatorPtr& -LocatorI::getCommunicator() const -{ - return _communicator; -} |