summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/AdapterCache.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2021-01-28 16:26:44 -0500
committerJoe George <joe@zeroc.com>2021-02-01 16:59:30 -0500
commit92a6531e409f2691d82591e185a92299d415fc0f (patch)
tree60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/AdapterCache.cpp
parentPort Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff)
downloadice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2
ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz
ice-92a6531e409f2691d82591e185a92299d415fc0f.zip
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp332
1 files changed, 158 insertions, 174 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index 941a653b5f3..e6fd6523609 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -11,6 +11,7 @@
#include <IceGrid/ServerCache.h>
#include <IceGrid/NodeCache.h>
#include <IceGrid/SessionI.h>
+#include <IceGrid/Internal.h>
#include <functional>
@@ -20,52 +21,11 @@ using namespace IceGrid;
namespace IceGrid
{
-struct ReplicaLoadComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool>
-{
- bool operator()(const pair<float, ServerAdapterEntryPtr>& lhs, const pair<float, ServerAdapterEntryPtr>& rhs)
- {
- return lhs.first < rhs.first;
- }
-};
-
-struct ReplicaPriorityComp : binary_function<ServerAdapterEntryPtr&, ServerAdapterEntryPtr&, bool>
-{
- bool operator()(const ServerAdapterEntryPtr& lhs, const ServerAdapterEntryPtr& rhs)
- {
- return lhs->getPriority() < rhs->getPriority();
- }
-};
-
-struct TransformToReplicaLoad :
- public unary_function<const ServerAdapterEntryPtr&, pair<float, ServerAdapterEntryPtr> >
+class ReplicaGroupSyncCallback final : public SynchronizationCallback
{
public:
- TransformToReplicaLoad(LoadSample loadSample) : _loadSample(loadSample) { }
-
- pair<float, ServerAdapterEntryPtr>
- operator()(const ServerAdapterEntryPtr& value)
- {
- return make_pair(value->getLeastLoadedNodeLoad(_loadSample), value);
- }
-
- LoadSample _loadSample;
-};
-
-struct TransformToReplica : public unary_function<const pair<string, ServerAdapterEntryPtr>&, ServerAdapterEntryPtr>
-{
- ServerAdapterEntryPtr
- operator()(const pair<float, ServerAdapterEntryPtr>& value)
- {
- return value.second;
- }
-};
-
-class ReplicaGroupSyncCallback : public SynchronizationCallback, public IceUtil::Mutex
-{
-public:
-
- ReplicaGroupSyncCallback(const SynchronizationCallbackPtr& callback, int count, int nReplicas) :
+ ReplicaGroupSyncCallback(const shared_ptr<SynchronizationCallback>& callback, int count, int nReplicas) :
_callback(callback),
_responseCalled(false),
_synchronizeCount(count),
@@ -77,20 +37,20 @@ public:
bool
response()
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_responseCalled = true;
if(_synchronizedCount >= _nReplicas)
{
- _callback = 0;
+ _callback = nullptr;
return false;
}
else if(_synchronizeCount == 0)
{
- if(_synchronizedCount == 0 && _exception.get())
+ if(_synchronizedCount == 0 && _exception)
{
- _exception->ice_throw();
+ rethrow_exception(_exception);
}
- _callback = 0;
+ _callback = nullptr;
return false;
}
return true;
@@ -99,9 +59,9 @@ public:
void
synchronized()
{
- SynchronizationCallbackPtr callback;
+ shared_ptr<SynchronizationCallback> callback;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
++_synchronizedCount;
--_synchronizeCount;
@@ -116,7 +76,7 @@ public:
}
callback = _callback;
- _callback = 0;
+ _callback = nullptr;
}
if(callback)
@@ -126,14 +86,14 @@ public:
}
void
- synchronized(const Ice::Exception& ex)
+ synchronized(exception_ptr ex)
{
- SynchronizationCallbackPtr callback;
+ shared_ptr<SynchronizationCallback> callback;
{
- Lock sync(*this);
- if(!_exception.get())
+ lock_guard lock(_mutex);
+ if(!_exception)
{
- _exception.reset(ex.ice_clone());
+ _exception = ex;
}
--_synchronizeCount;
@@ -159,19 +119,20 @@ public:
private:
- SynchronizationCallbackPtr _callback;
+ shared_ptr<SynchronizationCallback> _callback;
bool _responseCalled;
int _synchronizeCount;
int _synchronizedCount;
int _nReplicas;
- IceInternal::UniquePtr<Ice::Exception> _exception;
+ exception_ptr _exception;
+
+ mutex _mutex;
};
-typedef IceUtil::Handle<ReplicaGroupSyncCallback> ReplicaGroupSyncCallbackPtr;
}
void
-GetAdapterInfoResult::add(const ServerAdapterEntryPtr& adapter)
+GetAdapterInfoResult::add(const ServerAdapterEntry* adapter)
{
AdapterInfo info;
info.id = adapter->getId();
@@ -179,15 +140,15 @@ GetAdapterInfoResult::add(const ServerAdapterEntryPtr& adapter)
_adapters.push_back(info);
try
{
- _results.push_back(adapter->getProxy("", true)->begin_getDirectProxy());
+ _results.push_back(adapter->getProxy("", true)->getDirectProxyAsync());
}
catch(const SynchronizationException&)
{
- _results.push_back(0);
+ _results.push_back(nullopt);
}
catch(const Ice::Exception&)
{
- _results.push_back(0);
+ _results.push_back(nullopt);
}
}
@@ -195,13 +156,13 @@ AdapterInfoSeq
GetAdapterInfoResult::get()
{
vector<AdapterInfo>::iterator q = _adapters.begin();
- for(vector<Ice::AsyncResultPtr>::const_iterator p = _results.begin(); p != _results.end(); ++p, ++q)
+ for(auto p = _results.begin(); p != _results.end(); ++p, ++q)
{
try
{
if(*p)
{
- q->proxy = AdapterPrx::uncheckedCast((*p)->getProxy())->end_getDirectProxy(*p);
+ q->proxy = Ice::uncheckedCast<AdapterPrx>((*p)->get());
}
}
catch(const Ice::Exception&)
@@ -211,14 +172,14 @@ GetAdapterInfoResult::get()
return _adapters;
}
-AdapterCache::AdapterCache(const Ice::CommunicatorPtr& communicator) : _communicator(communicator)
+AdapterCache::AdapterCache(const shared_ptr<Ice::Communicator>& communicator) : _communicator(communicator)
{
}
void
-AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const ServerEntryPtr& server, const string& app)
+AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const shared_ptr<ServerEntry>& server, const string& app)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(getImpl(desc.id))
{
Ice::Error out(_communicator->getLogger());
@@ -226,23 +187,29 @@ AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const ServerEntryP
return;
}
- istringstream is(desc.priority);
int priority = 0;
- is >> priority;
+ try
+ {
+ priority = stoi(desc.priority);
+ }
+ catch(const std::exception&)
+ {
+ }
- ServerAdapterEntryPtr entry = new ServerAdapterEntry(*this, desc.id, app, desc.replicaGroupId, priority, server);
+ auto entry = make_shared<ServerAdapterEntry>(*this, desc.id, app, desc.replicaGroupId, priority, server);
addImpl(desc.id, entry);
if(!desc.replicaGroupId.empty())
{
- ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(desc.replicaGroupId));
+ auto repEntry = dynamic_pointer_cast<ReplicaGroupEntry>(getImpl(desc.replicaGroupId));
if(!repEntry)
{
//
// Add an un-assigned replica group, the replica group will in theory be added
// shortly after when its application is loaded.
//
- repEntry = new ReplicaGroupEntry(*this, desc.replicaGroupId, "", new RandomLoadBalancingPolicy("0"), "");
+ repEntry = make_shared<ReplicaGroupEntry>(*this, desc.replicaGroupId, "",
+ make_shared<RandomLoadBalancingPolicy>("0"), "");
addImpl(desc.replicaGroupId, repEntry);
}
repEntry->addReplica(desc.id, entry);
@@ -252,8 +219,8 @@ AdapterCache::addServerAdapter(const AdapterDescriptor& desc, const ServerEntryP
void
AdapterCache::addReplicaGroup(const ReplicaGroupDescriptor& desc, const string& app)
{
- Lock sync(*this);
- ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(desc.id));
+ lock_guard lock(_mutex);
+ auto repEntry = dynamic_pointer_cast<ReplicaGroupEntry>(getImpl(desc.id));
if(repEntry)
{
//
@@ -271,14 +238,15 @@ AdapterCache::addReplicaGroup(const ReplicaGroupDescriptor& desc, const string&
}
return;
}
- addImpl(desc.id, new ReplicaGroupEntry(*this, desc.id, app, desc.loadBalancing, desc.filter));
+ addImpl(desc.id, make_shared<ReplicaGroupEntry>(*this, desc.id, app, desc.loadBalancing, desc.filter));
}
-AdapterEntryPtr
+shared_ptr<AdapterEntry>
AdapterCache::get(const string& id) const
{
- Lock sync(*this);
- AdapterEntryPtr entry = getImpl(id);
+ lock_guard lock(_mutex);
+
+ auto entry = getImpl(id);
if(!entry)
{
throw AdapterNotExistException(id);
@@ -289,9 +257,9 @@ AdapterCache::get(const string& id) const
void
AdapterCache::removeServerAdapter(const string& id)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
- ServerAdapterEntryPtr entry = ServerAdapterEntryPtr::dynamicCast(getImpl(id));
+ auto entry = dynamic_pointer_cast<ServerAdapterEntry>(getImpl(id));
if(!entry)
{
Ice::Error out(_communicator->getLogger());
@@ -303,7 +271,7 @@ AdapterCache::removeServerAdapter(const string& id)
string replicaGroupId = entry->getReplicaGroupId();
if(!replicaGroupId.empty())
{
- ReplicaGroupEntryPtr repEntry = ReplicaGroupEntryPtr::dynamicCast(getImpl(replicaGroupId));
+ auto repEntry = dynamic_pointer_cast<ReplicaGroupEntry>(getImpl(replicaGroupId));
if(!repEntry)
{
Ice::Error out(_communicator->getLogger());
@@ -325,8 +293,9 @@ AdapterCache::removeServerAdapter(const string& id)
void
AdapterCache::removeReplicaGroup(const string& id)
{
- Lock sync(*this);
- ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(getImpl(id));
+ lock_guard lock(_mutex);
+
+ auto entry = dynamic_pointer_cast<ReplicaGroupEntry>(getImpl(id));
if(!entry)
{
Ice::Error out(_communicator->getLogger());
@@ -336,8 +305,8 @@ AdapterCache::removeReplicaGroup(const string& id)
removeImpl(id);
}
-AdapterEntryPtr
-AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry)
+shared_ptr<AdapterEntry>
+AdapterCache::addImpl(const string& id, const shared_ptr<AdapterEntry>& entry)
{
if(_traceLevels && _traceLevels->adapter > 0)
{
@@ -388,7 +357,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
const string& application,
const string& replicaGroupId,
int priority,
- const ServerEntryPtr& server) :
+ const shared_ptr<ServerEntry>& server) :
AdapterEntry(cache, id, application),
_replicaGroupId(replicaGroupId),
_priority(priority),
@@ -397,7 +366,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
}
bool
-ServerAdapterEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>&)
+ServerAdapterEntry::addSyncCallback(const shared_ptr<SynchronizationCallback>& callback, const set<string>&)
{
try
{
@@ -448,23 +417,18 @@ ServerAdapterEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
AdapterInfoSeq
ServerAdapterEntry::getAdapterInfoNoEndpoints() const
{
- AdapterInfo info;
- info.id = _id;
- info.replicaGroupId = _replicaGroupId;
- AdapterInfoSeq infos;
- infos.push_back(info);
- return infos;
+ return { { _id, nullptr, _replicaGroupId } };
}
-GetAdapterInfoResultPtr
+shared_ptr<GetAdapterInfoResult>
ServerAdapterEntry::getAdapterInfoAsync() const
{
- GetAdapterInfoResultPtr result = new GetAdapterInfoResult();
- result->add(const_cast<ServerAdapterEntry*>(this));
+ auto result = make_shared<GetAdapterInfoResult>();
+ result->add(this);
return result;
}
-AdapterPrx
+shared_ptr<AdapterPrx>
ServerAdapterEntry::getProxy(const string& replicaGroupId, bool upToDate) const
{
if(replicaGroupId.empty())
@@ -484,9 +448,10 @@ ServerAdapterEntry::getProxy(const string& replicaGroupId, bool upToDate) const
void
ServerAdapterEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters) const
{
- LocatorAdapterInfo info;
- info.id = _id;
- info.proxy = _server->getAdapter(info.activationTimeout, info.deactivationTimeout, _id, true);
+ chrono::seconds activationTimeout, deactivationTimeout;
+ auto proxy = _server->getAdapter(activationTimeout, deactivationTimeout, _id, true);
+
+ LocatorAdapterInfo info = { _id, move(proxy), activationTimeout, deactivationTimeout };
adapters.push_back(info);
}
@@ -518,7 +483,7 @@ ServerAdapterEntry::getNodeName() const
ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache,
const string& id,
const string& application,
- const LoadBalancingPolicyPtr& policy,
+ const shared_ptr<LoadBalancingPolicy>& policy,
const string& filter) :
AdapterEntry(cache, id, application),
_lastReplica(0),
@@ -528,26 +493,27 @@ ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache,
}
bool
-ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>& excludes)
+ReplicaGroupEntry::addSyncCallback(const shared_ptr<SynchronizationCallback>& callback, const set<string>& excludes)
{
- vector<ServerAdapterEntryPtr> replicas;
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
int nReplicas;
- int roundRobin = false;
+ bool roundRobin = false;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
+
nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size());
- roundRobin = RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
+ roundRobin = dynamic_pointer_cast<RoundRobinLoadBalancingPolicy>(_loadBalancing) != nullptr;
if(!roundRobin)
{
replicas = _replicas;
}
else
{
- for(vector<ServerAdapterEntryPtr>::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p)
+ for(const auto& replica : _replicas)
{
- if(excludes.find((*p)->getId()) == excludes.end())
+ if(excludes.find(replica->getId()) == excludes.end())
{
- replicas.push_back(*p);
+ replicas.push_back(replica);
}
}
}
@@ -558,39 +524,37 @@ ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, c
}
}
- ReplicaGroupSyncCallbackPtr cb = new ReplicaGroupSyncCallback(callback,
- static_cast<int>(replicas.size()),
- nReplicas);
+ auto cb = make_shared<ReplicaGroupSyncCallback>(callback, static_cast<int>(replicas.size()), nReplicas);
set<string> emptyExcludes;
- for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
try
{
- if(!(*p)->addSyncCallback(cb, emptyExcludes))
+ if(!replica->addSyncCallback(cb, emptyExcludes))
{
cb->synchronized();
}
}
- catch(const Ice::Exception& ex)
+ catch(const std::exception&)
{
- cb->synchronized(ex);
+ cb->synchronized(current_exception());
}
}
return cb->response();
}
void
-ReplicaGroupEntry::addReplica(const string& /*replicaId*/, const ServerAdapterEntryPtr& adapter)
+ReplicaGroupEntry::addReplica(const string& /*replicaId*/, const shared_ptr<ServerAdapterEntry>& adapter)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_replicas.push_back(adapter);
}
bool
ReplicaGroupEntry::removeReplica(const string& replicaId)
{
- Lock sync(*this);
- for(vector<ServerAdapterEntryPtr>::iterator p = _replicas.begin(); p != _replicas.end(); ++p)
+ lock_guard lock(_mutex);
+ for(auto p = _replicas.cbegin(); p != _replicas.cend(); ++p)
{
if(replicaId == (*p)->getId())
{
@@ -606,37 +570,44 @@ ReplicaGroupEntry::removeReplica(const string& replicaId)
}
void
-ReplicaGroupEntry::update(const string& application, const LoadBalancingPolicyPtr& policy, const string& filter)
+ReplicaGroupEntry::update(const string& application, const shared_ptr<LoadBalancingPolicy>& policy, const string& filter)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
+
assert(policy);
_application = application;
_loadBalancing = policy;
_filter = filter;
- istringstream is(_loadBalancing->nReplicas);
int nReplicas = 0;
- is >> nReplicas;
+ try
+ {
+ nReplicas = stoi(_loadBalancing->nReplicas);
+ }
+ catch(const std::exception&)
+ {
+ }
+
_loadBalancingNReplicas = nReplicas < 0 ? 1 : nReplicas;
- AdaptiveLoadBalancingPolicyPtr alb = AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
+ auto alb = dynamic_pointer_cast<AdaptiveLoadBalancingPolicy>(_loadBalancing);
if(alb)
{
if(alb->loadSample == "1")
{
- _loadSample = LoadSample1;
+ _loadSample = LoadSample::LoadSample1;
}
else if(alb->loadSample == "5")
{
- _loadSample = LoadSample5;
+ _loadSample = LoadSample::LoadSample5;
}
else if(alb->loadSample == "15")
{
- _loadSample = LoadSample15;
+ _loadSample = LoadSample::LoadSample15;
}
else
{
- _loadSample = LoadSample1;
+ _loadSample = LoadSample::LoadSample1;
}
}
}
@@ -645,11 +616,11 @@ void
ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& nReplicas, bool& replicaGroup,
bool& roundRobin, string& filter, const set<string>& excludes)
{
- vector<ServerAdapterEntryPtr> replicas;
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
bool adaptive = false;
- LoadSample loadSample = LoadSample1;
+ LoadSample loadSample = LoadSample::LoadSample1;
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
replicaGroup = true;
roundRobin = false;
filter = _filter;
@@ -661,13 +632,10 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
}
replicas.reserve(_replicas.size());
- if(RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ if(dynamic_pointer_cast<RoundRobinLoadBalancingPolicy>(_loadBalancing))
{
// Serialize round-robin requests
- while(_requestInProgress)
- {
- wait();
- }
+ _condVar.wait(lock, [this] { return !_requestInProgress; } );
_requestInProgress = true;
for(size_t i = 0; i < _replicas.size(); ++i)
{
@@ -676,19 +644,23 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
_lastReplica = (_lastReplica + 1) % static_cast<int>(_replicas.size());
roundRobin = true;
}
- else if(AdaptiveLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ else if(dynamic_pointer_cast<AdaptiveLoadBalancingPolicy>(_loadBalancing))
{
replicas = _replicas;
IceUtilInternal::shuffle(replicas.begin(), replicas.end());
loadSample = _loadSample;
adaptive = true;
}
- else if(OrderedLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ else if(dynamic_pointer_cast<OrderedLoadBalancingPolicy>(_loadBalancing))
{
replicas = _replicas;
- sort(replicas.begin(), replicas.end(), ReplicaPriorityComp());
+ sort(replicas.begin(), replicas.end(),
+ [](const auto& lhs, const auto& rhs)
+ {
+ return lhs->getPriority() < rhs->getPriority();
+ });
}
- else if(RandomLoadBalancingPolicyPtr::dynamicCast(_loadBalancing))
+ else if(dynamic_pointer_cast<RandomLoadBalancingPolicy>(_loadBalancing))
{
replicas = _replicas;
IceUtilInternal::shuffle(replicas.begin(), replicas.end());
@@ -703,16 +675,20 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
{
//
// This must be done outside the synchronization block since
- // the trasnform() might call and lock each server adapter
+ // the transform() might call and lock each server adapter
// entry. We also can't sort directly as the load of each
// server adapter is not stable so we first take a snapshot of
// each adapter and sort the snapshot.
//
- vector<pair<float, ServerAdapterEntryPtr> > rl;
- transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample));
- sort(rl.begin(), rl.end(), ReplicaLoadComp());
+ vector<pair<float, shared_ptr<ServerAdapterEntry>>> rl;
+ transform(replicas.begin(), replicas.end(), back_inserter(rl),
+ [loadSample](const auto& value) -> pair<float, shared_ptr<ServerAdapterEntry>>
+ {
+ return { value -> getLeastLoadedNodeLoad(loadSample), value };
+ });
+ sort(rl.begin(), rl.end(), [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
replicas.clear();
- transform(rl.begin(), rl.end(), back_inserter(replicas), TransformToReplica());
+ transform(rl.begin(), rl.end(), back_inserter(replicas), [](const auto& value) { return value.second; });
}
//
@@ -722,13 +698,13 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
//
set<string> emptyExcludes;
bool firstUnreachable = true;
- for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- if(!roundRobin || excludes.find((*p)->getId()) == excludes.end())
+ if(!roundRobin || excludes.find(replica->getId()) == excludes.end())
{
try
{
- (*p)->getLocatorAdapterInfo(adapters);
+ replica->getLocatorAdapterInfo(adapters);
firstUnreachable = false;
}
catch(const SynchronizationException&)
@@ -745,24 +721,24 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
}
}
}
- catch(...)
+ catch(const std::exception&)
{
if(roundRobin)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(_requestInProgress);
_requestInProgress = false;
- notify();
+ _condVar.notify_one();
}
throw;
}
if(roundRobin)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(_requestInProgress);
_requestInProgress = false;
- notify();
+ _condVar.notify_one();
if(unreachable > 0)
{
_lastReplica = (_lastReplica + unreachable) % static_cast<int>(_replicas.size());
@@ -778,9 +754,9 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
float
ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
{
- vector<ServerAdapterEntryPtr> replicas;
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
replicas = _replicas;
}
@@ -795,9 +771,17 @@ ReplicaGroupEntry::getLeastLoadedNodeLoad(LoadSample loadSample) const
else
{
IceUtilInternal::shuffle(replicas.begin(), replicas.end());
- vector<pair<float, ServerAdapterEntryPtr> > rl;
- transform(replicas.begin(), replicas.end(), back_inserter(rl), TransformToReplicaLoad(loadSample));
- return min_element(rl.begin(), rl.end(), ReplicaLoadComp())->first;
+ vector<pair<float, shared_ptr<ServerAdapterEntry>>> rl;
+ transform(replicas.begin(), replicas.end(), back_inserter(rl),
+ [loadSample] (const auto& value) -> pair<float, shared_ptr<ServerAdapterEntry>>
+ {
+ return { value->getLeastLoadedNodeLoad(loadSample), value };
+ });
+ return min_element(rl.begin(), rl.end(),
+ [](const auto& lhs, const auto& rhs)
+ {
+ return lhs.first < rhs.first;
+ })->first;
}
}
@@ -808,34 +792,34 @@ ReplicaGroupEntry::getAdapterInfoNoEndpoints() const
// This method is called with the database locked so we're sure
// that no new adapters will be added or removed concurrently.
//
- vector<ServerAdapterEntryPtr> replicas;
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
replicas = _replicas;
}
AdapterInfoSeq infos;
- for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- AdapterInfoSeq infs = (*p)->getAdapterInfoNoEndpoints();
+ AdapterInfoSeq infs = replica->getAdapterInfoNoEndpoints();
assert(infs.size() == 1);
infos.push_back(infs[0]);
}
return infos;
}
-GetAdapterInfoResultPtr
+shared_ptr<GetAdapterInfoResult>
ReplicaGroupEntry::getAdapterInfoAsync() const
{
- GetAdapterInfoResultPtr result = new GetAdapterInfoResult();
- vector<ServerAdapterEntryPtr> replicas;
+ auto result = make_shared<GetAdapterInfoResult>();
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
replicas = _replicas;
}
- for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- result->add(*p);
+ result->add(replica.get());
}
return result;
}
@@ -843,16 +827,16 @@ ReplicaGroupEntry::getAdapterInfoAsync() const
bool
ReplicaGroupEntry::hasAdaptersFromOtherApplications() const
{
- vector<ServerAdapterEntryPtr> replicas;
+ vector<shared_ptr<ServerAdapterEntry>> replicas;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
replicas = _replicas;
}
AdapterInfoSeq infos;
- for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- if((*p)->getApplication() != _application)
+ if(replica->getApplication() != _application)
{
return true;
}