diff options
author | randomdan <randomdan@localhost> | 2013-12-24 18:09:08 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2013-12-24 18:09:08 +0000 |
commit | 53f55ff3ff65de2f9bd4410e7f245d0e26f29ca3 (patch) | |
tree | 42f80c2f6bd6c0d09a25937c927dc554f172d0b1 /project2/sql/rdbmsDataSource.cpp | |
parent | Fix slice scanner and split .ice files back into logical blocks (diff) | |
download | project2-53f55ff3ff65de2f9bd4410e7f245d0e26f29ca3.tar.bz2 project2-53f55ff3ff65de2f9bd4410e7f245d0e26f29ca3.tar.xz project2-53f55ff3ff65de2f9bd4410e7f245d0e26f29ca3.zip |
Manage database connections on a per thread basis
Diffstat (limited to 'project2/sql/rdbmsDataSource.cpp')
-rw-r--r-- | project2/sql/rdbmsDataSource.cpp | 162 |
1 files changed, 130 insertions, 32 deletions
diff --git a/project2/sql/rdbmsDataSource.cpp b/project2/sql/rdbmsDataSource.cpp index bf69dcd..7965a29 100644 --- a/project2/sql/rdbmsDataSource.cpp +++ b/project2/sql/rdbmsDataSource.cpp @@ -8,6 +8,7 @@ #include <boost/foreach.hpp> SimpleMessageException(UnknownConnectionProvider); +#define LOCK(l) std::lock_guard<std::mutex> _lock##l(l) /// Specialized ElementLoader for instances of RdbmsDataSource; handles persistent DB connections class RdbmsDataSourceLoader : public ElementLoader::For<RdbmsDataSource> { @@ -38,7 +39,8 @@ DECLARE_CUSTOM_LOADER("rdbmsdatasource", RdbmsDataSourceLoader); RdbmsDataSource::DBHosts RdbmsDataSource::dbhosts; RdbmsDataSource::FailedHosts RdbmsDataSource::failedhosts; -RdbmsDataSource::DSNSet RdbmsDataSource::changedDSNs; +RdbmsDataSource::ChangedDSNs RdbmsDataSource::changedDSNs; +std::mutex RdbmsDataSource::glock; RdbmsDataSource::RdbmsDataSource(ScriptNodePtr p) : DataSource(p), @@ -54,24 +56,31 @@ RdbmsDataSource::~RdbmsDataSource() { } -const DB::Connection & +RdbmsDataSource::ConnectionRef RdbmsDataSource::getWritable() const { + LOCK(ilock); ConnectionPtr master = connectTo(masterDsn); if (!master->txOpen) { master->connection->beginTx(); master->txOpen = true; } - changedDSNs.insert(name); - return *master->connection; + LOCK(glock); + changedDSNs.insert({name, std::this_thread::get_id()}); + return master.get(); } -const DB::Connection & +RdbmsDataSource::ConnectionRef RdbmsDataSource::getReadonly() const { - if (changedDSNs.find(name) != changedDSNs.end()) { - return *connectTo(masterDsn)->connection; + { + LOCK(glock); + if (changedDSNs.find({name, std::this_thread::get_id()}) != changedDSNs.end()) { + glock.unlock(); + return connectTo(masterDsn).get(); + } } + LOCK(ilock); if (localhost.length() == 0 && preferLocal) { struct utsname name; if (uname(&name)) { @@ -89,9 +98,9 @@ RdbmsDataSource::getReadonly() const if (ro == roDSNs.end()) { Logger()->messagef(LOG_INFO, "%s: No database host matches local host name (%s) Will use master DSN", __PRETTY_FUNCTION__, localhost.c_str()); - return *connectTo(masterDsn)->connection; + return connectTo(masterDsn).get(); } - return *connectTo(ro->second)->connection; + return connectTo(ro->second).get(); } catch (...) { // Failed to connect to a preferred DB... carry on and try the others... @@ -99,38 +108,55 @@ RdbmsDataSource::getReadonly() const } BOOST_FOREACH(ReadonlyDSNs::value_type db, roDSNs) { try { - return *connectTo(db.second)->connection; + return connectTo(db.second).get(); } catch (...) { } } - return *connectTo(masterDsn)->connection; + return connectTo(masterDsn).get(); } void RdbmsDataSource::commit() { - DBHosts::const_iterator m = dbhosts.find(masterDsn); - if (m != dbhosts.end() && m->second->txOpen) { - m->second->connection->commitTx(); - m->second->txOpen = false; + LOCK(ilock); + LOCK(glock); + auto masters = dbhosts.equal_range(masterDsn); + for (auto m = masters.first; m != masters.second; m++) { + if (m->second->threadId) { + } + if (m->second->txOpen && m->second->threadId && *m->second->threadId == std::this_thread::get_id()) { + m->second->connection->commitTx(); + m->second->txOpen = false; + if (m->second->users == 0) { + m->second->threadId.reset(); + } + } } } void RdbmsDataSource::rollback() { - DBHosts::const_iterator m = dbhosts.find(masterDsn); - if (m != dbhosts.end() && m->second->txOpen) { - m->second->connection->rollbackTx(); - m->second->txOpen = false; + LOCK(ilock); + LOCK(glock); + auto masters = dbhosts.equal_range(masterDsn); + for (auto m = masters.first; m != masters.second; m++) { + if (m->second->txOpen && m->second->threadId && *m->second->threadId == std::this_thread::get_id()) { + m->second->connection->rollbackTx(); + m->second->txOpen = false; + if (m->second->users == 0) { + m->second->threadId.reset(); + } + } } - changedDSNs.erase(name); + changedDSNs.erase({name, std::this_thread::get_id()}); } RdbmsDataSource::ConnectionPtr RdbmsDataSource::connectTo(const ConnectionInfo & dsn) { + LOCK(glock); FailedHosts::iterator dbf = failedhosts.find(dsn); if (dbf != failedhosts.end()) { if (time(NULL) - 20 > dbf->second.FailureTime) { @@ -140,23 +166,27 @@ RdbmsDataSource::connectTo(const ConnectionInfo & dsn) throw dbf->second; } } - DBHosts::const_iterator dbi = dbhosts.find(dsn); - if (dbi != dbhosts.end()) { - try { - dbi->second->connection->ping(); - dbi->second->touch(); - return dbi->second; - } - catch (...) { - // Connection in failed state - Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__); + auto dbis = dbhosts.equal_range(dsn); + for (auto dbi = dbis.first; dbi != dbis.second; dbi++) { + if (!dbi->second->threadId || *dbi->second->threadId == std::this_thread::get_id()) { + try { + dbi->second->connection->ping(); + dbi->second->threadId = std::this_thread::get_id(); + dbi->second->touch(); + return dbi->second; + } + catch (...) { + // Connection in failed state + Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__); + } } } try { ConnectionPtr db = ConnectionPtr(new RdbmsConnection(dsn.connect(), 300)); - dbhosts[dsn] = db; + db->threadId = std::this_thread::get_id(); db->touch(); + dbhosts.insert({dsn, db}); return db; } catch (const DB::ConnectionError & e) { @@ -171,6 +201,7 @@ RdbmsDataSource::RdbmsConnection::RdbmsConnection(const DB::Connection * con, ti connection(con), txOpen(false), lastUsedTime(0), + users(0), keepAliveTime(kat) { } @@ -187,10 +218,25 @@ RdbmsDataSource::RdbmsConnection::touch() const time(&lastUsedTime); } +void +RdbmsDataSource::RdbmsConnection::incRef() +{ + users += 1; +} + +void +RdbmsDataSource::RdbmsConnection::decRef() +{ + users -= 1; + if (users == 0 && !txOpen) { + threadId.reset(); + } +} + bool RdbmsDataSource::RdbmsConnection::isExpired() const { - return (time(NULL) > lastUsedTime + keepAliveTime); + return ((time(NULL) > lastUsedTime + keepAliveTime) && (users == 0)); } RdbmsDataSource::ConnectionInfo::ConnectionInfo(ScriptNodePtr node) : @@ -211,3 +257,55 @@ RdbmsDataSource::ConnectionInfo::operator<(const RdbmsDataSource::ConnectionInfo return ((typeId < other.typeId) || ((typeId == other.typeId) && (dsn < other.dsn))); } +RdbmsDataSource::ConnectionRef::ConnectionRef() : + conn(NULL) +{ +} + +RdbmsDataSource::ConnectionRef::ConnectionRef(RdbmsConnection * c) : + conn(c) +{ + if (conn) + conn->incRef(); +} + +RdbmsDataSource::ConnectionRef::ConnectionRef(const ConnectionRef & ref) : + conn(ref.conn) +{ + if (conn) + conn->incRef(); +} + +RdbmsDataSource::ConnectionRef::~ConnectionRef() +{ + if (conn) + conn->decRef(); +} + +void RdbmsDataSource::ConnectionRef::operator=(const ConnectionRef & ref) +{ + if (conn) + conn->decRef(); + conn = ref.conn; + if (conn) + conn->incRef(); +} + +const DB::Connection * +RdbmsDataSource::ConnectionRef::operator->() const +{ + return conn->connection; +} + +const DB::Connection & +RdbmsDataSource::ConnectionRef::operator*() const +{ + return *conn->connection; +} + +const DB::Connection * +RdbmsDataSource::ConnectionRef::get() const +{ + return conn->connection; +} + |