summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2021-01-28 16:26:44 -0500
committerJoe George <joe@zeroc.com>2021-02-01 16:59:30 -0500
commit92a6531e409f2691d82591e185a92299d415fc0f (patch)
tree60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/Database.cpp
parentPort Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff)
downloadice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2
ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz
ice-92a6531e409f2691d82591e185a92299d415fc0f.zip
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp581
1 files changed, 310 insertions, 271 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 4abfe9e436d..697e7fd8e70 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -4,7 +4,6 @@
#include <IceUtil/StringUtil.h>
#include <IceUtil/Random.h>
-#include <IceUtil/Functional.h>
#include <Ice/LoggerUtil.h>
#include <Ice/Communicator.h>
#include <Ice/ObjectAdapter.h>
@@ -25,11 +24,11 @@
using namespace std;
using namespace IceGrid;
-typedef IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream> ApplicationMapRWCursor;
-typedef IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream> AdapterMapROCursor;
-typedef IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream> AdaptersByGroupMapCursor;
-typedef IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> ObjectsByTypeMapROCursor;
-typedef IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream> ObjectsMapROCursor;
+using ApplicationMapRWCursor = IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream>;
+using AdapterMapROCursor = IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream>;
+using AdaptersByGroupMapCursor = IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream>;
+using ObjectsByTypeMapROCursor = IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream>;
+using ObjectsMapROCursor = IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream>;
namespace
{
@@ -43,14 +42,6 @@ const string internalObjectsDbName = "internal-objects";
const string internalObjectsByTypeDbName = "internal-objectsByType";
const string serialsDbName = "serials";
-struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
-{
- bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
- {
- return lhs.second < rhs.second;
- }
-};
-
template<typename K, typename V, typename C, typename H> vector<V>
toVector(const IceDB::ReadOnlyTxn& txn, const IceDB::Dbi<K, V, C, H>& m)
{
@@ -82,7 +73,7 @@ toMap(const IceDB::Txn& txn, const IceDB::Dbi<K, V, C, H>& d)
}
void
-logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
+logError(const shared_ptr<Ice::Communicator>& com, const IceDB::LMDBException& ex)
{
Ice::Error error(com->getLogger());
error << "LMDB error: " << ex;
@@ -91,8 +82,8 @@ logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
void
filterAdapterInfos(const string& filter,
const string& replicaGroupId,
- const RegistryPluginFacadeIPtr& pluginFacade,
- const Ice::ConnectionPtr& con,
+ const shared_ptr<RegistryPluginFacadeI>& pluginFacade,
+ const shared_ptr<Ice::Connection>& con,
const Ice::Context& ctx,
AdapterInfoSeq& infos)
{
@@ -101,7 +92,7 @@ filterAdapterInfos(const string& filter,
return;
}
- vector<ReplicaGroupFilterPtr> filters = pluginFacade->getReplicaGroupFilters(filter);
+ auto filters = pluginFacade->getReplicaGroupFilters(filter);
if(filters.empty())
{
return;
@@ -109,25 +100,25 @@ filterAdapterInfos(const string& filter,
Ice::StringSeq adapterIds;
adapterIds.reserve(infos.size());
- for(vector<AdapterInfo>::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ for(const auto& info : infos)
{
- adapterIds.push_back(p->id);
+ adapterIds.push_back(info.id);
}
- for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
+ for(const auto& f : filters)
{
- adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx);
+ adapterIds = f->filter(replicaGroupId, adapterIds, con, ctx);
}
vector<AdapterInfo> filteredAdpts;
filteredAdpts.reserve(infos.size());
- for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
+ for(const auto& id : adapterIds)
{
- for(vector<AdapterInfo>::const_iterator r = infos.begin(); r != infos.end(); ++r)
+ for(const auto& info : infos)
{
- if(*q == r->id)
+ if(id == info.id)
{
- filteredAdpts.push_back(*r);
+ filteredAdpts.push_back(info);
break;
}
}
@@ -187,10 +178,25 @@ findByType(const IceDB::ReadOnlyTxn& txn,
}
-Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
- const IceStorm::TopicManagerPrx& topicManager,
+shared_ptr<Database>
+Database::create(const shared_ptr<Ice::ObjectAdapter>& registryAdapter,
+ const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
+ const string& instanceName,
+ const shared_ptr<TraceLevels>& traceLevels,
+ const RegistryInfo& info,
+ bool readonly)
+{
+ shared_ptr<Database> db(new Database(registryAdapter, topicManager, instanceName, traceLevels, info, readonly));
+
+ db->_pluginFacade->setDatabase(db);
+
+ return db;
+}
+
+Database::Database(const shared_ptr<Ice::ObjectAdapter>& registryAdapter,
+ const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
const string& instanceName,
- const TraceLevelsPtr& traceLevels,
+ const shared_ptr<TraceLevels>& traceLevels,
const RegistryInfo& info,
bool readonly) :
_communicator(registryAdapter->getCommunicator()),
@@ -209,8 +215,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_dbLock(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path") + "/icedb.lock"),
_env(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path"), 8,
IceDB::getMapSize(_communicator->getProperties()->getPropertyAsInt("IceGrid.Registry.LMDB.MapSize"))),
- _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
- _lock(0)
+ _pluginFacade(dynamic_pointer_cast<RegistryPluginFacadeI>(getRegistryPluginFacade())),
+ _lock(nullptr)
{
IceDB::ReadWriteTxn txn(_env);
@@ -257,13 +263,13 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objectCache.setTraceLevels(_traceLevels);
_allocatableObjectCache.setTraceLevels(_traceLevels);
- _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
- _registryObserverTopic = new RegistryObserverTopic(_topicManager);
+ _nodeObserverTopic = NodeObserverTopic::create(_topicManager, _internalAdapter);
+ _registryObserverTopic = make_shared<RegistryObserverTopic>(_topicManager);
_serverCache.setNodeObserverTopic(_nodeObserverTopic);
// Set all serials to 1 if they have not yet been set.
- Ice::Long serial;
+ long long serial;
if(!_serials.get(txn, applicationsDbName, serial))
{
_serials.put(txn, applicationsDbName, 1);
@@ -277,18 +283,16 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_serials.put(txn, objectsDbName, 1);
}
- _applicationObserverTopic =
- new ApplicationObserverTopic(_topicManager, toMap(txn, _applications), getSerial(txn, applicationsDbName));
- _adapterObserverTopic =
- new AdapterObserverTopic(_topicManager, toMap(txn, _adapters), getSerial(txn, adaptersDbName));
- _objectObserverTopic =
- new ObjectObserverTopic(_topicManager, toMap(txn, _objects), getSerial(txn, objectsDbName));
+ _applicationObserverTopic = make_shared<ApplicationObserverTopic>(_topicManager, toMap(txn, _applications),
+ getSerial(txn, applicationsDbName));
+ _adapterObserverTopic = make_shared<AdapterObserverTopic>(_topicManager, toMap(txn, _adapters),
+ getSerial(txn, adaptersDbName));
+ _objectObserverTopic = make_shared<ObjectObserverTopic>(_topicManager, toMap(txn, _objects),
+ getSerial(txn, objectsDbName));
txn.commit();
_registryObserverTopic->registryUp(info);
-
- _pluginFacade->setDatabase(this);
}
std::string
@@ -300,7 +304,7 @@ Database::getInstanceName() const
void
Database::destroy()
{
- _pluginFacade->setDatabase(0);
+ _pluginFacade->setDatabase(nullptr);
_registryObserverTopic->destroy();
_nodeObserverTopic->destroy();
@@ -309,31 +313,31 @@ Database::destroy()
_objectObserverTopic->destroy();
}
-ObserverTopicPtr
+shared_ptr<ObserverTopic>
Database::getObserverTopic(TopicName name) const
{
switch(name)
{
- case RegistryObserverTopicName:
+ case TopicName::RegistryObserver:
return _registryObserverTopic;
- case NodeObserverTopicName:
+ case TopicName::NodeObserver:
return _nodeObserverTopic;
- case ApplicationObserverTopicName:
+ case TopicName::ApplicationObserver:
return _applicationObserverTopic;
- case AdapterObserverTopicName:
+ case TopicName::AdapterObserver:
return _adapterObserverTopic;
- case ObjectObserverTopicName:
+ case TopicName::ObjectObserver:
return _objectObserverTopic;
default:
break;
}
- return 0;
+ return nullptr;
}
void
Database::checkSessionLock(AdminSessionI* session)
{
- if(_lock != 0 && session != _lock)
+ if(_lock != nullptr && session != _lock)
{
throw AccessDeniedException(_lockUserId); // Lock held by another session.
}
@@ -342,13 +346,13 @@ Database::checkSessionLock(AdminSessionI* session)
int
Database::lock(AdminSessionI* session, const string& userId)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
- if(_lock != 0 && session != _lock)
+ if(_lock != nullptr && session != _lock)
{
throw AccessDeniedException(_lockUserId); // Lock held by another session.
}
- assert(_lock == 0 || _lock == session);
+ assert(_lock == nullptr || _lock == session);
_lock = session;
_lockUserId = userId;
@@ -359,23 +363,24 @@ Database::lock(AdminSessionI* session, const string& userId)
void
Database::unlock(AdminSessionI* session)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
+
if(_lock != session)
{
throw AccessDeniedException();
}
- _lock = 0;
+ _lock = nullptr;
_lockUserId.clear();
}
void
-Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long dbSerial)
+Database::syncApplications(const ApplicationInfoSeq& newApplications, long long dbSerial)
{
assert(dbSerial != 0);
int serial = 0;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
map<string, ApplicationInfo> oldApplications;
try
@@ -433,7 +438,10 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
}
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
if(_traceLevels->application > 0)
{
@@ -447,21 +455,21 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
}
void
-Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
+Database::syncAdapters(const AdapterInfoSeq& adapters, long long dbSerial)
{
assert(dbSerial != 0);
int serial = 0;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
try
{
IceDB::ReadWriteTxn txn(_env);
_adapters.clear(txn);
_adaptersByGroupId.clear(txn);
- for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
+ for(const auto& adapter : adapters)
{
- addAdapter(txn, *r);
+ addAdapter(txn, adapter);
}
dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
@@ -489,21 +497,21 @@ Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
}
void
-Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
+Database::syncObjects(const ObjectInfoSeq& objects, long long dbSerial)
{
assert(dbSerial != 0);
int serial = 0;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
try
{
IceDB::ReadWriteTxn txn(_env);
_objects.clear(txn);
_objectsByType.clear(txn);
- for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ for(const auto& obj : objects)
{
- addObject(txn, *q, false);
+ addObject(txn, obj, false);
}
dbSerial = updateSerial(txn, objectsDbName, dbSerial);
@@ -527,7 +535,7 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
}
ApplicationInfoSeq
-Database::getApplications(Ice::Long& serial)
+Database::getApplications(long long& serial)
{
try
{
@@ -544,7 +552,7 @@ Database::getApplications(Ice::Long& serial)
}
AdapterInfoSeq
-Database::getAdapters(Ice::Long& serial)
+Database::getAdapters(long long& serial)
{
try
{
@@ -561,7 +569,7 @@ Database::getAdapters(Ice::Long& serial)
}
ObjectInfoSeq
-Database::getObjects(Ice::Long& serial)
+Database::getObjects(long long& serial)
{
try
{
@@ -585,7 +593,7 @@ Database::getSerials() const
}
void
-Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ice::Long dbSerial)
+Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, long long dbSerial)
{
assert(dbSerial != 0 || _master);
@@ -593,10 +601,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
ServerEntrySeq entries;
try
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
checkSessionLock(session);
- waitForUpdate(info.descriptor.name);
+ waitForUpdate(lock, info.descriptor.name);
IceDB::ReadWriteTxn txn(_env);
@@ -614,7 +622,11 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
load(helper, entries, info.uuid, info.revision);
startUpdating(info.descriptor.name, info.uuid, info.revision);
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
+
serial = _applicationObserverTopic->applicationAdded(dbSerial, info);
}
catch(const IceDB::KeyTooLongException& ex)
@@ -634,7 +646,8 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
// for the nodes to start the servers.
//
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
+
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), info.descriptor.name);
assert(p != _updating.end());
p->markUpdated();
@@ -660,7 +673,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
{
try
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
@@ -668,7 +681,11 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
dbSerial = removeApplication(info.descriptor.name, txn);
txn.commit();
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
+
serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
}
catch(const DeploymentException& ex)
@@ -682,7 +699,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
+ for(const auto& entry : entries)
+ {
+ entry->waitForSyncNoThrow();
+ }
finishUpdating(info.descriptor.name);
throw;
}
@@ -704,14 +724,14 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
ApplicationInfo oldApp;
ApplicationUpdateInfo update = updt;
- IceInternal::UniquePtr<ApplicationHelper> previous;
- IceInternal::UniquePtr<ApplicationHelper> helper;
+ unique_ptr<ApplicationHelper> previous;
+ unique_ptr<ApplicationHelper> helper;
try
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
checkSessionLock(session);
- waitForUpdate(update.descriptor.name);
+ waitForUpdate(lock, update.descriptor.name);
IceDB::ReadOnlyTxn txn(_env);
@@ -725,8 +745,8 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
update.revision = oldApp.revision + 1;
}
- previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
- helper.reset(new ApplicationHelper(_communicator, previous->update(update.descriptor), true));
+ previous = make_unique<ApplicationHelper>(_communicator, oldApp.descriptor);
+ helper = make_unique<ApplicationHelper>(_communicator, previous->update(update.descriptor), true);
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
@@ -736,7 +756,7 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
throw;
}
- finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial);
+ finishApplicationUpdate(update, oldApp, *previous.get(), *helper.get(), session, noRestart, dbSerial);
}
void
@@ -746,14 +766,14 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
- IceInternal::UniquePtr<ApplicationHelper> previous;
- IceInternal::UniquePtr<ApplicationHelper> helper;
+ unique_ptr<ApplicationHelper> previous;
+ unique_ptr<ApplicationHelper> helper;
try
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
checkSessionLock(session);
- waitForUpdate(newDesc.name);
+ waitForUpdate(lock, newDesc.name);
IceDB::ReadOnlyTxn txn(_env);
@@ -762,8 +782,8 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
throw ApplicationNotExistException(newDesc.name);
}
- previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
- helper.reset(new ApplicationHelper(_communicator, newDesc, true));
+ previous = make_unique<ApplicationHelper>(_communicator, oldApp.descriptor);
+ helper = make_unique<ApplicationHelper>(_communicator, newDesc, true);
update.updateTime = IceUtil::Time::now().toMilliSeconds();
update.updateUser = _lockUserId;
@@ -778,7 +798,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
throw;
}
- finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
+ finishApplicationUpdate(update, oldApp, *previous.get(), *helper.get(), session, noRestart);
}
void
@@ -791,15 +811,15 @@ Database::instantiateServer(const string& application,
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
- IceInternal::UniquePtr<ApplicationHelper> previous;
- IceInternal::UniquePtr<ApplicationHelper> helper;
+ unique_ptr<ApplicationHelper> previous;
+ unique_ptr<ApplicationHelper> helper;
try
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
checkSessionLock(session);
- waitForUpdate(application);
+ waitForUpdate(lock, application);
IceDB::ReadOnlyTxn txn(_env);
@@ -808,8 +828,8 @@ Database::instantiateServer(const string& application,
throw ApplicationNotExistException(application);
}
- previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
- helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
+ previous = make_unique<ApplicationHelper>(_communicator, oldApp.descriptor);
+ helper = make_unique<ApplicationHelper>(_communicator, previous->instantiateServer(node, instance), true);
update.updateTime = IceUtil::Time::now().toMilliSeconds();
update.updateUser = _lockUserId;
@@ -824,7 +844,7 @@ Database::instantiateServer(const string& application,
throw;
}
- finishApplicationUpdate(update, oldApp, *previous, *helper, session, true);
+ finishApplicationUpdate(update, oldApp, *previous.get(), *helper.get(), session, true);
}
void
@@ -836,10 +856,10 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
int serial = 0; // Initialize to prevent warning.
try
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
checkSessionLock(session);
- waitForUpdate(name);
+ waitForUpdate(lock, name);
ApplicationInfo appInfo;
@@ -871,7 +891,11 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
startUpdating(name, appInfo.uuid, appInfo.revision);
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
+
serial = _applicationObserverTopic->applicationRemoved(dbSerial, name);
}
catch(const IceDB::LMDBException& ex)
@@ -884,7 +908,10 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
if(_master)
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
+ for(const auto& entry : entries)
+ {
+ entry->waitForSyncNoThrow();
+ }
}
if(_traceLevels->application > 0)
@@ -917,20 +944,20 @@ Database::getAllApplications(const string& expression)
}
void
-Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
- const string& uuid,
- int revision)
+Database::waitForApplicationUpdate(const string& uuid,
+ int revision,
+ function<void()> response, function<void(exception_ptr)> exception)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), make_pair(uuid, revision));
if(p != _updating.end() && !p->updated)
{
- p->cbs.push_back(cb);
+ p->cbs.push_back({ response, exception });
}
else
{
- cb->ice_response();
+ response();
}
}
@@ -940,7 +967,7 @@ Database::getNodeCache()
return _nodeCache;
}
-NodeEntryPtr
+shared_ptr<NodeEntry>
Database::getNode(const string& name, bool create) const
{
return _nodeCache.get(name, create);
@@ -952,7 +979,7 @@ Database::getReplicaCache()
return _replicaCache;
}
-ReplicaEntryPtr
+shared_ptr<ReplicaEntry>
Database::getReplica(const string& name) const
{
return _replicaCache.get(name);
@@ -964,7 +991,7 @@ Database::getServerCache()
return _serverCache;
}
-ServerEntryPtr
+shared_ptr<ServerEntry>
Database::getServer(const string& id) const
{
return _serverCache.get(id);
@@ -976,21 +1003,21 @@ Database::getAllocatableObjectCache()
return _allocatableObjectCache;
}
-AllocatableObjectEntryPtr
+shared_ptr<AllocatableObjectEntry>
Database::getAllocatableObject(const Ice::Identity& id) const
{
return _allocatableObjectCache.get(id);
}
void
-Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
- Ice::Long dbSerial)
+Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId,
+ const shared_ptr<Ice::ObjectPrx>& proxy, long long dbSerial)
{
assert(dbSerial != 0 || _master);
int serial = 0; // Initialize to prevent warning.
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_adapterCache.has(adapterId))
{
throw AdapterExistsException(adapterId);
@@ -1003,10 +1030,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
"can be member of this replica group");
}
- AdapterInfo info;
- info.id = adapterId;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
+ AdapterInfo info = { adapterId, proxy, replicaGroupId };
bool updated = false;
try
@@ -1077,8 +1101,9 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
-Ice::ObjectPrx
-Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
+shared_ptr<Ice::ObjectPrx>
+Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding,
+ const shared_ptr<Ice::Connection>& con,
const Ice::Context& ctx)
{
IceDB::ReadOnlyTxn txn(_env);
@@ -1119,10 +1144,10 @@ Database::removeAdapter(const string& adapterId)
int serial = 0; // Initialize to prevent warning.
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_adapterCache.has(adapterId))
{
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ auto adpt = _adapterCache.get(adapterId);
throw DeploymentException("removing adapter `" + adapterId + "' is not allowed:\n" +
"the adapter was added with the application descriptor `" +
adpt->getApplication() + "'");
@@ -1188,16 +1213,16 @@ Database::removeAdapter(const string& adapterId)
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
-AdapterPrx
+shared_ptr<AdapterPrx>
Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, bool upToDate)
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate);
}
void
Database::getLocatorAdapterInfo(const string& id,
- const Ice::ConnectionPtr& connection,
+ const shared_ptr<Ice::Connection>& connection,
const Ice::Context& context,
LocatorAdapterInfoSeq& adpts,
int& count,
@@ -1207,13 +1232,13 @@ Database::getLocatorAdapterInfo(const string& id,
{
string filter;
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't called during an update.
_adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes);
}
if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty())
{
- vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter);
+ auto filters = _pluginFacade->getReplicaGroupFilters(filter);
if(!filters.empty())
{
Ice::StringSeq adapterIds;
@@ -1222,9 +1247,9 @@ Database::getLocatorAdapterInfo(const string& id,
adapterIds.push_back(q->id);
}
- for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
+ for(const auto& f : filters)
{
- adapterIds = (*q)->filter(id, adapterIds, connection, context);
+ adapterIds = f->filter(id, adapterIds, connection, context);
}
LocatorAdapterInfoSeq filteredAdpts;
@@ -1247,10 +1272,10 @@ Database::getLocatorAdapterInfo(const string& id,
bool
Database::addAdapterSyncCallback(const string& id,
- const SynchronizationCallbackPtr& callback,
+ const shared_ptr<SynchronizationCallback>& callback,
const std::set<std::string>& excludes)
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
return _adapterCache.get(id)->addSyncCallback(callback, excludes);
}
@@ -1262,10 +1287,10 @@ Database::getAdapterInfo(const string& id)
// server, if that's the case we get the adapter proxy from the
// server.
//
- GetAdapterInfoResultPtr result;
+ shared_ptr<GetAdapterInfoResult> result;
try
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
result = _adapterCache.get(id)->getAdapterInfoAsync();
}
catch(const AdapterNotExistException&)
@@ -1304,7 +1329,7 @@ Database::getAdapterInfo(const string& id)
}
AdapterInfoSeq
-Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
+Database::getFilteredAdapterInfo(const string& id, const shared_ptr<Ice::Connection>& con, const Ice::Context& ctx)
{
//
// First we check if the given adapter id is associated to a
@@ -1314,13 +1339,13 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con
try
{
AdapterInfoSeq infos;
- ReplicaGroupEntryPtr replicaGroup;
+ shared_ptr<ReplicaGroupEntry> replicaGroup;
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
- AdapterEntryPtr entry = _adapterCache.get(id);
+ auto entry = _adapterCache.get(id);
infos = entry->getAdapterInfoNoEndpoints();
- replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry);
+ replicaGroup = dynamic_pointer_cast<ReplicaGroupEntry>(entry);
}
if(replicaGroup)
{
@@ -1365,8 +1390,8 @@ Database::getAdapterServer(const string& id) const
{
try
{
- Lock sync(*this); // Make sure this isn't call during an update.
- ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
+ auto adapter = dynamic_pointer_cast<ServerAdapterEntry>(_adapterCache.get(id));
if(adapter)
{
return adapter->getServerId();
@@ -1383,7 +1408,7 @@ Database::getAdapterApplication(const string& id) const
{
try
{
- Lock sync(*this); // Make sure this isn't call during an update.
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
return _adapterCache.get(id)->getApplication();
}
catch(const AdapterNotExistException&)
@@ -1397,8 +1422,8 @@ Database::getAdapterNode(const string& id) const
{
try
{
- Lock sync(*this); // Make sure this isn't call during an update.
- ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
+ lock_guard lock(_mutex); // Make sure this isn't call during an update.
+ auto adapter = dynamic_pointer_cast<ServerAdapterEntry>(_adapterCache.get(id));
if(adapter)
{
return adapter->getNodeName();
@@ -1413,7 +1438,7 @@ Database::getAdapterNode(const string& id) const
Ice::StringSeq
Database::getAllAdapters(const string& expression)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
vector<string> result;
vector<string> ids = _adapterCache.getAll(expression);
result.swap(ids);
@@ -1456,7 +1481,7 @@ Database::addObject(const ObjectInfo& info)
int serial = 0;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
const Ice::Identity id = info.proxy->ice_getIdentity();
if(_objectCache.has(id))
@@ -1464,7 +1489,7 @@ Database::addObject(const ObjectInfo& info)
throw ObjectExistsException(id);
}
- Ice::Long dbSerial = 0;
+ long long dbSerial = 0;
try
{
IceDB::ReadWriteTxn txn(_env);
@@ -1496,13 +1521,13 @@ Database::addObject(const ObjectInfo& info)
}
void
-Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
+Database::addOrUpdateObject(const ObjectInfo& info, long long dbSerial)
{
assert(dbSerial != 0 || _master);
int serial = 0; // Initialize to prevent warning.
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
const Ice::Identity id = info.proxy->ice_getIdentity();
if(_objectCache.has(id))
@@ -1552,13 +1577,13 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
}
void
-Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
+Database::removeObject(const Ice::Identity& id, long long dbSerial)
{
assert(dbSerial != 0 || _master);
int serial = 0; // Initialize to prevent warning.
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_objectCache.has(id))
{
throw DeploymentException("removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"
@@ -1598,13 +1623,13 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
}
void
-Database::updateObject(const Ice::ObjectPrx& proxy)
+Database::updateObject(const shared_ptr<Ice::ObjectPrx>& proxy)
{
assert(_master);
int serial = 0;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
const Ice::Identity id = proxy->ice_getIdentity();
if(_objectCache.has(id))
@@ -1649,19 +1674,19 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
int
Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
try
{
IceDB::ReadWriteTxn txn(_env);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ for(const auto& obj : objects)
{
- Ice::Identity id = p->proxy->ice_getIdentity();
+ Ice::Identity id = obj.proxy->ice_getIdentity();
ObjectInfo info;
if(_objects.get(txn, id, info))
{
_objectsByType.del(txn, info.type, id);
}
- addObject(txn, *p, false);
+ addObject(txn, obj, false);
}
txn.commit();
}
@@ -1677,13 +1702,13 @@ Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
int
Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
try
{
IceDB::ReadWriteTxn txn(_env);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ for(const auto& obj : objects)
{
- Ice::Identity id = p->proxy->ice_getIdentity();
+ Ice::Identity id = obj.proxy->ice_getIdentity();
ObjectInfo info;
if(_objects.get(txn, id, info))
{
@@ -1701,7 +1726,7 @@ Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
return _objectObserverTopic->wellKnownObjectsRemoved(objects);
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
Database::getObjectProxy(const Ice::Identity& id)
{
try
@@ -1724,8 +1749,8 @@ Database::getObjectProxy(const Ice::Identity& id)
return info.proxy;
}
-Ice::ObjectPrx
-Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
+shared_ptr<Ice::ObjectPrx>
+Database::getObjectByType(const string& type, const shared_ptr<Ice::Connection>& con, const Ice::Context& ctx)
{
Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
if(objs.empty())
@@ -1735,9 +1760,9 @@ Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, con
return objs[IceUtilInternal::random(static_cast<int>(objs.size()))];
}
-Ice::ObjectPrx
-Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
- const Ice::Context& ctx)
+shared_ptr<Ice::ObjectPrx>
+Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample,
+ const shared_ptr<Ice::Connection>& con, const Ice::Context& ctx)
{
Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
if(objs.empty())
@@ -1746,55 +1771,59 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample
}
IceUtilInternal::shuffle(objs.begin(), objs.end());
- vector<pair<Ice::ObjectPrx, float> > objectsWithLoad;
+ vector<pair<shared_ptr<Ice::ObjectPrx>, float>> objectsWithLoad;
objectsWithLoad.reserve(objs.size());
- for(Ice::ObjectProxySeq::const_iterator p = objs.begin(); p != objs.end(); ++p)
+ for(const auto& obj : objs)
{
float load = 1.0f;
- if(!(*p)->ice_getAdapterId().empty())
+ if(!obj->ice_getAdapterId().empty())
{
try
{
- load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
+ load = _adapterCache.get(obj->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
}
catch(const AdapterNotExistException&)
{
}
}
- objectsWithLoad.push_back(make_pair(*p, load));
+ objectsWithLoad.push_back(make_pair(obj, load));
}
- return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), ObjectLoadCI())->first;
+ return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), [](const auto& lhs, const auto& rhs)
+ {
+ return lhs.second < rhs.second;
+ })->first;
}
Ice::ObjectProxySeq
-Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
+Database::getObjectsByType(const string& type, const shared_ptr<Ice::Connection>& con, const Ice::Context& ctx)
{
Ice::ObjectProxySeq proxies;
- vector<ObjectEntryPtr> objects = _objectCache.getObjectsByType(type);
- for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ auto objects = _objectCache.getObjectsByType(type);
+
+ for(const auto& obj : objects)
{
- if(_nodeObserverTopic->isServerEnabled((*q)->getServer())) // Only return proxies from enabled servers.
+ if(_nodeObserverTopic->isServerEnabled(obj->getServer())) // Only return proxies from enabled servers.
{
- proxies.push_back((*q)->getProxy());
+ proxies.push_back(obj->getProxy());
}
}
IceDB::ReadOnlyTxn txn(_env);
vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type);
- for(unsigned int i = 0; i < infos.size(); ++i)
+ for(const auto& info : infos)
{
- proxies.push_back(infos[i].proxy);
+ proxies.push_back(info.proxy);
}
if(con && !proxies.empty() && _pluginFacade->hasTypeFilters())
{
- vector<TypeFilterPtr> filters = _pluginFacade->getTypeFilters(type);
+ auto filters = _pluginFacade->getTypeFilters(type);
if(!filters.empty())
{
- for(vector<TypeFilterPtr>::const_iterator p = filters.begin(); p != filters.end(); ++p)
+ for(const auto& filter: filters)
{
- proxies = (*p)->filter(type, proxies, con, ctx);
+ proxies = filter->filter(type, proxies, con, ctx);
}
}
}
@@ -1806,8 +1835,7 @@ Database::getObjectInfo(const Ice::Identity& id)
{
try
{
- ObjectEntryPtr object = _objectCache.get(id);
- return object->getObjectInfo();
+ return _objectCache.get(id)->getObjectInfo();
}
catch(const ObjectNotRegisteredException&)
{
@@ -1859,7 +1887,7 @@ Database::getObjectInfosByType(const string& type)
void
Database::addInternalObject(const ObjectInfo& info, bool replace)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
const Ice::Identity id = info.proxy->ice_getIdentity();
try
@@ -1889,7 +1917,7 @@ Database::addInternalObject(const ObjectInfo& info, bool replace)
void
Database::removeInternalObject(const Ice::Identity& id)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
try
{
@@ -1934,7 +1962,11 @@ Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteT
app.getIds(serverIds, adapterIds, objectIds);
- for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition));
+ for(const auto& serverId : serverIds)
+ {
+ checkServerForAddition(serverId);
+ }
+
if(!adapterIds.empty())
{
for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
@@ -1953,7 +1985,10 @@ Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteT
set<string> repGrps;
set<string> adptRepGrps;
app.getReplicaGroups(repGrps, adptRepGrps);
- for_each(adptRepGrps.begin(), adptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
+ for(const auto& repGrp : adptRepGrps)
+ {
+ checkReplicaGroupExists(repGrp);
+ }
}
void
@@ -1970,15 +2005,18 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
Ice::StringSeq addedSvrs;
set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs));
- for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition));
+ for(const auto& svr : addedSvrs)
+ {
+ checkServerForAddition(svr);
+ }
Ice::StringSeq addedAdpts;
set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts));
if(!addedAdpts.empty())
{
- for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p)
+ for(const auto& adpt : addedAdpts)
{
- checkAdapterForAddition(*p, txn);
+ checkAdapterForAddition(adpt, txn);
}
}
@@ -1986,9 +2024,9 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs));
if(!addedObjs.empty())
{
- for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p)
+ for(const auto& obj : addedObjs)
{
- checkObjectForAddition(*p, txn);
+ checkObjectForAddition(obj, txn);
}
}
@@ -1999,12 +2037,18 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set<string> rmRepGrps;
set_difference(oldRepGrps.begin(), oldRepGrps.end(), newRepGrps.begin(),newRepGrps.end(), set_inserter(rmRepGrps));
- for_each(rmRepGrps.begin(), rmRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
+ for(const auto& repGrp : rmRepGrps)
+ {
+ checkReplicaGroupForRemove(repGrp);
+ }
set<string> addedAdptRepGrps;
set_difference(newAdptRepGrps.begin(),newAdptRepGrps.end(), oldAdptRepGrps.begin(), oldAdptRepGrps.end(),
set_inserter(addedAdptRepGrps));
- for_each(addedAdptRepGrps.begin(), addedAdptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
+ for(const auto& repGrp : addedAdptRepGrps)
+ {
+ checkReplicaGroupExists(repGrp);
+ }
vector<string> invalidAdptRepGrps;
set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(),
@@ -2021,7 +2065,11 @@ Database::checkForRemove(const ApplicationHelper& app)
set<string> replicaGroups;
set<string> adapterReplicaGroups;
app.getReplicaGroups(replicaGroups, adapterReplicaGroups);
- for_each(replicaGroups.begin(), replicaGroups.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
+
+ for(const auto& replicaGroup : replicaGroups)
+ {
+ checkReplicaGroupForRemove(replicaGroup);
+ }
}
void
@@ -2088,10 +2136,10 @@ Database::checkObjectForAddition(const Ice::Identity& objectId,
void
Database::checkReplicaGroupExists(const string& replicaGroup)
{
- ReplicaGroupEntryPtr entry;
+ shared_ptr<ReplicaGroupEntry> entry;
try
{
- entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
+ entry = dynamic_pointer_cast<ReplicaGroupEntry>(_adapterCache.get(replicaGroup));
}
catch(const AdapterNotExistException&)
{
@@ -2106,10 +2154,10 @@ Database::checkReplicaGroupExists(const string& replicaGroup)
void
Database::checkReplicaGroupForRemove(const string& replicaGroup)
{
- ReplicaGroupEntryPtr entry;
+ shared_ptr<ReplicaGroupEntry> entry;
try
{
- entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
+ entry = dynamic_pointer_cast<ReplicaGroupEntry>(_adapterCache.get(replicaGroup));
}
catch(const AdapterNotExistException&)
{
@@ -2137,53 +2185,50 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
{
const NodeDescriptorDict& nodes = app.getInstance().nodes;
const string application = app.getInstance().name;
- for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
+ for(const auto& node : nodes)
{
- _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
+ _nodeCache.get(node.first, true)->addDescriptor(application, node.second);
}
const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
- for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
+ for(const auto& adpt : adpts)
{
- assert(!r->id.empty());
- _adapterCache.addReplicaGroup(*r, application);
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ assert(!adpt.id.empty());
+ _adapterCache.addReplicaGroup(adpt, application);
+ for(const auto& obj : adpt.objects)
{
- _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
+ _objectCache.add(toObjectInfo(_communicator, obj, adpt.id), application, "");
}
}
- map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
- for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ for(const auto& server : app.getServerInfos(uuid, revision))
{
- entries.push_back(_serverCache.add(p->second));
+ entries.push_back(_serverCache.add(server.second));
}
}
void
Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
{
- map<string, ServerInfo> servers = app.getServerInfos("", 0);
- for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ for(const auto& server : app.getServerInfos("", 0))
{
- entries.push_back(_serverCache.remove(p->first, false));
+ entries.push_back(_serverCache.remove(server.first, false));
}
- const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
- for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
+ for(const auto& adpt : app.getInstance().replicaGroups)
{
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ for(ObjectDescriptorSeq::const_iterator o = adpt.objects.begin(); o != adpt.objects.end(); ++o)
{
_objectCache.remove(o->id);
}
- _adapterCache.removeReplicaGroup(r->id);
+ _adapterCache.removeReplicaGroup(adpt.id);
}
const NodeDescriptorDict& nodes = app.getInstance().nodes;
const string application = app.getInstance().name;
- for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
+ for(const auto& node : nodes)
{
- _nodeCache.get(n->first)->removeDescriptor(application);
+ _nodeCache.get(node.first)->removeDescriptor(application);
}
}
@@ -2200,8 +2245,8 @@ Database::reload(const ApplicationHelper& oldApp,
//
// Remove destroyed servers.
//
- map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
- map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision);
+ auto oldServers = oldApp.getServerInfos(uuid, revision);
+ auto newServers = newApp.getServerInfos(uuid, revision);
vector<pair<bool, ServerInfo> > load;
for(map<string, ServerInfo>::const_iterator p = newServers.begin(); p != newServers.end(); ++p)
{
@@ -2217,7 +2262,7 @@ Database::reload(const ApplicationHelper& oldApp,
}
else
{
- ServerEntryPtr server = _serverCache.get(p->first);
+ auto server = _serverCache.get(p->first);
server->update(q->second, noRestart); // Just update the server revision on the node.
entries.push_back(server);
}
@@ -2281,7 +2326,7 @@ Database::reload(const ApplicationHelper& oldApp,
{
try
{
- ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
+ auto entry = dynamic_pointer_cast<ReplicaGroupEntry>(_adapterCache.get(r->id));
assert(entry);
entry->update(application, r->loadBalancing, r->filter);
}
@@ -2312,16 +2357,16 @@ Database::reload(const ApplicationHelper& oldApp,
}
}
-Ice::Long
-Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
+long long
+Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, long long dbSerial)
{
assert(dbSerial != 0 || _master);
_applications.put(txn, info.descriptor.name, info);
return updateSerial(txn, applicationsDbName, dbSerial);
}
-Ice::Long
-Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
+long long
+Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, long long dbSerial)
{
assert(dbSerial != 0 || _master);
_applications.del(txn, name);
@@ -2343,21 +2388,21 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
map<string, ServerInfo>::const_iterator p;
vector<string> servers;
vector<string> reasons;
- vector<CheckUpdateResultPtr> results;
+ vector<shared_ptr<CheckUpdateResult>> results;
set<string> unreachableNodes;
if(noRestart)
{
for(p = oldServers.begin(); p != oldServers.end(); ++p)
{
- map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
+ auto q = newServers.find(p->first);
if(q == newServers.end())
{
try
{
- ServerInfo info = p->second;
+ auto info = p->second;
info.descriptor = 0; // Clear the descriptor to indicate removal.
- CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(info, true);
+ auto result = _serverCache.get(p->first)->checkUpdate(info, true);
if(result)
{
results.push_back(result);
@@ -2378,7 +2423,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
for(p = newServers.begin(); p != newServers.end(); ++p)
{
- map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
+ auto q = oldServers.find(p->first);
if(q != oldServers.end() && isServerUpdated(p->second, q->second))
{
if(noRestart &&
@@ -2400,7 +2445,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
//
try
{
- CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(p->second, noRestart);
+ auto result = _serverCache.get(p->first)->checkUpdate(p->second, noRestart);
if(result)
{
results.push_back(result);
@@ -2419,11 +2464,11 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
}
}
- for(vector<CheckUpdateResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
+ for(const auto& result : results)
{
try
{
- (*q)->getResult();
+ result->getResult();
}
catch(const NodeUnreachableException& ex)
{
@@ -2431,7 +2476,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
}
catch(const DeploymentException& ex)
{
- servers.push_back((*q)->getServer());
+ servers.push_back(result->getServer());
reasons.push_back(ex.reason);
}
}
@@ -2446,15 +2491,9 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
out << "check for application `" << application << "' update failed:";
if(!unreachableNodes.empty())
{
-#if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
- Ice::StringSeq nodes;
- for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
- {
- nodes.push_back(*r);
- }
-#else
+
Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
-#endif
+
if(nodes.size() == 1)
{
out << "\nthe node `" << nodes[0] << "' is down";
@@ -2488,15 +2527,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
}
if(!unreachableNodes.empty())
{
-#if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
- Ice::StringSeq nodes;
- for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
- {
- nodes.push_back(*r);
- }
-#else
Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
-#endif
if(nodes.size() == 1)
{
os << "\nthe node `" << nodes[0] << "' is down";
@@ -2541,14 +2572,17 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
checkUpdate(previousAppHelper, appHelper, oldApp.uuid, oldApp.revision, noRestart);
}
- Lock sync(*this);
+ lock_guard lock(_mutex);
IceDB::ReadWriteTxn txn(_env);
checkForUpdate(previousAppHelper, appHelper, txn);
reload(previousAppHelper, appHelper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
ApplicationInfo info = oldApp;
info.updateTime = update.updateTime;
@@ -2579,8 +2613,8 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
// for the nodes to start servers.
//
{
- Lock sync(*this);
- vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
+ lock_guard lock(_mutex);
+ auto p = find(_updating.begin(), _updating.end(), update.descriptor.name);
assert(p != _updating.end());
p->markUpdated();
}
@@ -2605,7 +2639,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
{
ApplicationUpdateInfo newUpdate;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
entries.clear();
ApplicationHelper previous(_communicator, newDesc);
ApplicationHelper helper(_communicator, oldApp.descriptor);
@@ -2635,12 +2669,20 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
assert(p != _updating.end());
p->unmarkUpdated();
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for(const auto& entry : entries)
+ {
+ entry->sync();
+ }
serial = _applicationObserverTopic->applicationUpdated(dbSerial, newUpdate);
}
_applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for subscriber to be updated.
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
+
+ for(const auto& entry : entries)
+ {
+ entry->waitForSyncNoThrow();
+ }
+
finishUpdating(newDesc.name);
throw;
}
@@ -2655,12 +2697,9 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
}
void
-Database::waitForUpdate(const string& name)
+Database::waitForUpdate(unique_lock<mutex>& lock, const string& name)
{
- while(find(_updating.begin(), _updating.end(), name) != _updating.end())
- {
- wait();
- }
+ _condVar.wait(lock, [this, &name] { return find(_updating.begin(), _updating.end(), name) == _updating.end(); });
}
void
@@ -2674,25 +2713,25 @@ Database::startUpdating(const string& name, const string& uuid, int revision)
void
Database::finishUpdating(const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), name);
assert(p != _updating.end());
p->markUpdated();
_updating.erase(p);
- notifyAll();
+ _condVar.notify_all();
}
-Ice::Long
+long long
Database::getSerial(const IceDB::Txn& txn, const string& dbName)
{
- Ice::Long serial = 1;
+ long long serial = 1;
_serials.get(txn, dbName, serial);
return serial;
}
-Ice::Long
-Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, Ice::Long serial)
+long long
+Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, long long serial)
{
if(serial == -1) // The master we are talking to doesn't support serials (old IceGrid versions)
{