summaryrefslogtreecommitdiff
path: root/lib/output/pq
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output/pq')
-rw-r--r--lib/output/pq/pqBindings.h57
-rw-r--r--lib/output/pq/pqConn.cpp73
-rw-r--r--lib/output/pq/pqConn.h14
-rw-r--r--lib/output/pq/pqRecordSet.cpp86
-rw-r--r--lib/output/pq/pqRecordSet.h25
-rw-r--r--lib/output/pq/pqStmt.cpp54
-rw-r--r--lib/output/pq/pqStmt.h37
7 files changed, 285 insertions, 61 deletions
diff --git a/lib/output/pq/pqBindings.h b/lib/output/pq/pqBindings.h
new file mode 100644
index 0000000..ef0df84
--- /dev/null
+++ b/lib/output/pq/pqBindings.h
@@ -0,0 +1,57 @@
+#ifndef MYGRATE_OUTPUT_PQ_PQBINDINGS
+#define MYGRATE_OUTPUT_PQ_PQBINDINGS
+
+#include <dbTypes.h>
+#include <helpers.h>
+#include <initializer_list>
+#include <variant>
+#include <vector>
+
+namespace MyGrate::Output::Pq {
+ struct Bindings {
+ // NOLINTNEXTLINE(hicpp-explicit-conversions)
+ explicit Bindings(const std::initializer_list<DbValue> & vs)
+ {
+ bufs.reserve(vs.size());
+ values.reserve(vs.size());
+ lengths.reserve(vs.size());
+ for (const auto & v : vs) {
+ std::visit(*this, v);
+ }
+ }
+ template<Stringable T>
+ void
+ operator()(const T & v)
+ {
+ bufs.emplace_back(std::to_string(v));
+ const auto & vw {bufs.back()};
+ values.emplace_back(vw.data());
+ lengths.emplace_back(vw.length());
+ }
+ template<Viewable T>
+ void
+ operator()(const T & v)
+ {
+ values.emplace_back(v.data());
+ lengths.emplace_back(v.size());
+ }
+ template<typename T>
+ void
+ operator()(const T &)
+ {
+ throw std::runtime_error("Not implemented");
+ }
+ void
+ operator()(const std::nullptr_t &)
+ {
+ values.emplace_back(nullptr);
+ lengths.emplace_back(0);
+ }
+
+ std::vector<std::string> bufs;
+ std::vector<const char *> values;
+ std::vector<int> lengths;
+ };
+}
+
+#endif
diff --git a/lib/output/pq/pqConn.cpp b/lib/output/pq/pqConn.cpp
index 4f55ba8..81d9610 100644
--- a/lib/output/pq/pqConn.cpp
+++ b/lib/output/pq/pqConn.cpp
@@ -1,89 +1,44 @@
#include "pqConn.h"
+#include "pqBindings.h"
+#include "pqStmt.h"
+#include <dbConn.h>
#include <dbTypes.h>
#include <helpers.h>
#include <libpq-fe.h>
#include <memory>
-#include <sstream>
#include <stdexcept>
#include <string>
-#include <variant>
#include <vector>
namespace MyGrate::Output::Pq {
- using ResPtr = std::unique_ptr<PGresult, decltype(&PQclear)>;
-
- PqConn::PqConn(const char * const str) : conn {PQconnectdb(str)}
+ PqConn::PqConn(const char * const str) : conn {PQconnectdb(str), PQfinish}
{
- verify<std::runtime_error>(PQstatus(conn) == CONNECTION_OK, "Connection failure");
- PQsetNoticeProcessor(conn, notice_processor, this);
- }
-
- PqConn::~PqConn()
- {
- PQfinish(conn);
+ verify<std::runtime_error>(PQstatus(conn.get()) == CONNECTION_OK, "Connection failure");
+ PQsetNoticeProcessor(conn.get(), notice_processor, this);
}
void
PqConn::query(const char * const q)
{
- ResPtr res {PQexec(conn, q), &PQclear};
+ ResPtr res {PQexec(conn.get(), q), &PQclear};
verify<std::runtime_error>(PQresultStatus(res.get()) == PGRES_COMMAND_OK, q);
}
- struct Bindings {
- // NOLINTNEXTLINE(hicpp-explicit-conversions)
- explicit Bindings(const std::initializer_list<DbValue> & vs)
- {
- bufs.reserve(vs.size());
- values.reserve(vs.size());
- lengths.reserve(vs.size());
- for (const auto & v : vs) {
- std::visit(*this, v);
- }
- }
- template<Stringable T>
- void
- operator()(const T & v)
- {
- bufs.emplace_back(std::to_string(v));
- const auto & vw {bufs.back()};
- values.emplace_back(vw.data());
- lengths.emplace_back(vw.length());
- }
- template<Viewable T>
- void
- operator()(const T & v)
- {
- values.emplace_back(v.data());
- lengths.emplace_back(v.size());
- }
- template<typename T>
- void
- operator()(const T &)
- {
- throw std::runtime_error("Not implemented");
- }
- void
- operator()(const std::nullptr_t &)
- {
- values.emplace_back(nullptr);
- lengths.emplace_back(0);
- }
-
- std::vector<std::string> bufs;
- std::vector<const char *> values;
- std::vector<int> lengths;
- };
-
void
PqConn::query(const char * const q, const std::initializer_list<DbValue> & vs)
{
Bindings b {vs};
- ResPtr res {PQexecParams(conn, q, (int)vs.size(), nullptr, b.values.data(), b.lengths.data(), nullptr, 0),
+ ResPtr res {PQexecParams(conn.get(), q, (int)vs.size(), nullptr, b.values.data(), b.lengths.data(), nullptr, 0),
&PQclear};
verify<std::runtime_error>(PQresultStatus(res.get()) == PGRES_COMMAND_OK, q);
}
+ DbPrepStmtPtr
+ PqConn::prepare(const char * const q, std::size_t n)
+ {
+ return std::make_unique<PqPrepStmt>(q, n, this);
+ }
+
void
PqConn::notice_processor(void * p, const char * n)
{
diff --git a/lib/output/pq/pqConn.h b/lib/output/pq/pqConn.h
index 613af6f..856683d 100644
--- a/lib/output/pq/pqConn.h
+++ b/lib/output/pq/pqConn.h
@@ -1,25 +1,35 @@
#ifndef MYGRATE_OUTPUT_PQ_PQCONN_H
#define MYGRATE_OUTPUT_PQ_PQCONN_H
+#include <cstddef>
#include <dbConn.h>
#include <dbTypes.h>
+#include <functional>
#include <initializer_list>
#include <libpq-fe.h>
+#include <map>
+#include <memory>
+#include <string>
namespace MyGrate::Output::Pq {
class PqConn : public DbConn {
public:
explicit PqConn(const char * const str);
- virtual ~PqConn();
+ virtual ~PqConn() = default;
void query(const char * const) override;
void query(const char * const, const std::initializer_list<DbValue> &) override;
+ DbPrepStmtPtr prepare(const char * const, std::size_t nParams) override;
+
private:
static void notice_processor(void *, const char *);
virtual void notice_processor(const char *) const;
- PGconn * const conn;
+ std::unique_ptr<PGconn, decltype(&PQfinish)> const conn;
+
+ friend class PqPrepStmt;
+ std::map<std::string, std::string, std::less<>> stmts;
};
}
diff --git a/lib/output/pq/pqRecordSet.cpp b/lib/output/pq/pqRecordSet.cpp
new file mode 100644
index 0000000..71ddee4
--- /dev/null
+++ b/lib/output/pq/pqRecordSet.cpp
@@ -0,0 +1,86 @@
+#include "pqRecordSet.h"
+#include "dbTypes.h"
+#include "pqStmt.h"
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <ctime>
+#include <helpers.h>
+#include <libpq-fe.h>
+#include <server/catalog/pg_type_d.h>
+#include <stdexcept>
+#include <string_view>
+#include <utility>
+
+namespace MyGrate::Output::Pq {
+ PqRecordSet::PqRecordSet(ResPtr r) : res {std::move(r)} { }
+
+ std::size_t
+ PqRecordSet::rows() const
+ {
+ return PQntuples(res.get());
+ }
+
+ std::size_t
+ PqRecordSet::columns() const
+ {
+ return PQnfields(res.get());
+ }
+
+ DbValue
+ PqRecordSet::at(std::size_t row, std::size_t col) const
+ {
+ if (PQgetisnull(res.get(), (int)row, (int)col)) {
+ return nullptr;
+ }
+ const auto value {PQgetvalue(res.get(), (int)row, (int)col)};
+ const auto size {static_cast<size_t>(PQgetlength(res.get(), (int)row, (int)col))};
+ const auto type {PQftype(res.get(), (int)col)};
+ switch (type) {
+ // case BITOID: TODO bool
+ // case BOOLOID: TODO bool
+ // case BOOLARRAYOID:
+ case VARBITOID:
+ case BYTEAOID:
+ // This is wrong :)
+ return Blob {reinterpret_cast<const std::byte *>(value), size};
+ case INT2OID:
+ return static_cast<int16_t>(std::strtol(value, nullptr, 10));
+ case INT4OID:
+ return static_cast<int32_t>(std::strtol(value, nullptr, 10));
+ case INT8OID:
+ return std::strtol(value, nullptr, 10);
+ case FLOAT4OID:
+ return std::strtof(value, nullptr);
+ case FLOAT8OID:
+ case CASHOID:
+ case NUMERICOID:
+ return std::strtod(value, nullptr);
+ case DATEOID: {
+ tm tm {};
+ const auto end = strptime(value, "%F", &tm);
+ verify<std::runtime_error>(end && !*end, "Invalid date string");
+ return Date {tm};
+ }
+ case TIMEOID: {
+ tm tm {};
+ const auto end = strptime(value, "%T", &tm);
+ verify<std::runtime_error>(end && !*end, "Invalid time string");
+ return Time {tm};
+ }
+ case TIMESTAMPOID: {
+ tm tm {};
+ const auto end = strptime(value, "%FT%T", &tm);
+ verify<std::runtime_error>(end && !*end, "Invalid timestamp string");
+ return DateTime {tm};
+ }
+ // case TIMESTAMPTZOID: Maybe add TZ support?
+ // case INTERVALOID: Maybe add interval support?
+ // case TIMETZOID: Maybe add TZ support?
+ case VOIDOID:
+ return nullptr;
+ default:
+ return std::string_view {value, size};
+ }
+ }
+}
diff --git a/lib/output/pq/pqRecordSet.h b/lib/output/pq/pqRecordSet.h
new file mode 100644
index 0000000..2934d84
--- /dev/null
+++ b/lib/output/pq/pqRecordSet.h
@@ -0,0 +1,25 @@
+#ifndef MYGRATE_OUTPUT_PQ_PQRECORDSET_H
+#define MYGRATE_OUTPUT_PQ_PQRECORDSET_H
+
+#include "dbRecordSet.h"
+#include "dbTypes.h"
+#include "pqStmt.h"
+#include <cstddef>
+
+namespace MyGrate::Output::Pq {
+ class PqRecordSet : public RecordSet {
+ public:
+ explicit PqRecordSet(ResPtr r);
+
+ std::size_t rows() const override;
+
+ std::size_t columns() const override;
+
+ DbValue at(std::size_t row, std::size_t col) const override;
+
+ private:
+ ResPtr res;
+ };
+}
+
+#endif
diff --git a/lib/output/pq/pqStmt.cpp b/lib/output/pq/pqStmt.cpp
new file mode 100644
index 0000000..04b48c6
--- /dev/null
+++ b/lib/output/pq/pqStmt.cpp
@@ -0,0 +1,54 @@
+#include "pqStmt.h"
+#include "libpq-fe.h"
+#include "pqBindings.h"
+#include "pqConn.h"
+#include "pqRecordSet.h"
+#include <compileTimeFormatter.h>
+#include <cstdlib>
+#include <functional>
+#include <helpers.h>
+#include <map>
+#include <stdexcept>
+#include <utility>
+#include <vector>
+
+namespace MyGrate::Output::Pq {
+ PqPrepStmt::PqPrepStmt(const char * const q, std::size_t n, PqConn * c) :
+ conn {c->conn.get()}, name {prepareAsNeeded(q, n, c)}, res {nullptr, nullptr}
+ {
+ }
+
+ void
+ PqPrepStmt::execute(const std::initializer_list<DbValue> & vs)
+ {
+ Bindings b {vs};
+ res = {PQexecPrepared(conn, name.c_str(), (int)vs.size(), b.values.data(), b.lengths.data(), nullptr, 0),
+ &PQclear};
+ verify<std::runtime_error>(
+ PQresultStatus(res.get()) == PGRES_COMMAND_OK || PQresultStatus(res.get()) == PGRES_TUPLES_OK, name);
+ }
+
+ std::size_t
+ PqPrepStmt::rows() const
+ {
+ return std::strtoul(PQcmdTuples(res.get()), nullptr, 10);
+ }
+
+ RecordSetPtr
+ PqPrepStmt::recordSet()
+ {
+ return std::make_unique<PqRecordSet>(std::move(res));
+ }
+
+ std::string
+ PqPrepStmt::prepareAsNeeded(const char * const q, std::size_t n, PqConn * c)
+ {
+ if (const auto i = c->stmts.find(q); i != c->stmts.end()) {
+ return i->second;
+ }
+ auto nam {AdHoc::scprintf<"pst%0x">(c->stmts.size())};
+ ResPtr res {PQprepare(c->conn.get(), nam.c_str(), q, (int)n, nullptr), PQclear};
+ verify<std::runtime_error>(PQresultStatus(res.get()) == PGRES_COMMAND_OK, q);
+ return c->stmts.emplace(q, std::move(nam)).first->second;
+ }
+}
diff --git a/lib/output/pq/pqStmt.h b/lib/output/pq/pqStmt.h
new file mode 100644
index 0000000..b806617
--- /dev/null
+++ b/lib/output/pq/pqStmt.h
@@ -0,0 +1,37 @@
+#ifndef MYGRATE_OUTPUT_PQ_PQSTMT_H
+#define MYGRATE_OUTPUT_PQ_PQSTMT_H
+
+#include "dbConn.h"
+#include "dbRecordSet.h"
+#include "dbTypes.h"
+#include <cstddef>
+#include <initializer_list>
+#include <libpq-fe.h>
+#include <memory>
+#include <string>
+
+namespace MyGrate::Output::Pq {
+ class PqConn;
+
+ using ResPtr = std::unique_ptr<PGresult, decltype(&PQclear)>;
+
+ class PqPrepStmt : public DbPrepStmt {
+ public:
+ PqPrepStmt(const char * const q, std::size_t n, PqConn * c);
+
+ void execute(const std::initializer_list<DbValue> & vs) override;
+
+ std::size_t rows() const override;
+
+ RecordSetPtr recordSet() override;
+
+ private:
+ static std::string prepareAsNeeded(const char * const q, std::size_t n, PqConn * c);
+
+ PGconn * conn;
+ std::string name;
+ ResPtr res;
+ };
+}
+
+#endif