summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/LocatorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp135
1 files changed, 128 insertions, 7 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index b7361dce4a1..60a5a1b6688 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -172,15 +172,16 @@ LocatorI::Request::Request(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
const LocatorIPtr& locator,
const string& id,
bool replicaGroup,
+ bool roundRobin,
const LocatorAdapterInfoSeq& adapters,
- int count,
- const TraceLevelsPtr& traceLevels) :
+ int count) :
_amdCB(amdCB),
_locator(locator),
_id(id),
_replicaGroup(replicaGroup),
+ _roundRobin(roundRobin),
_adapters(adapters),
- _traceLevels(traceLevels),
+ _traceLevels(locator->getTraceLevels()),
_count(count),
_lastAdapter(_adapters.begin())
{
@@ -207,7 +208,7 @@ LocatorI::Request::execute()
{
Lock sync(*this);
assert(_count > 0 && _lastAdapter != _adapters.end());
- for(unsigned int i = 0; i < _count; ++i)
+ for(unsigned int i = _proxies.size(); i < _count; ++i)
{
if(_lastAdapter == _adapters.end())
{
@@ -334,8 +335,24 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
void
LocatorI::Request::sendResponse()
{
+ int roundRobinCount = 0;
if(_proxies.size() == 1)
{
+ if(_roundRobin)
+ {
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ if(_proxies.find(p->id) != _proxies.end())
+ {
+ break;
+ }
+ //
+ // We count the number of object adapters which are inactive until we find
+ // one active. This count will be used to update the round robin counter.
+ //
+ ++roundRobinCount;
+ }
+ }
_amdCB->ice_response(_proxies.begin()->second);
}
else if(_proxies.empty())
@@ -367,6 +384,14 @@ LocatorI::Request::sendResponse()
Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
}
+ else if(_roundRobin && endpoints.empty())
+ {
+ //
+ // We count the number of object adapters which are inactive until we find
+ // one active. This count will be used to update the round robin counter.
+ //
+ ++roundRobinCount;
+ }
}
for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
@@ -377,6 +402,11 @@ LocatorI::Request::sendResponse()
Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
_amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
+
+ if(_roundRobin)
+ {
+ _locator->removePendingResolve(_id, roundRobinCount);
+ }
}
LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
@@ -440,6 +470,21 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
const string& id,
const Ice::Current&) const
{
+ LocatorIPtr self = const_cast<LocatorI*>(this);
+ if(self->addPendingResolve(id, cb))
+ {
+ //
+ // Another request is currently resolving the adapter endpoints. We'll
+ // answer this request once it's done.
+ //
+ return;
+ }
+
+ //
+ // If no other request is resolving the adapter endpoints, resolve
+ // the endpoints now.
+ //
+
bool replicaGroup = false;
try
{
@@ -450,10 +495,22 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
//
int count;
LocatorAdapterInfoSeq adapters;
- _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup);
+ bool roundRobin;
+ _database->getAdapter(id)->getLocatorAdapterInfo(adapters, count, replicaGroup, roundRobin);
+
+ //
+ // Round robin replica group requests are serialized. This is
+ // required to make sure the round robin counter is accurate
+ // even if some adapters are unreachable (bug 2576). For
+ // adapters, and replica groups, there's no need to serialize
+ // the requests.
+ //
+ if(!roundRobin)
+ {
+ self->removePendingResolve(id, 0);
+ }
- LocatorIPtr self = const_cast<LocatorI*>(this);
- RequestPtr request = new Request(cb, self, id, replicaGroup, adapters, count, _database->getTraceLevels());
+ RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count);
request->execute();
}
catch(const AdapterNotExistException&)
@@ -466,6 +523,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
cb->ice_exception(Ice::AdapterNotFoundException());
}
+ self->removePendingResolve(id, 0);
return;
}
catch(const Ice::Exception& ex)
@@ -484,6 +542,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
}
}
cb->ice_response(0);
+ self->removePendingResolve(id, 0);
return;
}
}
@@ -512,6 +571,12 @@ LocatorI::getCommunicator() const
return _communicator;
}
+const TraceLevelsPtr&
+LocatorI::getTraceLevels() const
+{
+ return _database->getTraceLevels();
+}
+
void
LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
@@ -583,3 +648,59 @@ LocatorI::activateException(const string& id, const Ice::Exception& ex)
(*q)->exception(id, ex);
}
}
+
+bool
+LocatorI::addPendingResolve(const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& cb)
+{
+ Lock sync(*this);
+ map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
+ if(p == _resolves.end())
+ {
+ p = _resolves.insert(make_pair(adapterId, deque<Ice::AMD_Locator_findAdapterByIdPtr>())).first;
+ }
+ else if(p->second.front().get() == cb.get())
+ {
+ return false;
+ }
+
+ p->second.push_back(cb);
+ return p->second.size() > 1;
+}
+
+void
+LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount)
+{
+ Ice::AMD_Locator_findAdapterByIdPtr cb;
+ {
+ Lock sync(*this);
+
+ //
+ // Bump the round robin counter. We bump the round robin counter by
+ // the number of inactive adapters. This ensures that if the first
+ // adapters are inactive, if the first adapter to be inactive is the
+ // Nth adapter, the next adapter to be returned will be the Nth + 1.
+ //
+ if(roundRobinCount > 0)
+ {
+ _database->getAdapter(adapterId)->increaseRoundRobinCount(roundRobinCount);
+ }
+
+ map<string, deque<Ice::AMD_Locator_findAdapterByIdPtr> >::iterator p = _resolves.find(adapterId);
+ assert(p != _resolves.end());
+
+ p->second.pop_front();
+ if(p->second.empty())
+ {
+ _resolves.erase(p);
+ }
+ else
+ {
+ cb = p->second.front();
+ }
+ }
+
+ if(cb)
+ {
+ findAdapterById_async(cb, adapterId);
+ }
+}