summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-01-11 14:14:33 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-01-11 14:14:33 +0100
commit81255b93d09d5963eeb40dba272838650c939994 (patch)
treeb977ac8538f1933f5ad7d7ff2a7229951108adfc /cpp/src
parentAllow third party software locations to be set from environment (diff)
downloadice-81255b93d09d5963eeb40dba272838650c939994.tar.bz2
ice-81255b93d09d5963eeb40dba272838650c939994.tar.xz
ice-81255b93d09d5963eeb40dba272838650c939994.zip
Fixed bug 2576
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp24
-rw-r--r--cpp/src/IceGrid/AdapterCache.h9
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp135
-rw-r--r--cpp/src/IceGrid/LocatorI.h13
4 files changed, 165 insertions, 16 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index e3bf7562803..446696fdcdf 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -230,16 +230,23 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
}
void
-ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup)
+ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup,
+ bool& roundRobin)
{
nReplicas = 1;
replicaGroup = false;
+ roundRobin = false;
LocatorAdapterInfo info;
info.id = _id;
info.proxy = _server->getAdapter(info.activationTimeout, info.deactivationTimeout, _id, true);
adapters.push_back(info);
}
+void
+ServerAdapterEntry::increaseRoundRobinCount(int roundRobinCount)
+{
+}
+
float
ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
@@ -375,7 +382,8 @@ ReplicaGroupEntry::update(const LoadBalancingPolicyPtr& policy)
}
void
-ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup)
+ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup,
+ bool& roundRobin)
{
vector<ServerAdapterEntryPtr> replicas;
bool adaptive = false;
@@ -383,6 +391,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
{
Lock sync(*this);
replicaGroup = true;
+ roundRobin = false;
nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size());
if(_replicas.empty())
@@ -398,6 +407,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
replicas.push_back(_replicas[(_lastReplica + i) % _replicas.size()]);
}
_lastReplica = (_lastReplica + 1) % static_cast<int>(_replicas.size());
+ roundRobin = true;
}
else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
{
@@ -447,7 +457,8 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
{
int dummy;
bool dummy2;
- (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2);
+ bool dummy3;
+ (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3);
}
catch(const AdapterNotExistException&)
{
@@ -461,6 +472,13 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
}
}
+void
+ReplicaGroupEntry::increaseRoundRobinCount(int count)
+{
+ Lock sync(*this);
+ _lastReplica = (_lastReplica + count) % static_cast<int>(_replicas.size());
+}
+
float
ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h
index cddd18b7f72..ac5440ca3ce 100644
--- a/cpp/src/IceGrid/AdapterCache.h
+++ b/cpp/src/IceGrid/AdapterCache.h
@@ -43,7 +43,8 @@ public:
AdapterEntry(AdapterCache&, const std::string&, const std::string&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&) = 0;
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&) = 0;
+ virtual void increaseRoundRobinCount(int) = 0;
virtual float getLeastLoadedNodeLoad(LoadSample) const = 0;
virtual AdapterInfoSeq getAdapterInfo() const = 0;
@@ -67,7 +68,8 @@ public:
ServerAdapterEntry(AdapterCache&, const std::string&, const std::string&, const std::string&, int,
const ServerEntryPtr&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&);
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&);
+ virtual void increaseRoundRobinCount(int);
virtual float getLeastLoadedNodeLoad(LoadSample) const;
virtual AdapterInfoSeq getAdapterInfo() const;
virtual const std::string& getReplicaGroupId() const { return _replicaGroupId; }
@@ -89,7 +91,8 @@ public:
ReplicaGroupEntry(AdapterCache&, const std::string&, const std::string&, const LoadBalancingPolicyPtr&);
- virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&);
+ virtual void getLocatorAdapterInfo(LocatorAdapterInfoSeq&, int&, bool&, bool&);
+ virtual void increaseRoundRobinCount(int);
virtual float getLeastLoadedNodeLoad(LoadSample) const;
virtual AdapterInfoSeq getAdapterInfo() const;
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index b7361dce4a1..60a5a1b6688 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -172,15 +172,16 @@ LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
const LocatorIPtr& locator,
const string& id,
bool replicaGroup,
+ bool roundRobin,
const LocatorAdapterInfoSeq& adapters,
- int count,
- const TraceLevelsPtr& traceLevels) :
+ int count) :
_amdCB(amdCB),
_locator(locator),
_id(id),
_replicaGroup(replicaGroup),
+ _roundRobin(roundRobin),
_adapters(adapters),
- _traceLevels(traceLevels),
+ _traceLevels(locator->getTraceLevels()),
_count(count),
_lastAdapter(_adapters.begin())
{
@@ -207,7 +208,7 @@ LocatorI::Request::execute()
{
Lock sync(*this);
assert(_count > 0 && _lastAdapter != _adapters.end());
- for(unsigned int i = 0; i < _count; ++i)
+ for(unsigned int i = _proxies.size(); i < _count; ++i)
{
if(_lastAdapter == _adapters.end())
{
@@ -334,8 +335,24 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
void
LocatorI::Request::sendResponse()
{
+ int roundRobinCount = 0;
if(_proxies.size() == 1)
{
+ if(_roundRobin)
+ {
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ if(_proxies.find(p->id) != _proxies.end())
+ {
+ break;
+ }
+ //
+ // We count the number of object adapters which are inactive until we find
+ // one active. This count will be used to update the round robin counter.
+ //
+ ++roundRobinCount;
+ }
+ }
_amdCB->ice_response(_proxies.begin()->second);
}
else if(_proxies.empty())
@@ -367,6 +384,14 @@ LocatorI::Request::sendResponse()
Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
}
+ else if(_roundRobin && endpoints.empty())
+ {
+ //
+ // We count the number of object adapters which are inactive until we find
+ // one active. This count will be used to update the round robin counter.
+ //
+ ++roundRobinCount;
+ }
}
for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
@@ -377,6 +402,11 @@ LocatorI::Request::sendResponse()
Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
_amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
+
+ if(_roundRobin)
+ {
+ _locator->removePendingResolve(_id, roundRobinCount);
+ }
}
LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
@@ -440,6 +470,21 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
const string& id,
const Ice::Current&) const
{
+ LocatorIPtr self = const_cast<LocatorI*>(this);
+ if(self->addPendingResolve(id, cb))
+ {
+ //
+ // Another request is currently resolving the adapter endpoints. We'll
+ // answer this request once it's done.
+ //
+ return;
+ }
+
+ //
+ // If no other request is resolving the adapter endpoints, resolve
+ // the endpoints now.
+ //
+
bool replicaGroup = false;
try
{
@@ -450,10 +495,22 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
//
int count;
LocatorAdapterInfoSeq adapters;
- _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup);
+ bool roundRobin;
+ _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin);
+
+ //
+ // Round robin replica group requests are serialized. This is
+ // required to make sure the round robin counter is accurate
+ // even if some adapters are unreachable (bug 2576). For
+ // adapters, and replica groups, there's no need to serialize
+ // the requests.
+ //
+ if(!roundRobin)
+ {
+ self->removePendingResolve(id, 0);
+ }
- LocatorIPtr self = const_cast<LocatorI*>(this);
- RequestPtr request = new Request(cb, self, id, replicaGroup, adapters, count, _database->getTraceLevels());
+ RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count);
request->execute();
}
catch(const AdapterNotExistException&)
@@ -466,6 +523,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
cb->ice_exception(Ice::AdapterNotFoundException());
}
+ self->removePendingResolve(id, 0);
return;
}
catch(const Ice::Exception& ex)
@@ -484,6 +542,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
}
}
cb->ice_response(0);
+ self->removePendingResolve(id, 0);
return;
}
}
@@ -512,6 +571,12 @@ LocatorI::getCommunicator() const
return _communicator;
}
+const TraceLevelsPtr&
+LocatorI::getTraceLevels() const
+{
+ return _database->getTraceLevels();
+}
+
void
LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
@@ -583,3 +648,59 @@ LocatorI::activateException(const string& id, const Ice::Exception& ex)
(*q)->exception(id, ex);
}
}
+
+bool
+LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb)
+{
+ Lock sync(*this);
+ map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
+ if(p == _resolves.end())
+ {
+ p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first;
+ }
+ else if(p->second.front().get() == cb.get())
+ {
+ return false;
+ }
+
+ p->second.push_back(cb);
+ return p->second.size() > 1;
+}
+
+void
+LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount)
+{
+ Ice::AMD_Locator_findAdapterByIdPtr cb;
+ {
+ Lock sync(*this);
+
+ //
+ // Bump the round robin counter. We bump the round robin counter by
+ // the number of inactive adapters. This ensures that if the first
+ // adapters are inactive, if the first adapter to be inactive is the
+ // Nth adapter, the next adapter to be returned will be the Nth + 1.
+ //
+ if(roundRobinCount > 0)
+ {
+ _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount);
+ }
+
+ map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
+ assert(p != _resolves.end());
+
+ p->second.pop_front();
+ if(p->second.empty())
+ {
+ _resolves.erase(p);
+ }
+ else
+ {
+ cb = p->second.front();
+ }
+ }
+
+ if(cb)
+ {
+ findAdapterById_async(cb, adapterId);
+ }
+}
diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h
index f0f97239c83..778f2443f55 100644
--- a/cpp/src/IceGrid/LocatorI.h
+++ b/cpp/src/IceGrid/LocatorI.h
@@ -38,8 +38,8 @@ public:
{
public:
- Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool,
- const LocatorAdapterInfoSeq&, int, const TraceLevelsPtr&);
+ Request(const Ice::AMD_Locator_findAdapterByIdPtr&, const LocatorIPtr&, const std::string&, bool, bool,
+ const LocatorAdapterInfoSeq&, int);
void execute();
void response(const std::string&, const Ice::ObjectPrx&);
@@ -61,6 +61,7 @@ public:
const LocatorIPtr _locator;
const std::string _id;
const bool _replicaGroup;
+ const bool _roundRobin;
LocatorAdapterInfoSeq _adapters;
const TraceLevelsPtr _traceLevels;
unsigned int _count;
@@ -78,13 +79,14 @@ public:
const Ice::Current&) const;
virtual void findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr&, const ::std::string&,
- const Ice::Current&) const;
+ const Ice::Current& = Ice::Current()) const;
virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const;
virtual RegistryPrx getLocalRegistry(const Ice::Current&) const;
virtual QueryPrx getLocalQuery(const Ice::Current&) const;
const Ice::CommunicatorPtr& getCommunicator() const;
+ const TraceLevelsPtr& getTraceLevels() const;
void activate(const LocatorAdapterInfo&, const RequestPtr&);
void cancelActivate(const std::string&, const RequestPtr&);
@@ -92,6 +94,9 @@ public:
void activateFinished(const std::string&, const Ice::ObjectPrx&);
void activateException(const std::string&, const Ice::Exception&);
+ bool addPendingResolve(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&);
+ void removePendingResolve(const std::string&, int);
+
protected:
const Ice::CommunicatorPtr _communicator;
@@ -104,6 +109,8 @@ protected:
typedef std::map<std::string, PendingRequests> PendingRequestsMap;
PendingRequestsMap _pendingRequests;
+
+ std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves;
};
}