diff options
Diffstat (limited to 'libpqpp/pq-connection.cpp')
-rw-r--r-- | libpqpp/pq-connection.cpp | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/libpqpp/pq-connection.cpp b/libpqpp/pq-connection.cpp new file mode 100644 index 0000000..db46dc5 --- /dev/null +++ b/libpqpp/pq-connection.cpp @@ -0,0 +1,211 @@ +#include "pq-connection.h" +#include "pq-error.h" +#include "pq-selectcommand.h" +#include "pq-modifycommand.h" +#include <unistd.h> +#include <poll.h> +#include <boost/assert.hpp> + +NAMEDFACTORY("postgresql", PQ::Connection, DB::ConnectionFactory); + +static void setup() __attribute__((constructor(101))); +static void setup() +{ + BOOST_ASSERT(PQisthreadsafe() == 1); + PQinitOpenSSL(1, 0); +} + +static void +noNoticeProcessor(void *, const char *) +{ +} + +PQ::Connection::Connection(const std::string & info) : + conn(PQconnectdb(info.c_str())), + txDepth(0), + pstmntNo(0), + rolledback(false) +{ + if (PQstatus(conn) != CONNECTION_OK) { + ConnectionError ce(conn); + PQfinish(conn); + throw ce; + } + PQsetNoticeProcessor(conn, noNoticeProcessor, NULL); +} + +PQ::Connection::~Connection() +{ + PQfinish(conn); +} + +void +PQ::Connection::finish() const +{ + if (txDepth != 0) { + rollbackTx(); + throw Error("Transaction still open"); + } +} + +int +PQ::Connection::beginTx() const +{ + if (txDepth == 0) { + checkResultFree(PQexec(conn, "BEGIN"), PGRES_COMMAND_OK); + rolledback = false; + } + return ++txDepth; +} + +int +PQ::Connection::commitTx() const +{ + if (rolledback) { + return rollbackTx(); + } + if (--txDepth == 0) { + checkResultFree(PQexec(conn, "COMMIT"), PGRES_COMMAND_OK); + } + return txDepth; +} + +int +PQ::Connection::rollbackTx() const +{ + if (--txDepth == 0) { + checkResultFree(PQexec(conn, "ROLLBACK"), PGRES_COMMAND_OK); + } + else { + rolledback = true; + } + return txDepth; +} + +bool +PQ::Connection::inTx() const +{ + return txDepth; +} + +void +PQ::Connection::execute(const std::string & sql) const +{ + checkResultFree(PQexec(conn, sql.c_str()), PGRES_COMMAND_OK, PGRES_TUPLES_OK); +} + +DB::BulkDeleteStyle +PQ::Connection::bulkDeleteStyle() const +{ + return DB::BulkDeleteUsingSubSelect; +} + +DB::BulkUpdateStyle +PQ::Connection::bulkUpdateStyle() const +{ + return DB::BulkUpdateUsingFromSrc; +} + +void +PQ::Connection::ping() const +{ + struct pollfd fd { PQsocket(conn), POLLRDHUP | POLLERR | POLLHUP | POLLNVAL, 0 }; + if (PQstatus(conn) != CONNECTION_OK || poll(&fd, 1, 1)) { + if (inTx()) { + throw ConnectionError(conn); + } + PQreset(conn); + if (PQstatus(conn) != CONNECTION_OK) { + throw ConnectionError(conn); + } + } +} + + +DB::SelectCommand * +PQ::Connection::newSelectCommand(const std::string & sql) const +{ + return new SelectCommand(this, sql, pstmntNo++); +} + +DB::ModifyCommand * +PQ::Connection::newModifyCommand(const std::string & sql) const +{ + return new ModifyCommand(this, sql, pstmntNo++); +} + +bool +PQ::Connection::checkResultInt(PGresult * res, int expected, int alt) +{ + return (PQresultStatus(res) == expected) || (alt != -1 && (PQresultStatus(res) == alt)); +} + +PGresult * +PQ::Connection::checkResult(PGresult * res, int expected, int alt) const +{ + if (!checkResultInt(res, expected, alt)) { + PQclear(res); + throw Error(PQerrorMessage(conn)); + } + return res; +} + +void +PQ::Connection::checkResultFree(PGresult * res, int expected, int alt) const +{ + if (!checkResultInt(res, expected, alt)) { + PQclear(res); + throw Error(PQerrorMessage(conn)); + } + PQclear(res); +} + +void +PQ::Connection::beginBulkUpload(const char * table, const char * extra) const +{ + char buf[BUFSIZ]; + snprintf(buf, BUFSIZ, "COPY %s FROM STDIN %s", table, extra); + checkResultFree(PQexec(conn, buf), PGRES_COPY_IN); +} + +void +PQ::Connection::endBulkUpload(const char * msg) const +{ + switch (PQputCopyEnd(conn, msg)) { + case 0:// block + sleep(1); + endBulkUpload(msg); + return; + case 1:// success + checkResultFree(PQgetResult(conn), PGRES_COMMAND_OK); + return; + default:// -1 is error + throw Error(PQerrorMessage(conn)); + } +} + +size_t +PQ::Connection::bulkUploadData(const char * data, size_t len) const +{ + switch (PQputCopyData(conn, data, len)) { + case 0:// block + sleep(1); + return bulkUploadData(data, len); + case 1:// success + return len; + default:// -1 is error + throw Error(PQerrorMessage(conn)); + } +} + +int64_t +PQ::Connection::insertId() const +{ + SelectCommand getId(this, "SELECT lastval()", pstmntNo++); + int64_t id = -1; + while (getId.fetch()) { + getId[0] >> id; + } + return id; +} + |