summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-01-07 19:37:17 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-01-07 19:37:17 +0100
commitb5042ce24aaa2dcff2092046b322ff61c3d9ef8c (patch)
treedcdca6930377ad9098eeb9996ce1f7663c79e5db /cpp/src
parentOther fix for 3601 - plugins can be destroyed twice (diff)
downloadice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.bz2
ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.tar.xz
ice-b5042ce24aaa2dcff2092046b322ff61c3d9ef8c.zip
Squashed commit of the following:
commit 8019e6de4480f361a83d8944afec60793454c322 Author: Benoit Foucher <benoit@zeroc.com> Date: Wed Jan 7 17:16:40 2009 +0100 Fixed bug 3516 - Fixed scaling issue when using round-robin replica groups commit 6c36afb32dda8b37b7d5330ed51a439bc73b17db Author: Benoit Foucher <benoit@zeroc.com> Date: Wed Jan 7 17:16:36 2009 +0100 Fixed bug 3230 - IceGrid node leak
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp47
-rw-r--r--cpp/src/IceGrid/AdapterCache.h13
-rw-r--r--cpp/src/IceGrid/IceGridNode.cpp12
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp694
-rw-r--r--cpp/src/IceGrid/LocatorI.h43
-rw-r--r--cpp/src/IceGrid/NodeI.cpp10
-rw-r--r--cpp/src/IceGrid/NodeI.h2
-rw-r--r--cpp/src/IceGrid/ServerI.cpp8
-rw-r--r--cpp/src/IceGrid/ServerI.h1
9 files changed, 486 insertions, 344 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index f6970f4e09e..af24d9da061 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -231,7 +231,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
void
ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup,
- bool& roundRobin)
+ bool& roundRobin, const set<string>&)
{
nReplicas = 1;
replicaGroup = false;
@@ -242,11 +242,6 @@ ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int&
adapters.push_back(info);
}
-void
-ServerAdapterEntry::increaseRoundRobinCount(int roundRobinCount)
-{
-}
-
float
ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
@@ -383,7 +378,7 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy)
void
ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup,
- bool& roundRobin)
+ bool& roundRobin, const set<string>& excludes)
{
vector<ServerAdapterEntryPtr> replicas;
bool adaptive = false;
@@ -453,32 +448,28 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
//
for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- try
- {
- int dummy;
- bool dummy2;
- bool dummy3;
- (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3);
- }
- catch(const AdapterNotExistException&)
- {
- }
- catch(const NodeUnreachableException&)
- {
- }
- catch(const DeploymentException&)
+ if(!roundRobin || excludes.find((*p)->getId()) == excludes.end())
{
+ try
+ {
+ int dummy;
+ bool dummy2;
+ bool dummy3;
+ (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, set<string>());
+ }
+ catch(const AdapterNotExistException&)
+ {
+ }
+ catch(const NodeUnreachableException&)
+ {
+ }
+ catch(const DeploymentException&)
+ {
+ }
}
}
}
-void
-ReplicaGroupEntry::increaseRoundRobinCount(int count)
-{
- Lock sync(*this);
- _lastReplica = (_lastReplica + count) % static_cast<int>(_replicas.size());
-}
-
float
ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h
index 3a862d12d97..8ee42bc1531 100644
--- a/cpp/src/IceGrid/AdapterCache.h
+++ b/cpp/src/IceGrid/AdapterCache.h
@@ -43,8 +43,11 @@ public:
AdapterEntry(AdapterCache&, const std::string&, const std::string&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&) = 0;
- virtual void increaseRoundRobinCount(int) = 0;
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&) = 0;
+ void getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& count, bool& replicaGroup, bool& roundRobin)
+ {
+ getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, std::set<std::string>());
+ }
virtual float getLeastLoadedNodeLoad(LoadSample) const = 0;
virtual AdapterInfoSeq getAdapterInfo() const = 0;
@@ -68,8 +71,7 @@ public:
ServerAdapterEntry(AdapterCache&, const std::string&, const std::string&, const std::string&, int,
const ServerEntryPtr&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&);
- virtual void increaseRoundRobinCount(int);
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&);
virtual float getLeastLoadedNodeLoad(LoadSample) const;
virtual AdapterInfoSeq getAdapterInfo() const;
virtual const std::string& getReplicaGroupId() const { return _replicaGroupId; }
@@ -91,8 +93,7 @@ public:
ReplicaGroupEntry(AdapterCache&, const std::string&, const std::string&, const LoadBalancingPolicyPtr&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&);
- virtual void increaseRoundRobinCount(int);
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&);
virtual float getLeastLoadedNodeLoad(LoadSample) const;
virtual AdapterInfoSeq getAdapterInfo() const;
diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp
index 1b55a7e2066..4dc9a62e978 100644
--- a/cpp/src/IceGrid/IceGridNode.cpp
+++ b/cpp/src/IceGrid/IceGridNode.cpp
@@ -718,12 +718,6 @@ NodeService::stop()
_node->getPlatformInfo().stop();
//
- // Break cylic reference counts.
- //
- _node->destroy();
- _node = 0;
-
- //
// We can now safely shutdown the communicator.
//
try
@@ -739,6 +733,12 @@ NodeService::stop()
}
//
+ // Break cylic reference counts.
+ //
+ _node->shutdown();
+ _node = 0;
+
+ //
// And shutdown the collocated registry.
//
if(_registry)
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index 4e354c5d9bf..d038ccae641 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -150,80 +150,144 @@ private:
const Ice::ObjectPrx _obj;
};
-}
-
-LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const LocatorIPtr& locator,
- const string& id,
- bool replicaGroup,
- bool roundRobin,
- const LocatorAdapterInfoSeq& adapters,
- int count) :
- _amdCB(amdCB),
- _locator(locator),
- _id(id),
- _replicaGroup(replicaGroup),
- _roundRobin(roundRobin),
- _adapters(adapters),
- _traceLevels(locator->getTraceLevels()),
- _count(count),
- _lastAdapter(_adapters.begin())
+class AdapterRequest : public LocatorI::Request
{
- assert((_count == 0 && _adapters.empty()) || _count > 0);
-}
+public:
-void
-LocatorI::Request::execute()
+ AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const LocatorAdapterInfo& adapter) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _adapter(adapter),
+ _traceLevels(locator->getTraceLevels())
+ {
+ assert(_adapter.proxy);
+ }
+
+ virtual void
+ execute()
+ {
+ _locator->getDirectProxy(_adapter, this);
+ }
+
+ virtual void
+ activating(const string&)
+ {
+ // Nothing to do.
+ }
+
+ virtual void
+ response(const std::string&, const Ice::ObjectPrx& proxy)
+ {
+ assert(proxy);
+ _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ }
+
+ virtual void
+ exception(const std::string&, const Ice::Exception& ex)
+ {
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex);
+ }
+ _amdCB->ice_response(0);
+ }
+
+private:
+
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const LocatorAdapterInfo _adapter;
+ const TraceLevelsPtr _traceLevels;
+};
+
+class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex
{
- //
- // If there's no adapters to request, we're done, send the
- // response.
- //
- if(_adapters.empty())
+public:
+
+ ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const string& id,
+ const LocatorAdapterInfoSeq& adapters,
+ int count,
+ Ice::ObjectPrx firstProxy) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _id(id),
+ _adapters(adapters),
+ _traceLevels(locator->getTraceLevels()),
+ _count(count),
+ _lastAdapter(_adapters.begin())
{
- sendResponse();
- return;
+ assert(_adapters.empty() || _count > 0);
+
+ if(_adapters.empty())
+ {
+ _count = 0;
+ }
+
+ //
+ // If the first adapter proxy is provided, store it in _proxies.
+ //
+ if(firstProxy)
+ {
+ assert(!_adapters.empty());
+ _proxies[_adapters[0].id] = firstProxy;
+ ++_lastAdapter;
+ }
}
- //
- // Otherwise, request as many adapters as required.
- //
- LocatorAdapterInfoSeq adapters;
+ virtual void
+ execute()
{
- Lock sync(*this);
- assert(_count > 0 && _lastAdapter != _adapters.end());
- for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i)
+ //
+ // Otherwise, request as many adapters as required.
+ //
+ LocatorAdapterInfoSeq adapters;
{
- if(_lastAdapter == _adapters.end())
+ Lock sync(*this);
+ for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i)
{
- _count = i;
- break;
+ if(_lastAdapter == _adapters.end())
+ {
+ _count = i;
+ break;
+ }
+ assert(_lastAdapter->proxy);
+ adapters.push_back(*_lastAdapter);
+ ++_lastAdapter;
+ }
+
+ //
+ // If there's no adapters to request, we're done, send the
+ // response.
+ //
+ if(_proxies.size() == _count)
+ {
+ sendResponse();
+ return;
}
- assert(_lastAdapter->proxy);
- adapters.push_back(*_lastAdapter);
- ++_lastAdapter;
}
- }
+
- for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
- {
- if(_locator->getDirectProxy(*p, this))
+ for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- activating();
+ if(_locator->getDirectProxy(*p, this))
+ {
+ activating(p->id);
+ }
}
}
-}
-void
-LocatorI::Request::activating()
-{
- //
- // An adapter is being activated. If this is a request for a replica group, don't
- // wait for the activation to complete. Instead, we query the next adapter which
- // might be already active.
- //
- if(_replicaGroup)
+ virtual void
+ activating(const string&)
{
+ //
+ // An adapter is being activated. Don't wait for the activation to complete. Instead,
+ // we query the next adapter which might be already active.
+ //
LocatorAdapterInfo adapter;
do
{
@@ -237,150 +301,332 @@ LocatorI::Request::activating()
}
while(_locator->getDirectProxy(adapter, this));
}
-}
+
+ virtual void
+ exception(const string& id, const Ice::Exception& ex)
+ {
+ LocatorAdapterInfo adapter;
+ {
+ Lock sync(*this);
+ if(_proxies.size() == _count) // Nothing to do if we already sent the response.
+ {
+ return;
+ }
+
+ if(!_exception.get())
+ {
+ _exception.reset(ex.ice_clone());
+ }
+
+ if(_lastAdapter == _adapters.end())
+ {
+ --_count; // Expect one less adapter proxy if there's no more adapters to query.
+
+ //
+ // If we received all the required proxies, it's time to send the
+ // answer back to the client.
+ //
+ if(_count == _proxies.size())
+ {
+ sendResponse();
+ }
+ }
+ else
+ {
+ adapter = *_lastAdapter;
+ ++_lastAdapter;
+ }
+ }
-void
-LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
-{
- LocatorAdapterInfo adapter;
+ if(adapter.proxy)
+ {
+ if(_locator->getDirectProxy(adapter, this))
+ {
+ activating(adapter.id);
+ }
+ }
+ }
+
+ virtual void
+ response(const string& id, const Ice::ObjectPrx& proxy)
{
Lock sync(*this);
+ assert(proxy);
if(_proxies.size() == _count) // Nothing to do if we already sent the response.
{
return;
}
- if(!_exception.get())
+
+ _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
+
+ //
+ // If we received all the required proxies, it's time to send the
+ // answer back to the client.
+ //
+ if(_proxies.size() == _count)
{
- _exception.reset(ex.ice_clone());
+ sendResponse();
}
-
- if(_lastAdapter == _adapters.end())
+ }
+
+private:
+
+ void
+ sendResponse()
+ {
+ if(_proxies.size() == 1)
+ {
+ _amdCB->ice_response(_proxies.begin()->second);
+ }
+ else if(_proxies.empty())
{
- --_count; // Expect one less adapter proxy if there's no more adapters to query.
-
//
- // If we received all the required proxies, it's time to send the
- // answer back to the client.
+ // If there's no proxies, it's either because we couldn't contact the adapters or
+ // because the replica group has no members.
//
- if(_count == _proxies.size())
+ assert(_exception.get() || _adapters.empty());
+ if(_traceLevels->locator > 0)
{
- sendResponse();
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
+ out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
}
+ _amdCB->ice_response(0);
}
- else
+ else if(_proxies.size() > 1)
{
- adapter = *_lastAdapter;
- ++_lastAdapter;
+ Ice::EndpointSeq endpoints;
+ endpoints.reserve(_proxies.size());
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
+ if(q != _proxies.end())
+ {
+ Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ }
+ }
+
+ Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
+ _amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
}
- if(adapter.proxy)
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const std::string _id;
+ LocatorAdapterInfoSeq _adapters;
+ const TraceLevelsPtr _traceLevels;
+ unsigned int _count;
+ LocatorAdapterInfoSeq::const_iterator _lastAdapter;
+ std::map<std::string, Ice::ObjectPrx> _proxies;
+ std::auto_ptr<Ice::Exception> _exception;
+};
+
+class RoundRobinRequest : public LocatorI::Request, public IceUtil::Mutex
+{
+public:
+
+ RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const LocatorIPtr& locator,
+ const string& id,
+ const LocatorAdapterInfoSeq& adapters,
+ int count) :
+ _amdCB(amdCB),
+ _locator(locator),
+ _id(id),
+ _adapters(adapters),
+ _traceLevels(locator->getTraceLevels()),
+ _count(count),
+ _waitForActivation(false)
{
+ assert(_adapters.empty() || _count > 0);
+ }
+
+ virtual void
+ execute()
+ {
+ if(_adapters.empty())
+ {
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty";
+ }
+ _amdCB->ice_response(0);
+ return;
+ }
+
+ LocatorAdapterInfo adapter = _adapters[0];
+ assert(adapter.proxy);
if(_locator->getDirectProxy(adapter, this))
{
- activating();
+ activating(adapter.id);
}
}
-}
-void
-LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
-{
- if(!proxy)
+ virtual void
+ activating(const string& id)
{
- exception(id, AdapterNotActiveException());
- return;
+ LocatorAdapterInfo adapter;
+ adapter.id = id;
+ do
+ {
+ Lock sync(*this);
+ if(_adapters.empty() || _waitForActivation)
+ {
+ return;
+ }
+ _activatingOrFailed.insert(adapter.id);
+ adapter = nextAdapter();
+ }
+ while(adapter.proxy && _locator->getDirectProxy(adapter, this));
}
- Lock sync(*this);
- if(_proxies.size() == _count) // Nothing to do if we already sent the response.
+ virtual void
+ response(const std::string& id, const Ice::ObjectPrx& proxy)
{
- return;
- }
-
- _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
+ Lock sync(*this);
+ assert(proxy);
+ if(_adapters.empty() || id != _adapters[0].id)
+ {
+ return;
+ }
- //
- // If we received all the required proxies, it's time to send the
- // answer back to the client.
- //
- if(_proxies.size() == _count)
- {
- sendResponse();
+ if(_count > 1)
+ {
+ Ice::ObjectPrx p = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
+ LocatorI::RequestPtr request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, p);
+ request->execute();
+ }
+ else
+ {
+ _amdCB->ice_response(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ }
+ _adapters.clear();
}
-}
-void
-LocatorI::Request::sendResponse()
-{
- int roundRobinCount = 0;
- if(_proxies.size() == 1)
+ virtual void
+ exception(const std::string& id, const Ice::Exception& ex)
{
- if(_roundRobin)
+ LocatorAdapterInfo adapter;
{
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ Lock sync(*this);
+ _failed.insert(id);
+ _activatingOrFailed.insert(id);
+
+ if(!_exception.get())
{
- if(_proxies.find(p->id) != _proxies.end())
- {
- break;
- }
- //
- // We count the number of object adapters which are inactive until we find
- // one active. This count will be used to update the round robin counter.
- //
- ++roundRobinCount;
+ _exception.reset(ex.ice_clone());
}
+
+ if(_adapters.empty() || id != _adapters[0].id)
+ {
+ return;
+ }
+
+ adapter = nextAdapter();
}
- _amdCB->ice_response(_proxies.begin()->second);
- }
- else if(_proxies.empty())
- {
- //
- // If there's no proxies, it's either because we couldn't
- // contact the adapters or because the replica group has
- // no members.
- //
- assert(_exception.get() || (_replicaGroup && _adapters.empty()));
- if(_traceLevels->locator > 0)
+ if(adapter.proxy && _locator->getDirectProxy(adapter, this))
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
- out << "couldn't resolve " << (_replicaGroup ? "replica group `" : "adapter `") << _id << "' endpoints:\n";
- out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ activating(adapter.id);
}
- _amdCB->ice_response(0);
}
- else if(_proxies.size() > 1)
+
+private:
+
+ LocatorAdapterInfo
+ nextAdapter()
{
- Ice::EndpointSeq endpoints;
- endpoints.reserve(_proxies.size());
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ bool replicaGroup;
+ bool roundRobin;
+
+ _adapters.clear();
+
+ try
{
- map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
- if(q != _proxies.end())
+ if(!_waitForActivation)
{
- Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
- endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _activatingOrFailed);
}
- else if(_roundRobin && endpoints.empty())
+
+ if(_waitForActivation || (_adapters.empty() && _activatingOrFailed.size() > _failed.size()))
{
//
- // We count the number of object adapters which are inactive until we find
- // one active. This count will be used to update the round robin counter.
+ // If there are no more adapters to try and some servers were being activated, we
+ // try again but this time we wait for the server activation.
//
- ++roundRobinCount;
+ _locator->getAdapterInfo(_id, _adapters, _count, replicaGroup, roundRobin, _failed);
+ _waitForActivation = true;
}
- }
- Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
- _amdCB->ice_response(proxy->ice_endpoints(endpoints));
+ if(!roundRobin)
+ {
+ LocatorI::RequestPtr request;
+ if(replicaGroup)
+ {
+ request = new ReplicaGroupRequest(_amdCB, _locator, _id, _adapters, _count, 0);
+ }
+ else
+ {
+ assert(!_adapters.empty());
+ request = new AdapterRequest(_amdCB, _locator, _adapters[0]);
+ }
+ request->execute();
+ _adapters.clear();
+ return LocatorAdapterInfo();
+ }
+ else if(!_adapters.empty())
+ {
+ return _adapters[0];
+ }
+ else
+ {
+ assert(_adapters.empty());
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
+ out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ }
+ _amdCB->ice_response(0);
+ return LocatorAdapterInfo();
+ }
+ }
+ catch(const AdapterNotExistException&)
+ {
+ assert(_adapters.empty());
+ _amdCB->ice_exception(Ice::AdapterNotFoundException());
+ return LocatorAdapterInfo();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ assert(_adapters.empty());
+ if(_traceLevels->locator > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex);
+ }
+ _amdCB->ice_response(0);
+ return LocatorAdapterInfo();
+ }
}
- if(_roundRobin)
- {
- _locator->removePendingRoundRobinRequest(_id, roundRobinCount);
- }
-}
+ const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ const LocatorIPtr _locator;
+ const std::string _id;
+ LocatorAdapterInfoSeq _adapters;
+ const TraceLevelsPtr _traceLevels;
+ int _count;
+ bool _waitForActivation;
+ set<string> _failed;
+ set<string> _activatingOrFailed;
+ std::auto_ptr<Ice::Exception> _exception;
+};
+
+};
+
LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
const DatabasePtr& database,
@@ -444,22 +690,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
const Ice::Current&) const
{
LocatorIPtr self = const_cast<LocatorI*>(this);
- bool pending = false;
- if(self->addPendingRoundRobinRequest(id, cb, true, pending)) // Add only if there's already round robin requests
- // pending.
- {
- //
- // Another request is currently resolving the adapter endpoints. We'll
- // answer this request once it's done.
- //
- return;
- }
-
- //
- // If no other request is resolving the adapter endpoints, resolve
- // the endpoints now.
- //
-
bool replicaGroup = false;
try
{
@@ -472,36 +702,24 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
LocatorAdapterInfoSeq adapters;
bool roundRobin;
_database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin);
-
- //
- // Round robin replica group requests are serialized. This is
- // required to make sure the round robin counter is accurate
- // even if some adapters are unreachable (bug 2576). For
- // adapters, and replica groups, there's no need to serialize
- // the requests.
- //
+ RequestPtr request;
if(roundRobin)
{
- if(self->addPendingRoundRobinRequest(id, cb, false, pending))
- {
- return;
- }
+ request = new RoundRobinRequest(cb, self, id, adapters, count);
}
- else if(pending)
+ else if(replicaGroup)
{
- self->removePendingRoundRobinRequest(id, 0);
+ request = new ReplicaGroupRequest(cb, self, id, adapters, count, 0);
+ }
+ else
+ {
+ assert(adapters.size() == 1);
+ request = new AdapterRequest(cb, self, adapters[0]);
}
-
- RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count);
request->execute();
}
catch(const AdapterNotExistException&)
{
- if(pending)
- {
- self->removePendingRoundRobinRequest(id, 0);
- }
-
try
{
cb->ice_response(_database->getAdapterDirectProxy(id));
@@ -514,11 +732,6 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
}
catch(const Ice::Exception& ex)
{
- if(pending)
- {
- self->removePendingRoundRobinRequest(id, 0);
- }
-
const TraceLevelsPtr traceLevels = _database->getTraceLevels();
if(traceLevels->locator > 0)
{
@@ -568,78 +781,6 @@ LocatorI::getTraceLevels() const
}
bool
-LocatorI::addPendingRoundRobinRequest(const string& adapterId,
- const Ice::AMD_Locator_findAdapterByIdPtr& cb,
- bool addIfExists,
- bool& pending)
-{
- Lock sync(*this);
- pending = false;
- map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
- if(p == _resolves.end())
- {
- if(addIfExists)
- {
- return false;
- }
- p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first;
- }
- else if(p->second.front().get() == cb.get())
- {
- pending = true;
- return false;
- }
-
- p->second.push_back(cb);
- return p->second.size() > 1;
-}
-
-void
-LocatorI::removePendingRoundRobinRequest(const string& adapterId, int roundRobinCount)
-{
- Ice::AMD_Locator_findAdapterByIdPtr cb;
- {
- Lock sync(*this);
-
- //
- // Bump the round robin counter. We bump the round robin counter by
- // the number of inactive adapters. This ensures that if the first
- // adapters are inactive, if the first adapter to be inactive is the
- // Nth adapter, the next adapter to be returned will be the Nth + 1.
- //
- if(roundRobinCount > 0)
- {
- try
- {
- _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount);
- }
- catch(const Ice::Exception&)
- {
- // Ignore.
- }
- }
-
- map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
- assert(p != _resolves.end());
-
- p->second.pop_front();
- if(p->second.empty())
- {
- _resolves.erase(p);
- }
- else
- {
- cb = p->second.front();
- }
- }
-
- if(cb)
- {
- findAdapterById_async(cb, adapterId);
- }
-}
-
-bool
LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
{
@@ -673,9 +814,19 @@ LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::O
_activating.erase(adapter.id);
}
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ if(proxy)
{
- (*q)->response(adapter.id, proxy);
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->response(adapter.id, proxy);
+ }
+ }
+ else
+ {
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->exception(adapter.id, AdapterNotActiveException());
+ }
}
}
@@ -717,7 +868,7 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::
{
for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
{
- (*q)->activating();
+ (*q)->activating(adapter.id);
}
AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter);
@@ -732,3 +883,14 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::
}
}
}
+
+void
+LocatorI::getAdapterInfo(const string& id,
+ LocatorAdapterInfoSeq& adapters,
+ int& count,
+ bool& replicaGroup,
+ bool& roundRobin,
+ const set<string>& excludes)
+{
+ _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin, excludes);
+}
diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h
index 51befd24e85..124b62b7dc1 100644
--- a/cpp/src/IceGrid/LocatorI.h
+++ b/cpp/src/IceGrid/LocatorI.h
@@ -34,40 +34,14 @@ class LocatorI : public Locator, public IceUtil::Mutex
{
public:
- class Request : public IceUtil::Mutex, public IceUtil::Shared
+ class Request : public IceUtil::Shared
{
public:
- Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool, bool,
- const LocatorAdapterInfoSeq&, int);
-
- void execute();
- void response(const std::string&, const Ice::ObjectPrx&);
- void activating();
- void exception(const std::string&, const Ice::Exception&);
-
- virtual bool
- operator<(const Request& r) const
- {
- return this < &r;
- }
-
- private:
-
- void requestAdapter(const LocatorAdapterInfo&);
- void sendResponse();
-
- const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
- const LocatorIPtr _locator;
- const std::string _id;
- const bool _replicaGroup;
- const bool _roundRobin;
- LocatorAdapterInfoSeq _adapters;
- const TraceLevelsPtr _traceLevels;
- unsigned int _count;
- LocatorAdapterInfoSeq::const_iterator _lastAdapter;
- std::map<std::string, Ice::ObjectPrx> _proxies;
- std::auto_ptr<Ice::Exception> _exception;
+ virtual void execute() = 0;
+ virtual void activating(const std::string&) = 0;
+ virtual void response(const std::string&, const Ice::ObjectPrx&) = 0;
+ virtual void exception(const std::string&, const Ice::Exception&) = 0;
};
typedef IceUtil::Handle<Request> RequestPtr;
@@ -87,13 +61,12 @@ public:
const Ice::CommunicatorPtr& getCommunicator() const;
const TraceLevelsPtr& getTraceLevels() const;
- bool addPendingRoundRobinRequest(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&);
- void removePendingRoundRobinRequest(const std::string&, int);
-
bool getDirectProxy(const LocatorAdapterInfo&, const RequestPtr&);
void getDirectProxyResponse(const LocatorAdapterInfo&, const Ice::ObjectPrx&);
void getDirectProxyException(const LocatorAdapterInfo&, const Ice::Exception&);
+ void getAdapterInfo(const std::string&, LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>&);
+
protected:
const Ice::CommunicatorPtr _communicator;
@@ -106,8 +79,6 @@ protected:
typedef std::map<std::string, PendingRequests> PendingRequestsMap;
PendingRequestsMap _pendingRequests;
std::set<std::string> _activating;
-
- std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves;
};
}
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 6381b39760e..77ed21db79e 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -831,9 +831,17 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos,
}
void
-NodeI::destroy()
+NodeI::shutdown()
{
IceUtil::Mutex::Lock sync(_serversLock);
+ for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin();
+ p != _serversByApplication.end(); ++p)
+ {
+ for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ {
+ (*q)->shutdown();
+ }
+ }
_serversByApplication.clear();
}
diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h
index 087d661a49d..46c6470bb83 100644
--- a/cpp/src/IceGrid/NodeI.h
+++ b/cpp/src/IceGrid/NodeI.h
@@ -92,7 +92,7 @@ public:
virtual Ice::Long getOffsetFromEnd(const std::string&, int, const Ice::Current&) const;
virtual bool read(const std::string&, Ice::Long, int, Ice::Long&, Ice::StringSeq&, const Ice::Current&) const;
- void destroy();
+ void shutdown();
IceUtil::TimerPtr getTimer() const;
Ice::CommunicatorPtr getCommunicator() const;
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index 79b5cd80b67..b01219c4e46 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -1694,6 +1694,14 @@ ServerI::terminated(const string& msg, int status)
}
void
+ServerI::shutdown()
+{
+ Lock sync(*this);
+ assert(_state == ServerI::Inactive);
+ _timerTask = 0;
+}
+
+void
ServerI::update()
{
ServerCommandPtr command;
diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h
index 375a5c1fc80..9d04b017b7f 100644
--- a/cpp/src/IceGrid/ServerI.h
+++ b/cpp/src/IceGrid/ServerI.h
@@ -109,6 +109,7 @@ public:
void update();
void destroy();
void terminated(const std::string&, int);
+ void shutdown();
//
// A proxy to the Process facet of the real Admin object; called by the AdminFacade servant implementation