diff options
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 794 |
1 files changed, 492 insertions, 302 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index bf910ba31d1..d038ccae641 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -26,67 +26,51 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy { public: - AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) : - _request(request), _id(id) + AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + _locator(locator), _adapter(adapter) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { assert(obj); - _request->response(_id, obj); + _locator->getDirectProxyResponse(_adapter, obj); } virtual void ice_exception(const ::Ice::Exception& e) { - try - { - e.ice_throw(); - } - catch(const AdapterNotActiveException& ex) - { - if(ex.activatable) - { - _request->activate(_id); - return; - } - } - catch(const Ice::Exception&) - { - } - - _request->exception(_id, e); + _locator->getDirectProxyException(_adapter, e); } private: - const LocatorI::RequestPtr _request; - const string _id; + const LocatorIPtr _locator; + const LocatorAdapterInfo _adapter; }; class AMI_Adapter_activateI : public AMI_Adapter_activate { public: - AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) : - _locator(locator), _id(id) + AMI_Adapter_activateI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + _locator(locator), _adapter(adapter) { } virtual void ice_response(const ::Ice::ObjectPrx& obj) { - _locator->activateFinished(_id, obj); + _locator->getDirectProxyResponse(_adapter, obj); } virtual void ice_exception(const ::Ice::Exception& ex) { - _locator->activateException(_id, ex); + _locator->getDirectProxyException(_adapter, ex); } private: const LocatorIPtr _locator; - const string _id; + const LocatorAdapterInfo _adapter; }; // @@ -166,248 +150,483 @@ private: const Ice::ObjectPrx _obj; }; -} - -LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, - const string& id, - bool replicaGroup, - bool roundRobin, - const LocatorAdapterInfoSeq& adapters, - int count) : - _amdCB(amdCB), - _locator(locator), - _id(id), - _replicaGroup(replicaGroup), - _roundRobin(roundRobin), - _adapters(adapters), - _traceLevels(locator->getTraceLevels()), - _count(count), - _lastAdapter(_adapters.begin()) +class AdapterRequest : public LocatorI::Request { - assert((_count == 0 && _adapters.empty()) || _count > 0); -} +public: -void -LocatorI::Request::execute() -{ - // - // If there's no adapters to request, we're done, send the - // response. - // - if(_adapters.empty()) + AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const LocatorAdapterInfo& adapter) : + _amdCB(amdCB), + _locator(locator), + _adapter(adapter), + _traceLevels(locator->getTraceLevels()) { - sendResponse(); - return; + assert(_adapter.proxy); } - // - // Otherwise, request as many adapters as required. - // - LocatorAdapterInfoSeq adapters; + virtual void + execute() { - Lock sync(*this); - assert(_count > 0 && _lastAdapter != _adapters.end()); - for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) + _locator->getDirectProxy(_adapter, this); + } + + virtual void + activating(const string&) + { + // Nothing to do. + } + + virtual void + response(const std::string&, const Ice::ObjectPrx& proxy) + { + assert(proxy); + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + + virtual void + exception(const std::string&, const Ice::Exception& ex) + { + if(_traceLevels->locator > 0) { - if(_lastAdapter == _adapters.end()) - { - _count = i; - break; - } - assert(_lastAdapter->proxy); - adapters.push_back(*_lastAdapter); + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + } + +private: + + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const LocatorAdapterInfo _adapter; + const TraceLevelsPtr _traceLevels; +}; + +class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex +{ +public: + + ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count, + Ice::ObjectPrx firstProxy) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _lastAdapter(_adapters.begin()) + { + assert(_adapters.empty() || _count > 0); + + if(_adapters.empty()) + { + _count = 0; + } + + // + // If the first adapter proxy is provided, store it in _proxies. + // + if(firstProxy) + { + assert(!_adapters.empty()); + _proxies[_adapters[0].id] = firstProxy; ++_lastAdapter; } } - - for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + + virtual void + execute() { - p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id)); + // + // Otherwise, request as many adapters as required. + // + LocatorAdapterInfoSeq adapters; + { + Lock sync(*this); + for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) + { + if(_lastAdapter == _adapters.end()) + { + _count = i; + break; + } + assert(_lastAdapter->proxy); + adapters.push_back(*_lastAdapter); + ++_lastAdapter; + } + + // + // If there's no adapters to request, we're done, send the + // response. + // + if(_proxies.size() == _count) + { + sendResponse(); + return; + } + } + + + for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + if(_locator->getDirectProxy(*p, this)) + { + activating(p->id); + } + } } -} -void -LocatorI::Request::activate(const string& id) -{ - // - // Activate the adapter - // - // NOTE: we use a timeout large enough to ensure that the activate() call won't - // timeout if the server hangs in deactivation and/or activation. - // - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + virtual void + activating(const string&) { - if(p->id == id) + // + // An adapter is being activated. Don't wait for the activation to complete. Instead, + // we query the next adapter which might be already active. + // + LocatorAdapterInfo adapter; + do { - _locator->activate(*p, this); - _activating.insert(id); + Lock sync(*this); + if(_lastAdapter == _adapters.end()) + { + break; + } + adapter = *_lastAdapter; + ++_lastAdapter; } + while(_locator->getDirectProxy(adapter, this)); } - - // - // If this is a request for a replica group, don't wait for the activation to - // complete. Instead, we query the next adapter which might be already active. - // - if(_replicaGroup) + + virtual void + exception(const string& id, const Ice::Exception& ex) { LocatorAdapterInfo adapter; { Lock sync(*this); - if(_lastAdapter != _adapters.end()) + if(_proxies.size() == _count) // Nothing to do if we already sent the response. + { + return; + } + + if(!_exception.get()) + { + _exception.reset(ex.ice_clone()); + } + + if(_lastAdapter == _adapters.end()) + { + --_count; // Expect one less adapter proxy if there's no more adapters to query. + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_count == _proxies.size()) + { + sendResponse(); + } + } + else { adapter = *_lastAdapter; ++_lastAdapter; } } + if(adapter.proxy) { - adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); + if(_locator->getDirectProxy(adapter, this)) + { + activating(adapter.id); + } } } -} -void -LocatorI::Request::exception(const string& id, const Ice::Exception& ex) -{ - LocatorAdapterInfo adapter; + virtual void + response(const string& id, const Ice::ObjectPrx& proxy) { Lock sync(*this); - if(!_exception.get()) + assert(proxy); + if(_proxies.size() == _count) // Nothing to do if we already sent the response. { - _exception.reset(ex.ice_clone()); + return; } - _activating.erase(id); + _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + + // + // If we received all the required proxies, it's time to send the + // answer back to the client. + // + if(_proxies.size() == _count) + { + sendResponse(); + } + } + +private: - if(_lastAdapter == _adapters.end()) + void + sendResponse() + { + if(_proxies.size() == 1) + { + _amdCB->ice_response(_proxies.begin()->second); + } + else if(_proxies.empty()) { - --_count; // Expect one less adapter proxy if there's no more adapters to query. - // - // If we received all the required proxies, it's time to send the - // answer back to the client. + // If there's no proxies, it's either because we couldn't contact the adapters or + // because the replica group has no members. // - if(_count == _proxies.size()) + assert(_exception.get() || _adapters.empty()); + if(_traceLevels->locator > 0) { - sendResponse(); + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); } + _amdCB->ice_response(0); } - else + else if(_proxies.size() > 1) { - adapter = *_lastAdapter; - ++_lastAdapter; + Ice::EndpointSeq endpoints; + endpoints.reserve(_proxies.size()); + for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + if(q != _proxies.end()) + { + Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } + } + + Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); + _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } } - if(adapter.proxy) - { - adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id)); - } -} + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + unsigned int _count; + LocatorAdapterInfoSeq::const_iterator _lastAdapter; + std::map<std::string, Ice::ObjectPrx> _proxies; + std::auto_ptr<Ice::Exception> _exception; +}; -void -LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy) +class RoundRobinRequest : public LocatorI::Request, public IceUtil::Mutex { - if(!proxy) +public: + + RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const LocatorIPtr& locator, + const string& id, + const LocatorAdapterInfoSeq& adapters, + int count) : + _amdCB(amdCB), + _locator(locator), + _id(id), + _adapters(adapters), + _traceLevels(locator->getTraceLevels()), + _count(count), + _waitForActivation(false) { - exception(id, AdapterNotActiveException()); - return; + assert(_adapters.empty() || _count > 0); } - Lock sync(*this); - assert(proxy); - - _activating.erase(id); - - _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); - - // - // If we received all the required proxies, it's time to send the - // answer back to the client. - // - if(_proxies.size() == _count) + virtual void + execute() { - sendResponse(); + if(_adapters.empty()) + { + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty"; + } + _amdCB->ice_response(0); + return; + } + + LocatorAdapterInfo adapter = _adapters[0]; + assert(adapter.proxy); + if(_locator->getDirectProxy(adapter, this)) + { + activating(adapter.id); + } } -} -void -LocatorI::Request::sendResponse() -{ - int roundRobinCount = 0; - if(_proxies.size() == 1) + virtual void + activating(const string& id) { - if(_roundRobin) + LocatorAdapterInfo adapter; + adapter.id = id; + do { - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + Lock sync(*this); + if(_adapters.empty() || _waitForActivation) { - if(_proxies.find(p->id) != _proxies.end()) - { - break; - } - // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. - // - ++roundRobinCount; + return; } + _activatingOrFailed.insert(adapter.id); + adapter = nextAdapter(); } - _amdCB->ice_response(_proxies.begin()->second); + while(adapter.proxy && _locator->getDirectProxy(adapter, this)); } - else if(_proxies.empty()) + + virtual void + response(const std::string& id, const Ice::ObjectPrx& proxy) { - // - // If there's no proxies, it's either because we couldn't - // contact the adapters or because the replica group has - // no members. - // - assert(_exception.get() || (_replicaGroup && _adapters.empty())); + Lock sync(*this); + assert(proxy); + if(_adapters.empty() || id != _adapters[0].id) + { + return; + } - if(_traceLevels->locator > 0) + if(_count > 1) { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "couldn't resolve " << (_replicaGroup ? "replica group `" : "adapter `") << _id << "' endpoints:\n"; - out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + Ice::ObjectPrx p = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")); + LocatorI::RequestPtr request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, p); + request->execute(); } - _amdCB->ice_response(0); + else + { + _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"))); + } + _adapters.clear(); } - else if(_proxies.size() > 1) + + virtual void + exception(const std::string& id, const Ice::Exception& ex) { - Ice::EndpointSeq endpoints; - endpoints.reserve(_proxies.size()); - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + LocatorAdapterInfo adapter; { - map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); - if(q != _proxies.end()) + Lock sync(*this); + _failed.insert(id); + _activatingOrFailed.insert(id); + + if(!_exception.get()) { - Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); - endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + _exception.reset(ex.ice_clone()); } - else if(_roundRobin && endpoints.empty()) + + if(_adapters.empty() || id != _adapters[0].id) { - // - // We count the number of object adapters which are inactive until we find - // one active. This count will be used to update the round robin counter. - // - ++roundRobinCount; + return; } + + adapter = nextAdapter(); } - for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q) + if(adapter.proxy && _locator->getDirectProxy(adapter, this)) { - _locator->cancelActivate(*q, this); + activating(adapter.id); } - - Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); - _amdCB->ice_response(proxy->ice_endpoints(endpoints)); } - if(_roundRobin) +private: + + LocatorAdapterInfo + nextAdapter() { - _locator->removePendingResolve(_id, roundRobinCount); + bool replicaGroup; + bool roundRobin; + + _adapters.clear(); + + try + { + if(!_waitForActivation) + { + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _activatingOrFailed); + } + + if(_waitForActivation || (_adapters.empty() && _activatingOrFailed.size() > _failed.size())) + { + // + // If there are no more adapters to try and some servers were being activated, we + // try again but this time we wait for the server activation. + // + _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _failed); + _waitForActivation = true; + } + + if(!roundRobin) + { + LocatorI::RequestPtr request; + if(replicaGroup) + { + request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, 0); + } + else + { + assert(!_adapters.empty()); + request = new AdapterRequest(_amdCB, _locator, _adapters[0]); + } + request->execute(); + _adapters.clear(); + return LocatorAdapterInfo(); + } + else if(!_adapters.empty()) + { + return _adapters[0]; + } + else + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; + out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } + } + catch(const AdapterNotExistException&) + { + assert(_adapters.empty()); + _amdCB->ice_exception(Ice::AdapterNotFoundException()); + return LocatorAdapterInfo(); + } + catch(const Ice::Exception& ex) + { + assert(_adapters.empty()); + if(_traceLevels->locator > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex); + } + _amdCB->ice_response(0); + return LocatorAdapterInfo(); + } } -} + + const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + const LocatorIPtr _locator; + const std::string _id; + LocatorAdapterInfoSeq _adapters; + const TraceLevelsPtr _traceLevels; + int _count; + bool _waitForActivation; + set<string> _failed; + set<string> _activatingOrFailed; + std::auto_ptr<Ice::Exception> _exception; +}; + +}; + LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, const DatabasePtr& database, @@ -471,20 +690,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const Ice::Current&) const { LocatorIPtr self = const_cast<LocatorI*>(this); - if(self->addPendingResolve(id, cb)) - { - // - // Another request is currently resolving the adapter endpoints. We'll - // answer this request once it's done. - // - return; - } - - // - // If no other request is resolving the adapter endpoints, resolve - // the endpoints now. - // - bool replicaGroup = false; try { @@ -497,20 +702,20 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, LocatorAdapterInfoSeq adapters; bool roundRobin; _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin); - - // - // Round robin replica group requests are serialized. This is - // required to make sure the round robin counter is accurate - // even if some adapters are unreachable (bug 2576). For - // adapters, and replica groups, there's no need to serialize - // the requests. - // - if(!roundRobin) + RequestPtr request; + if(roundRobin) { - self->removePendingResolve(id, 0); + request = new RoundRobinRequest(cb, self, id, adapters, count); + } + else if(replicaGroup) + { + request = new ReplicaGroupRequest(cb, self, id, adapters, count, 0); + } + else + { + assert(adapters.size() == 1); + request = new AdapterRequest(cb, self, adapters[0]); } - - RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count); request->execute(); } catch(const AdapterNotExistException&) @@ -523,7 +728,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, { cb->ice_exception(Ice::AdapterNotFoundException()); } - self->removePendingResolve(id, 0); return; } catch(const Ice::Exception& ex) @@ -542,7 +746,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } } cb->ice_response(0); - self->removePendingResolve(id, 0); return; } } @@ -577,130 +780,117 @@ LocatorI::getTraceLevels() const return _database->getTraceLevels(); } -void -LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request) +bool +LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request) { { Lock sync(*this); - - // - // Check if there's already pending requests for this adapter. If that's the case, - // we just add this one to the queue. If not, we add it to the queue and initiate - // a call on the adapter to get its direct proxy. - // - PendingRequestsMap::iterator p; - p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first; - p->second.insert(request); - if(p->second.size() != 1) + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + if(p != _pendingRequests.end()) { - return; + p->second.push_back(request); + return _activating.find(adapter.id) != _activating.end(); } + + PendingRequests requests; + requests.push_back(request); + _pendingRequests.insert(make_pair(adapter.id, requests)); } - AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id); - int timeout = adapter.activationTimeout + adapter.deactivationTimeout; - AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); -} - -void -LocatorI::cancelActivate(const string& id, const RequestPtr& request) -{ - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); - if(p != _pendingRequests.end()) - { - p->second.erase(request); - } + adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter)); + return false; } void -LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy) +LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy) { PendingRequests requests; { Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); + _activating.erase(adapter.id); } - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + if(proxy) + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->response(adapter.id, proxy); + } + } + else { - (*q)->response(id, proxy); + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->exception(adapter.id, AdapterNotActiveException()); + } } } void -LocatorI::activateException(const string& id, const Ice::Exception& ex) +LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex) { - PendingRequests requests; + bool activate = false; + try { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(id); - assert(p != _pendingRequests.end()); - requests.swap(p->second); - _pendingRequests.erase(p); + ex.ice_throw(); } - - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + catch(const AdapterNotActiveException& e) { - (*q)->exception(id, ex); + activate = e.activatable; } -} - -bool -LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb) -{ - Lock sync(*this); - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - if(p == _resolves.end()) - { - p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first; - } - else if(p->second.front().get() == cb.get()) + catch(const Ice::Exception&) { - return false; } - - p->second.push_back(cb); - return p->second.size() > 1; -} - -void -LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount) -{ - Ice::AMD_Locator_findAdapterByIdPtr cb; + + PendingRequests requests; { Lock sync(*this); - - // - // Bump the round robin counter. We bump the round robin counter by - // the number of inactive adapters. This ensures that if the first - // adapters are inactive, if the first adapter to be inactive is the - // Nth adapter, the next adapter to be returned will be the Nth + 1. - // - if(roundRobinCount > 0) - { - _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount); - } - - map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId); - assert(p != _resolves.end()); - - p->second.pop_front(); - if(p->second.empty()) + PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + assert(p != _pendingRequests.end()); + if(activate) { - _resolves.erase(p); + _activating.insert(adapter.id); + requests = p->second; } else { - cb = p->second.front(); + requests.swap(p->second); + _pendingRequests.erase(p); + _activating.erase(adapter.id); } } - if(cb) + if(activate) + { + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->activating(adapter.id); + } + + AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter); + int timeout = adapter.activationTimeout + adapter.deactivationTimeout; + AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); + } + else { - findAdapterById_async(cb, adapterId); + for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + { + (*q)->exception(adapter.id, ex); + } } } + +void +LocatorI::getAdapterInfo(const string& id, + LocatorAdapterInfoSeq& adapters, + int& count, + bool& replicaGroup, + bool& roundRobin, + const set<string>& excludes) +{ + _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, excludes); +} |