summaryrefslogtreecommitdiff
path: root/project2/sql/rdbmsDataSource.cpp
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2013-12-24 18:09:08 +0000
committerrandomdan <randomdan@localhost>2013-12-24 18:09:08 +0000
commitfc40b0a163347431650e73d4704a9a33f2a7fee3 (patch)
tree42f80c2f6bd6c0d09a25937c927dc554f172d0b1 /project2/sql/rdbmsDataSource.cpp
parentFix slice scanner and split .ice files back into logical blocks (diff)
downloadproject2-fc40b0a163347431650e73d4704a9a33f2a7fee3.tar.bz2
project2-fc40b0a163347431650e73d4704a9a33f2a7fee3.tar.xz
project2-fc40b0a163347431650e73d4704a9a33f2a7fee3.zip
Manage database connections on a per thread basis
Diffstat (limited to 'project2/sql/rdbmsDataSource.cpp')
-rw-r--r--project2/sql/rdbmsDataSource.cpp162
1 files changed, 130 insertions, 32 deletions
diff --git a/project2/sql/rdbmsDataSource.cpp b/project2/sql/rdbmsDataSource.cpp
index bf69dcd..7965a29 100644
--- a/project2/sql/rdbmsDataSource.cpp
+++ b/project2/sql/rdbmsDataSource.cpp
@@ -8,6 +8,7 @@
#include <boost/foreach.hpp>
SimpleMessageException(UnknownConnectionProvider);
+#define LOCK(l) std::lock_guard<std::mutex> _lock##l(l)
/// Specialized ElementLoader for instances of RdbmsDataSource; handles persistent DB connections
class RdbmsDataSourceLoader : public ElementLoader::For<RdbmsDataSource> {
@@ -38,7 +39,8 @@ DECLARE_CUSTOM_LOADER("rdbmsdatasource", RdbmsDataSourceLoader);
RdbmsDataSource::DBHosts RdbmsDataSource::dbhosts;
RdbmsDataSource::FailedHosts RdbmsDataSource::failedhosts;
-RdbmsDataSource::DSNSet RdbmsDataSource::changedDSNs;
+RdbmsDataSource::ChangedDSNs RdbmsDataSource::changedDSNs;
+std::mutex RdbmsDataSource::glock;
RdbmsDataSource::RdbmsDataSource(ScriptNodePtr p) :
DataSource(p),
@@ -54,24 +56,31 @@ RdbmsDataSource::~RdbmsDataSource()
{
}
-const DB::Connection &
+RdbmsDataSource::ConnectionRef
RdbmsDataSource::getWritable() const
{
+ LOCK(ilock);
ConnectionPtr master = connectTo(masterDsn);
if (!master->txOpen) {
master->connection->beginTx();
master->txOpen = true;
}
- changedDSNs.insert(name);
- return *master->connection;
+ LOCK(glock);
+ changedDSNs.insert({name, std::this_thread::get_id()});
+ return master.get();
}
-const DB::Connection &
+RdbmsDataSource::ConnectionRef
RdbmsDataSource::getReadonly() const
{
- if (changedDSNs.find(name) != changedDSNs.end()) {
- return *connectTo(masterDsn)->connection;
+ {
+ LOCK(glock);
+ if (changedDSNs.find({name, std::this_thread::get_id()}) != changedDSNs.end()) {
+ glock.unlock();
+ return connectTo(masterDsn).get();
+ }
}
+ LOCK(ilock);
if (localhost.length() == 0 && preferLocal) {
struct utsname name;
if (uname(&name)) {
@@ -89,9 +98,9 @@ RdbmsDataSource::getReadonly() const
if (ro == roDSNs.end()) {
Logger()->messagef(LOG_INFO, "%s: No database host matches local host name (%s) Will use master DSN",
__PRETTY_FUNCTION__, localhost.c_str());
- return *connectTo(masterDsn)->connection;
+ return connectTo(masterDsn).get();
}
- return *connectTo(ro->second)->connection;
+ return connectTo(ro->second).get();
}
catch (...) {
// Failed to connect to a preferred DB... carry on and try the others...
@@ -99,38 +108,55 @@ RdbmsDataSource::getReadonly() const
}
BOOST_FOREACH(ReadonlyDSNs::value_type db, roDSNs) {
try {
- return *connectTo(db.second)->connection;
+ return connectTo(db.second).get();
}
catch (...) {
}
}
- return *connectTo(masterDsn)->connection;
+ return connectTo(masterDsn).get();
}
void
RdbmsDataSource::commit()
{
- DBHosts::const_iterator m = dbhosts.find(masterDsn);
- if (m != dbhosts.end() && m->second->txOpen) {
- m->second->connection->commitTx();
- m->second->txOpen = false;
+ LOCK(ilock);
+ LOCK(glock);
+ auto masters = dbhosts.equal_range(masterDsn);
+ for (auto m = masters.first; m != masters.second; m++) {
+ if (m->second->threadId) {
+ }
+ if (m->second->txOpen && m->second->threadId && *m->second->threadId == std::this_thread::get_id()) {
+ m->second->connection->commitTx();
+ m->second->txOpen = false;
+ if (m->second->users == 0) {
+ m->second->threadId.reset();
+ }
+ }
}
}
void
RdbmsDataSource::rollback()
{
- DBHosts::const_iterator m = dbhosts.find(masterDsn);
- if (m != dbhosts.end() && m->second->txOpen) {
- m->second->connection->rollbackTx();
- m->second->txOpen = false;
+ LOCK(ilock);
+ LOCK(glock);
+ auto masters = dbhosts.equal_range(masterDsn);
+ for (auto m = masters.first; m != masters.second; m++) {
+ if (m->second->txOpen && m->second->threadId && *m->second->threadId == std::this_thread::get_id()) {
+ m->second->connection->rollbackTx();
+ m->second->txOpen = false;
+ if (m->second->users == 0) {
+ m->second->threadId.reset();
+ }
+ }
}
- changedDSNs.erase(name);
+ changedDSNs.erase({name, std::this_thread::get_id()});
}
RdbmsDataSource::ConnectionPtr
RdbmsDataSource::connectTo(const ConnectionInfo & dsn)
{
+ LOCK(glock);
FailedHosts::iterator dbf = failedhosts.find(dsn);
if (dbf != failedhosts.end()) {
if (time(NULL) - 20 > dbf->second.FailureTime) {
@@ -140,23 +166,27 @@ RdbmsDataSource::connectTo(const ConnectionInfo & dsn)
throw dbf->second;
}
}
- DBHosts::const_iterator dbi = dbhosts.find(dsn);
- if (dbi != dbhosts.end()) {
- try {
- dbi->second->connection->ping();
- dbi->second->touch();
- return dbi->second;
- }
- catch (...) {
- // Connection in failed state
- Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__);
+ auto dbis = dbhosts.equal_range(dsn);
+ for (auto dbi = dbis.first; dbi != dbis.second; dbi++) {
+ if (!dbi->second->threadId || *dbi->second->threadId == std::this_thread::get_id()) {
+ try {
+ dbi->second->connection->ping();
+ dbi->second->threadId = std::this_thread::get_id();
+ dbi->second->touch();
+ return dbi->second;
+ }
+ catch (...) {
+ // Connection in failed state
+ Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__);
+ }
}
}
try {
ConnectionPtr db = ConnectionPtr(new RdbmsConnection(dsn.connect(), 300));
- dbhosts[dsn] = db;
+ db->threadId = std::this_thread::get_id();
db->touch();
+ dbhosts.insert({dsn, db});
return db;
}
catch (const DB::ConnectionError & e) {
@@ -171,6 +201,7 @@ RdbmsDataSource::RdbmsConnection::RdbmsConnection(const DB::Connection * con, ti
connection(con),
txOpen(false),
lastUsedTime(0),
+ users(0),
keepAliveTime(kat)
{
}
@@ -187,10 +218,25 @@ RdbmsDataSource::RdbmsConnection::touch() const
time(&lastUsedTime);
}
+void
+RdbmsDataSource::RdbmsConnection::incRef()
+{
+ users += 1;
+}
+
+void
+RdbmsDataSource::RdbmsConnection::decRef()
+{
+ users -= 1;
+ if (users == 0 && !txOpen) {
+ threadId.reset();
+ }
+}
+
bool
RdbmsDataSource::RdbmsConnection::isExpired() const
{
- return (time(NULL) > lastUsedTime + keepAliveTime);
+ return ((time(NULL) > lastUsedTime + keepAliveTime) && (users == 0));
}
RdbmsDataSource::ConnectionInfo::ConnectionInfo(ScriptNodePtr node) :
@@ -211,3 +257,55 @@ RdbmsDataSource::ConnectionInfo::operator<(const RdbmsDataSource::ConnectionInfo
return ((typeId < other.typeId) || ((typeId == other.typeId) && (dsn < other.dsn)));
}
+RdbmsDataSource::ConnectionRef::ConnectionRef() :
+ conn(NULL)
+{
+}
+
+RdbmsDataSource::ConnectionRef::ConnectionRef(RdbmsConnection * c) :
+ conn(c)
+{
+ if (conn)
+ conn->incRef();
+}
+
+RdbmsDataSource::ConnectionRef::ConnectionRef(const ConnectionRef & ref) :
+ conn(ref.conn)
+{
+ if (conn)
+ conn->incRef();
+}
+
+RdbmsDataSource::ConnectionRef::~ConnectionRef()
+{
+ if (conn)
+ conn->decRef();
+}
+
+void RdbmsDataSource::ConnectionRef::operator=(const ConnectionRef & ref)
+{
+ if (conn)
+ conn->decRef();
+ conn = ref.conn;
+ if (conn)
+ conn->incRef();
+}
+
+const DB::Connection *
+RdbmsDataSource::ConnectionRef::operator->() const
+{
+ return conn->connection;
+}
+
+const DB::Connection &
+RdbmsDataSource::ConnectionRef::operator*() const
+{
+ return *conn->connection;
+}
+
+const DB::Connection *
+RdbmsDataSource::ConnectionRef::get() const
+{
+ return conn->connection;
+}
+