diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-19 18:40:04 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-19 18:40:04 +0100 |
commit | 9332213216d90c982f41f6670d0471fc92cd206b (patch) | |
tree | f9fafa332285cda639f057e1e4577f6acc12b860 /cpp/src | |
parent | Fixed bug 2554 (diff) | |
download | ice-9332213216d90c982f41f6670d0471fc92cd206b.tar.bz2 ice-9332213216d90c982f41f6670d0471fc92cd206b.tar.xz ice-9332213216d90c982f41f6670d0471fc92cd206b.zip |
Fixed bug 2368
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 229 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.h | 34 |
2 files changed, 162 insertions, 101 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 9158def55ff..702eded7cd7 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -26,53 +26,67 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy { public: - AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) : - _locator(locator), _id(id), _adapter(adapter) + AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) : + _request(request), _id(id) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { assert(obj); - _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj); + _request->response(_id, obj); } - virtual void ice_exception(const ::Ice::Exception& ex) - { - _locator->getDirectProxyException(_adapter, _id, ex); + virtual void ice_exception(const ::Ice::Exception& e) + { + try + { + e.ice_throw(); + } + catch(const AdapterNotActiveException& ex) + { + if(ex.activatable) + { + _request->activate(_id); + return; + } + } + catch(const Ice::Exception& ex) + { + } + + _request->exception(_id, e); } private: - const LocatorIPtr _locator; + const LocatorI::RequestPtr _request; const string _id; - const LocatorAdapterInfo _adapter; }; class AMI_Adapter_activateI : public AMI_Adapter_activate { public: - AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) : - _locator(locator), _id(id), _adapter(adapter) + AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) : + _locator(locator), _id(id) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { - _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj); + _locator->activateFinished(_id, obj); } virtual void ice_exception(const ::Ice::Exception& ex) { - _locator->getDirectProxyException(_adapter, _id, ex); + _locator->activateException(_id, ex); } private: const LocatorIPtr _locator; const string _id; - const LocatorAdapterInfo _adapter; }; // @@ -205,29 +219,70 @@ LocatorI::Request::execute() ++_lastAdapter; } } - assert(!adapters.empty()); + for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - requestAdapter(*p); + p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id)); + } +} + +void +LocatorI::Request::activate(const string& id) +{ + // + // Activate the adapter + // + // NOTE: we use a timeout large enough to ensure that the activate() call won't + // timeout if the server hangs in deactivation and/or activation. + // + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + if(p->id == id) + { + _locator->activate(*p, this); + _activating.insert(id); + } + } + + // + // If this is a request for a replica group, don't wait for the activation to + // complete. Instead, we query the next adapter which might be already active. + // + if(_replicaGroup) + { + LocatorAdapterInfo adapter; + { + Lock sync(*this); + if(_lastAdapter != _adapters.end()) + { + adapter = *_lastAdapter; + ++_lastAdapter; + } + } + if(adapter.proxy) + { + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + } } } void -LocatorI::Request::exception(const Ice::Exception& ex) +LocatorI::Request::exception(const string& id, const Ice::Exception& ex) { LocatorAdapterInfo adapter; { Lock sync(*this); - if(!_exception.get()) { _exception.reset(ex.ice_clone()); } + + _activating.erase(id); if(_lastAdapter == _adapters.end()) { --_count; // Expect one less adapter proxy if there's no more adapters to query. - + // // If we received all the required proxies, it's time to send the // answer back to the client. @@ -236,7 +291,6 @@ LocatorI::Request::exception(const Ice::Exception& ex) { sendResponse(); } - return; } else { @@ -244,16 +298,28 @@ LocatorI::Request::exception(const Ice::Exception& ex) ++_lastAdapter; } } - requestAdapter(adapter); + + if(adapter.proxy) + { + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + } } void -LocatorI::Request::response(const Ice::ObjectPrx& proxy) +LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) { + if(!proxy) + { + exception(id, AdapterNotActiveException()); + return; + } + Lock sync(*this); assert(proxy); - _proxies.push_back(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + _activating.erase(id); + + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); // // If we received all the required proxies, it's time to send the @@ -266,22 +332,11 @@ LocatorI::Request::response(const Ice::ObjectPrx& proxy) } void -LocatorI::Request::requestAdapter(const LocatorAdapterInfo& adapter) -{ - assert(adapter.proxy); - if(_locator->getDirectProxyRequest(this, adapter)) - { - AMI_Adapter_getDirectProxyPtr amiCB = new AMI_Adapter_getDirectProxyI(_locator, _id, adapter); - adapter.proxy->getDirectProxy_async(amiCB); - } -} - -void LocatorI::Request::sendResponse() { if(_proxies.size() == 1) { - _amdCB->ice_response(_proxies.back()); + _amdCB->ice_response(_proxies.begin()->second); } else if(_proxies.empty()) { @@ -304,10 +359,19 @@ LocatorI::Request::sendResponse() { Ice::EndpointSeq endpoints; endpoints.reserve(_proxies.size()); - for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { - Ice::EndpointSeq edpts = (*p)->ice_getEndpoints(); - endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + if(q != _proxies.end()) + { + Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } + } + + for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) + { + _locator->cancelActivate(*q, this); } Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); @@ -442,54 +506,55 @@ LocatorI::getLocalQuery(const Ice::Current&) const return _localQuery; } -bool -LocatorI::getDirectProxyRequest(const RequestPtr& request, const LocatorAdapterInfo& adapter) +const Ice::CommunicatorPtr& +LocatorI::getCommunicator() const { - Lock sync(*this); - - // - // Check if there's already pending requests for this adapter. If that's the case, - // we just add this one to the queue. If not, we add it to the queue and initiate - // a call on the adapter to get its direct proxy. - // - PendingRequestsMap::iterator p; - p = _pendingRequests.insert(make_pair(adapter.proxy->ice_getIdentity(), PendingRequests())).first; - p->second.push_back(request); - return p->second.size() == 1; + return _communicator; } void -LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& id, const Ice::Exception& ex) +LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) { - try - { - ex.ice_throw(); - } - catch(const AdapterNotActiveException& ex) { - if(ex.activatable) + Lock sync(*this); + + // + // Check if there's already pending requests for this adapter. If that's the case, + // we just add this one to the queue. If not, we add it to the queue and initiate + // a call on the adapter to get its direct proxy. + // + PendingRequestsMap::iterator p; + p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first; + p->second.insert(request); + if(p->second.size() != 1) { - // - // Activate the adapter if it can be activated on demand. - // - // NOTE: we use a timeout large enough to ensure that the - // activate() call won't timeout if the server hangs in - // deactivation and/or activation. - // - AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, id, adpt); - int timeout = adpt.activationTimeout + adpt.deactivationTimeout; - AdapterPrx::uncheckedCast(adpt.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); return; } } - catch(const Ice::Exception&) + + AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id); + int timeout = adapter.activationTimeout + adapter.deactivationTimeout; + AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); +} + +void +LocatorI::cancelActivate(const string& id, const RequestPtr& request) +{ + Lock sync(*this); + PendingRequestsMap::iterator p = _pendingRequests.find(id); + if(p != _pendingRequests.end()) { + p->second.erase(request); } +} +void +LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy) +{ PendingRequests requests; { Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adpt.proxy->ice_getIdentity()); + PendingRequestsMap::iterator p = _pendingRequests.find(id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); @@ -497,40 +562,24 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - (*q)->exception(ex); + (*q)->response(id, proxy); } } void -LocatorI::getDirectProxyCallback(const Ice::Identity& adapterId, const Ice::ObjectPrx& proxy) +LocatorI::activateException(const string& id, const Ice::Exception& ex) { PendingRequests requests; { Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adapterId); + PendingRequestsMap::iterator p = _pendingRequests.find(id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); } - if(proxy) - { - for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->response(proxy); - } - } - else + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) { - for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q) - { - (*q)->exception(AdapterNotActiveException()); - } + (*q)->exception(id, ex); } } - -const Ice::CommunicatorPtr& -LocatorI::getCommunicator() const -{ - return _communicator; -} diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h index e36741631f9..f0f97239c83 100644 --- a/cpp/src/IceGrid/LocatorI.h +++ b/cpp/src/IceGrid/LocatorI.h @@ -13,6 +13,8 @@ #include <IceGrid/Internal.h> #include <IceGrid/Locator.h> +#include <set> + namespace IceGrid { @@ -30,6 +32,8 @@ typedef std::vector<LocatorAdapterInfo> LocatorAdapterInfoSeq; class LocatorI : public Locator, public IceUtil::Mutex { +public: + class Request : public IceUtil::Mutex, public IceUtil::Shared { public: @@ -38,8 +42,15 @@ class LocatorI : public Locator, public IceUtil::Mutex const LocatorAdapterInfoSeq&, int, const TraceLevelsPtr&); void execute(); - void response(const Ice::ObjectPrx&); - void exception(const Ice::Exception&); + void response(const std::string&, const Ice::ObjectPrx&); + void activate(const std::string&); + void exception(const std::string&, const Ice::Exception&); + + virtual bool + operator<(const Request& r) const + { + return this < &r; + } private: @@ -54,13 +65,12 @@ class LocatorI : public Locator, public IceUtil::Mutex const TraceLevelsPtr _traceLevels; unsigned int _count; LocatorAdapterInfoSeq::const_iterator _lastAdapter; - std::vector<Ice::ObjectPrx> _proxies; + std::map<std::string, Ice::ObjectPrx> _proxies; std::auto_ptr<Ice::Exception> _exception; + std::set<std::string> _activating; }; typedef IceUtil::Handle<Request> RequestPtr; -public: - LocatorI(const Ice::CommunicatorPtr&, const DatabasePtr&, const Ice::LocatorRegistryPrx&, const RegistryPrx&, const QueryPrx&); @@ -73,13 +83,15 @@ public: virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const; virtual RegistryPrx getLocalRegistry(const Ice::Current&) const; virtual QueryPrx getLocalQuery(const Ice::Current&) const; - - bool getDirectProxyRequest(const RequestPtr&, const LocatorAdapterInfo&); - void getDirectProxyException(const LocatorAdapterInfo&, const std::string&, const Ice::Exception&); - void getDirectProxyCallback(const Ice::Identity&, const Ice::ObjectPrx&); const Ice::CommunicatorPtr& getCommunicator() const; + void activate(const LocatorAdapterInfo&, const RequestPtr&); + void cancelActivate(const std::string&, const RequestPtr&); + + void activateFinished(const std::string&, const Ice::ObjectPrx&); + void activateException(const std::string&, const Ice::Exception&); + protected: const Ice::CommunicatorPtr _communicator; @@ -88,8 +100,8 @@ protected: const RegistryPrx _localRegistry; const QueryPrx _localQuery; - typedef std::vector<RequestPtr> PendingRequests; - typedef std::map<Ice::Identity, PendingRequests> PendingRequestsMap; + typedef std::set<RequestPtr> PendingRequests; + typedef std::map<std::string, PendingRequests> PendingRequestsMap; PendingRequestsMap _pendingRequests; }; |