summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/LocatorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp794
1 files changed, 492 insertions, 302 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index bf910ba31d1..d038ccae641 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -26,67 +26,51 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy
{
public:
- AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) :
- _request(request), _id(id)
+ AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
+ _locator(locator), _adapter(adapter)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
assert(obj);
- _request->response(_id, obj);
+ _locator->getDirectProxyResponse(_adapter, obj);
}
virtual void ice_exception(const ::Ice::Exception& e)
{
- try
- {
- e.ice_throw();
- }
- catch(const AdapterNotActiveException& ex)
- {
- if(ex.activatable)
- {
- _request->activate(_id);
- return;
- }
- }
- catch(const Ice::Exception&)
- {
- }
-
- _request->exception(_id, e);
+ _locator->getDirectProxyException(_adapter, e);
}
private:
- const LocatorI::RequestPtr _request;
- const string _id;
+ const LocatorIPtr _locator;
+ const LocatorAdapterInfo _adapter;
};
class AMI_Adapter_activateI : public AMI_Adapter_activate
{
public:
- AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) :
- _locator(locator), _id(id)
+ AMI_Adapter_activateI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
+ _locator(locator), _adapter(adapter)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
- _locator->activateFinished(_id, obj);
+ _locator->getDirectProxyResponse(_adapter, obj);
}
virtual void ice_exception(const ::Ice::Exception& ex)
{
- _locator->activateException(_id, ex);
+ _locator->getDirectProxyException(_adapter, ex);
}
private:
const LocatorIPtr _locator;
- const string _id;
+ const LocatorAdapterInfo _adapter;
};
//
@@ -166,248 +150,483 @@ private:
const Ice::ObjectPrx _obj;
};
-}
-
-LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const LocatorIPtr& locator,
- const string& id,
- bool replicaGroup,
- bool roundRobin,
- const LocatorAdapterInfoSeq& adapters,
- int count) :
- _amdCB(amdCB),
- _locator(locator),
- _id(id),
- _replicaGroup(replicaGroup),
- _roundRobin(roundRobin),
- _adapters(adapters),
- _traceLevels(locator->getTraceLevels()),
- _count(count),
- _lastAdapter(_adapters.begin())
+class AdapterRequest : public LocatorI::Request
{
- assert((_count == 0 && _adapters.empty()) || _count > 0);
-}
+public:
-void
-LocatorI::Request::execute()
-{
- //
- // If there's no adapters to request, we're done, send the
- // response.
- //
- if(_adapters.empty())
+ AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const LocatorAdapterInfo& adapter) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _adapter(adapter),
+ _traceLevels(locator->getTraceLevels())
{
- sendResponse();
- return;
+ assert(_adapter.proxy);
}
- //
- // Otherwise, request as many adapters as required.
- //
- LocatorAdapterInfoSeq adapters;
+ virtual void
+ execute()
{
- Lock sync(*this);
- assert(_count > 0 && _lastAdapter != _adapters.end());
- for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i)
+ _locator->getDirectProxy(_adapter, this);
+ }
+
+ virtual void
+ activating(const string&)
+ {
+ // Nothing to do.
+ }
+
+ virtual void
+ response(const std::string&, const Ice::ObjectPrx& proxy)
+ {
+ assert(proxy);
+ _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ }
+
+ virtual void
+ exception(const std::string&, const Ice::Exception& ex)
+ {
+ if(_traceLevels->locator > 0)
{
- if(_lastAdapter == _adapters.end())
- {
- _count = i;
- break;
- }
- assert(_lastAdapter->proxy);
- adapters.push_back(*_lastAdapter);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex);
+ }
+ _amdCB->ice_response(0);
+ }
+
+private:
+
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const LocatorAdapterInfo _adapter;
+ const TraceLevelsPtr _traceLevels;
+};
+
+class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex
+{
+public:
+
+ ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const string& id,
+ const LocatorAdapterInfoSeq& adapters,
+ int count,
+ Ice::ObjectPrx firstProxy) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _id(id),
+ _adapters(adapters),
+ _traceLevels(locator->getTraceLevels()),
+ _count(count),
+ _lastAdapter(_adapters.begin())
+ {
+ assert(_adapters.empty() || _count > 0);
+
+ if(_adapters.empty())
+ {
+ _count = 0;
+ }
+
+ //
+ // If the first adapter proxy is provided, store it in _proxies.
+ //
+ if(firstProxy)
+ {
+ assert(!_adapters.empty());
+ _proxies[_adapters[0].id] = firstProxy;
++_lastAdapter;
}
}
-
- for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+
+ virtual void
+ execute()
{
- p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id));
+ //
+ // Otherwise, request as many adapters as required.
+ //
+ LocatorAdapterInfoSeq adapters;
+ {
+ Lock sync(*this);
+ for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i)
+ {
+ if(_lastAdapter == _adapters.end())
+ {
+ _count = i;
+ break;
+ }
+ assert(_lastAdapter->proxy);
+ adapters.push_back(*_lastAdapter);
+ ++_lastAdapter;
+ }
+
+ //
+ // If there's no adapters to request, we're done, send the
+ // response.
+ //
+ if(_proxies.size() == _count)
+ {
+ sendResponse();
+ return;
+ }
+ }
+
+
+ for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+ {
+ if(_locator->getDirectProxy(*p, this))
+ {
+ activating(p->id);
+ }
+ }
}
-}
-void
-LocatorI::Request::activate(const string& id)
-{
- //
- // Activate the adapter
- //
- // NOTE: we use a timeout large enough to ensure that the activate() call won't
- // timeout if the server hangs in deactivation and/or activation.
- //
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ virtual void
+ activating(const string&)
{
- if(p->id == id)
+ //
+ // An adapter is being activated. Don't wait for the activation to complete. Instead,
+ // we query the next adapter which might be already active.
+ //
+ LocatorAdapterInfo adapter;
+ do
{
- _locator->activate(*p, this);
- _activating.insert(id);
+ Lock sync(*this);
+ if(_lastAdapter == _adapters.end())
+ {
+ break;
+ }
+ adapter = *_lastAdapter;
+ ++_lastAdapter;
}
+ while(_locator->getDirectProxy(adapter, this));
}
-
- //
- // If this is a request for a replica group, don't wait for the activation to
- // complete. Instead, we query the next adapter which might be already active.
- //
- if(_replicaGroup)
+
+ virtual void
+ exception(const string& id, const Ice::Exception& ex)
{
LocatorAdapterInfo adapter;
{
Lock sync(*this);
- if(_lastAdapter != _adapters.end())
+ if(_proxies.size() == _count) // Nothing to do if we already sent the response.
+ {
+ return;
+ }
+
+ if(!_exception.get())
+ {
+ _exception.reset(ex.ice_clone());
+ }
+
+ if(_lastAdapter == _adapters.end())
+ {
+ --_count; // Expect one less adapter proxy if there's no more adapters to query.
+
+ //
+ // If we received all the required proxies, it's time to send the
+ // answer back to the client.
+ //
+ if(_count == _proxies.size())
+ {
+ sendResponse();
+ }
+ }
+ else
{
adapter = *_lastAdapter;
++_lastAdapter;
}
}
+
if(adapter.proxy)
{
- adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ if(_locator->getDirectProxy(adapter, this))
+ {
+ activating(adapter.id);
+ }
}
}
-}
-void
-LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
-{
- LocatorAdapterInfo adapter;
+ virtual void
+ response(const string& id, const Ice::ObjectPrx& proxy)
{
Lock sync(*this);
- if(!_exception.get())
+ assert(proxy);
+ if(_proxies.size() == _count) // Nothing to do if we already sent the response.
{
- _exception.reset(ex.ice_clone());
+ return;
}
- _activating.erase(id);
+ _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
+
+ //
+ // If we received all the required proxies, it's time to send the
+ // answer back to the client.
+ //
+ if(_proxies.size() == _count)
+ {
+ sendResponse();
+ }
+ }
+
+private:
- if(_lastAdapter == _adapters.end())
+ void
+ sendResponse()
+ {
+ if(_proxies.size() == 1)
+ {
+ _amdCB->ice_response(_proxies.begin()->second);
+ }
+ else if(_proxies.empty())
{
- --_count; // Expect one less adapter proxy if there's no more adapters to query.
-
//
- // If we received all the required proxies, it's time to send the
- // answer back to the client.
+ // If there's no proxies, it's either because we couldn't contact the adapters or
+ // because the replica group has no members.
//
- if(_count == _proxies.size())
+ assert(_exception.get() || _adapters.empty());
+ if(_traceLevels->locator > 0)
{
- sendResponse();
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
+ out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
}
+ _amdCB->ice_response(0);
}
- else
+ else if(_proxies.size() > 1)
{
- adapter = *_lastAdapter;
- ++_lastAdapter;
+ Ice::EndpointSeq endpoints;
+ endpoints.reserve(_proxies.size());
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
+ if(q != _proxies.end())
+ {
+ Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ }
+ }
+
+ Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
+ _amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
}
- if(adapter.proxy)
- {
- adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
- }
-}
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const std::string _id;
+ LocatorAdapterInfoSeq _adapters;
+ const TraceLevelsPtr _traceLevels;
+ unsigned int _count;
+ LocatorAdapterInfoSeq::const_iterator _lastAdapter;
+ std::map<std::string, Ice::ObjectPrx> _proxies;
+ std::auto_ptr<Ice::Exception> _exception;
+};
-void
-LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
+class RoundRobinRequest : public LocatorI::Request, public IceUtil::Mutex
{
- if(!proxy)
+public:
+
+ RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const string& id,
+ const LocatorAdapterInfoSeq& adapters,
+ int count) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _id(id),
+ _adapters(adapters),
+ _traceLevels(locator->getTraceLevels()),
+ _count(count),
+ _waitForActivation(false)
{
- exception(id, AdapterNotActiveException());
- return;
+ assert(_adapters.empty() || _count > 0);
}
- Lock sync(*this);
- assert(proxy);
-
- _activating.erase(id);
-
- _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
-
- //
- // If we received all the required proxies, it's time to send the
- // answer back to the client.
- //
- if(_proxies.size() == _count)
+ virtual void
+ execute()
{
- sendResponse();
+ if(_adapters.empty())
+ {
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty";
+ }
+ _amdCB->ice_response(0);
+ return;
+ }
+
+ LocatorAdapterInfo adapter = _adapters[0];
+ assert(adapter.proxy);
+ if(_locator->getDirectProxy(adapter, this))
+ {
+ activating(adapter.id);
+ }
}
-}
-void
-LocatorI::Request::sendResponse()
-{
- int roundRobinCount = 0;
- if(_proxies.size() == 1)
+ virtual void
+ activating(const string& id)
{
- if(_roundRobin)
+ LocatorAdapterInfo adapter;
+ adapter.id = id;
+ do
{
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ Lock sync(*this);
+ if(_adapters.empty() || _waitForActivation)
{
- if(_proxies.find(p->id) != _proxies.end())
- {
- break;
- }
- //
- // We count the number of object adapters which are inactive until we find
- // one active. This count will be used to update the round robin counter.
- //
- ++roundRobinCount;
+ return;
}
+ _activatingOrFailed.insert(adapter.id);
+ adapter = nextAdapter();
}
- _amdCB->ice_response(_proxies.begin()->second);
+ while(adapter.proxy && _locator->getDirectProxy(adapter, this));
}
- else if(_proxies.empty())
+
+ virtual void
+ response(const std::string& id, const Ice::ObjectPrx& proxy)
{
- //
- // If there's no proxies, it's either because we couldn't
- // contact the adapters or because the replica group has
- // no members.
- //
- assert(_exception.get() || (_replicaGroup && _adapters.empty()));
+ Lock sync(*this);
+ assert(proxy);
+ if(_adapters.empty() || id != _adapters[0].id)
+ {
+ return;
+ }
- if(_traceLevels->locator > 0)
+ if(_count > 1)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
- out << "couldn't resolve " << (_replicaGroup ? "replica group `" : "adapter `") << _id << "' endpoints:\n";
- out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ Ice::ObjectPrx p = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
+ LocatorI::RequestPtr request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, p);
+ request->execute();
}
- _amdCB->ice_response(0);
+ else
+ {
+ _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ }
+ _adapters.clear();
}
- else if(_proxies.size() > 1)
+
+ virtual void
+ exception(const std::string& id, const Ice::Exception& ex)
{
- Ice::EndpointSeq endpoints;
- endpoints.reserve(_proxies.size());
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ LocatorAdapterInfo adapter;
{
- map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
- if(q != _proxies.end())
+ Lock sync(*this);
+ _failed.insert(id);
+ _activatingOrFailed.insert(id);
+
+ if(!_exception.get())
{
- Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
- endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ _exception.reset(ex.ice_clone());
}
- else if(_roundRobin && endpoints.empty())
+
+ if(_adapters.empty() || id != _adapters[0].id)
{
- //
- // We count the number of object adapters which are inactive until we find
- // one active. This count will be used to update the round robin counter.
- //
- ++roundRobinCount;
+ return;
}
+
+ adapter = nextAdapter();
}
- for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
+ if(adapter.proxy && _locator->getDirectProxy(adapter, this))
{
- _locator->cancelActivate(*q, this);
+ activating(adapter.id);
}
-
- Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
- _amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
- if(_roundRobin)
+private:
+
+ LocatorAdapterInfo
+ nextAdapter()
{
- _locator->removePendingResolve(_id, roundRobinCount);
+ bool replicaGroup;
+ bool roundRobin;
+
+ _adapters.clear();
+
+ try
+ {
+ if(!_waitForActivation)
+ {
+ _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _activatingOrFailed);
+ }
+
+ if(_waitForActivation || (_adapters.empty() && _activatingOrFailed.size() > _failed.size()))
+ {
+ //
+ // If there are no more adapters to try and some servers were being activated, we
+ // try again but this time we wait for the server activation.
+ //
+ _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _failed);
+ _waitForActivation = true;
+ }
+
+ if(!roundRobin)
+ {
+ LocatorI::RequestPtr request;
+ if(replicaGroup)
+ {
+ request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, 0);
+ }
+ else
+ {
+ assert(!_adapters.empty());
+ request = new AdapterRequest(_amdCB, _locator, _adapters[0]);
+ }
+ request->execute();
+ _adapters.clear();
+ return LocatorAdapterInfo();
+ }
+ else if(!_adapters.empty())
+ {
+ return _adapters[0];
+ }
+ else
+ {
+ assert(_adapters.empty());
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
+ out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ }
+ _amdCB->ice_response(0);
+ return LocatorAdapterInfo();
+ }
+ }
+ catch(const AdapterNotExistException&)
+ {
+ assert(_adapters.empty());
+ _amdCB->ice_exception(Ice::AdapterNotFoundException());
+ return LocatorAdapterInfo();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ assert(_adapters.empty());
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex);
+ }
+ _amdCB->ice_response(0);
+ return LocatorAdapterInfo();
+ }
}
-}
+
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const std::string _id;
+ LocatorAdapterInfoSeq _adapters;
+ const TraceLevelsPtr _traceLevels;
+ int _count;
+ bool _waitForActivation;
+ set<string> _failed;
+ set<string> _activatingOrFailed;
+ std::auto_ptr<Ice::Exception> _exception;
+};
+
+};
+
LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
const DatabasePtr& database,
@@ -471,20 +690,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
const Ice::Current&) const
{
LocatorIPtr self = const_cast<LocatorI*>(this);
- if(self->addPendingResolve(id, cb))
- {
- //
- // Another request is currently resolving the adapter endpoints. We'll
- // answer this request once it's done.
- //
- return;
- }
-
- //
- // If no other request is resolving the adapter endpoints, resolve
- // the endpoints now.
- //
-
bool replicaGroup = false;
try
{
@@ -497,20 +702,20 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
LocatorAdapterInfoSeq adapters;
bool roundRobin;
_database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin);
-
- //
- // Round robin replica group requests are serialized. This is
- // required to make sure the round robin counter is accurate
- // even if some adapters are unreachable (bug 2576). For
- // adapters, and replica groups, there's no need to serialize
- // the requests.
- //
- if(!roundRobin)
+ RequestPtr request;
+ if(roundRobin)
{
- self->removePendingResolve(id, 0);
+ request = new RoundRobinRequest(cb, self, id, adapters, count);
+ }
+ else if(replicaGroup)
+ {
+ request = new ReplicaGroupRequest(cb, self, id, adapters, count, 0);
+ }
+ else
+ {
+ assert(adapters.size() == 1);
+ request = new AdapterRequest(cb, self, adapters[0]);
}
-
- RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count);
request->execute();
}
catch(const AdapterNotExistException&)
@@ -523,7 +728,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
cb->ice_exception(Ice::AdapterNotFoundException());
}
- self->removePendingResolve(id, 0);
return;
}
catch(const Ice::Exception& ex)
@@ -542,7 +746,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
}
}
cb->ice_response(0);
- self->removePendingResolve(id, 0);
return;
}
}
@@ -577,130 +780,117 @@ LocatorI::getTraceLevels() const
return _database->getTraceLevels();
}
-void
-LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
+bool
+LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
{
Lock sync(*this);
-
- //
- // Check if there's already pending requests for this adapter. If that's the case,
- // we just add this one to the queue. If not, we add it to the queue and initiate
- // a call on the adapter to get its direct proxy.
- //
- PendingRequestsMap::iterator p;
- p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first;
- p->second.insert(request);
- if(p->second.size() != 1)
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ if(p != _pendingRequests.end())
{
- return;
+ p->second.push_back(request);
+ return _activating.find(adapter.id) != _activating.end();
}
+
+ PendingRequests requests;
+ requests.push_back(request);
+ _pendingRequests.insert(make_pair(adapter.id, requests));
}
- AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id);
- int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
- AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
-}
-
-void
-LocatorI::cancelActivate(const string& id, const RequestPtr& request)
-{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
- if(p != _pendingRequests.end())
- {
- p->second.erase(request);
- }
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter));
+ return false;
}
void
-LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy)
+LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy)
{
PendingRequests requests;
{
Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
+ _activating.erase(adapter.id);
}
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ if(proxy)
+ {
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->response(adapter.id, proxy);
+ }
+ }
+ else
{
- (*q)->response(id, proxy);
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->exception(adapter.id, AdapterNotActiveException());
+ }
}
}
void
-LocatorI::activateException(const string& id, const Ice::Exception& ex)
+LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex)
{
- PendingRequests requests;
+ bool activate = false;
+ try
{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
- assert(p != _pendingRequests.end());
- requests.swap(p->second);
- _pendingRequests.erase(p);
+ ex.ice_throw();
}
-
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ catch(const AdapterNotActiveException& e)
{
- (*q)->exception(id, ex);
+ activate = e.activatable;
}
-}
-
-bool
-LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb)
-{
- Lock sync(*this);
- map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
- if(p == _resolves.end())
- {
- p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first;
- }
- else if(p->second.front().get() == cb.get())
+ catch(const Ice::Exception&)
{
- return false;
}
-
- p->second.push_back(cb);
- return p->second.size() > 1;
-}
-
-void
-LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount)
-{
- Ice::AMD_Locator_findAdapterByIdPtr cb;
+
+ PendingRequests requests;
{
Lock sync(*this);
-
- //
- // Bump the round robin counter. We bump the round robin counter by
- // the number of inactive adapters. This ensures that if the first
- // adapters are inactive, if the first adapter to be inactive is the
- // Nth adapter, the next adapter to be returned will be the Nth + 1.
- //
- if(roundRobinCount > 0)
- {
- _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount);
- }
-
- map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
- assert(p != _resolves.end());
-
- p->second.pop_front();
- if(p->second.empty())
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ assert(p != _pendingRequests.end());
+ if(activate)
{
- _resolves.erase(p);
+ _activating.insert(adapter.id);
+ requests = p->second;
}
else
{
- cb = p->second.front();
+ requests.swap(p->second);
+ _pendingRequests.erase(p);
+ _activating.erase(adapter.id);
}
}
- if(cb)
+ if(activate)
+ {
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->activating(adapter.id);
+ }
+
+ AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter);
+ int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
+ AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
+ }
+ else
{
- findAdapterById_async(cb, adapterId);
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->exception(adapter.id, ex);
+ }
}
}
+
+void
+LocatorI::getAdapterInfo(const string& id,
+ LocatorAdapterInfoSeq& adapters,
+ int& count,
+ bool& replicaGroup,
+ bool& roundRobin,
+ const set<string>& excludes)
+{
+ _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, excludes);
+}