From df70cd1da9537e0af3e8f62062a01d1ba9bd7160 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 4 Jul 2021 19:27:32 +0100 Subject: Support bulk upload to PQ --- lib/output/pq/pqConn.cpp | 36 ++++++++++++++++++++++++++++++++++++ lib/output/pq/pqConn.h | 4 ++++ 2 files changed, 40 insertions(+) diff --git a/lib/output/pq/pqConn.cpp b/lib/output/pq/pqConn.cpp index 76a3a75..53fd3db 100644 --- a/lib/output/pq/pqConn.cpp +++ b/lib/output/pq/pqConn.cpp @@ -1,6 +1,7 @@ #include "pqConn.h" #include "pqBindings.h" #include "pqStmt.h" +#include #include #include #include @@ -62,6 +63,41 @@ namespace MyGrate::Output::Pq { query("ROLLBACK"); } + FILE * + PqConn::beginBulkUpload(const char * schema, const char * table) + { + ResPtr res {PQexec(conn.get(), scprintf<"COPY %?.%? FROM STDIN">(schema, table).c_str()), &PQclear}; + verify(PQresultStatus(res.get()) == PGRES_COPY_IN, "begin copy", res.get()); + return fopencookie(this, "w", + {nullptr, + [](void * cookie, const char * buf, size_t size) -> ssize_t { + auto pq = static_cast(cookie); + int rc; + while (!(rc = PQputCopyData(pq->conn.get(), buf, size))) { + sleep(1); + } + verify(rc == 1, "copy data", pq->conn.get()); + return size; + }, + nullptr, + [](void * cookie) -> int { + static_cast(cookie)->endBulkUpload(nullptr); + return 0; + }}); + } + + void + PqConn::endBulkUpload(const char * errormsg) + { + int rc; + while (!(rc = PQputCopyEnd(this->conn.get(), errormsg))) { + sleep(1); + } + verify(rc == 1, "copy end rc", conn.get()); + ResPtr res {PQgetResult(conn.get()), &PQclear}; + verify(PQresultStatus(res.get()) == PGRES_COMMAND_OK, "end copy", res.get()); + } + void PqConn::notice_processor(void * p, const char * n) { diff --git a/lib/output/pq/pqConn.h b/lib/output/pq/pqConn.h index 3b27f97..6ee6dc0 100644 --- a/lib/output/pq/pqConn.h +++ b/lib/output/pq/pqConn.h @@ -2,6 +2,7 @@ #define MYGRATE_OUTPUT_PQ_PQCONN_H #include +#include #include #include #include @@ -36,6 +37,9 @@ namespace MyGrate::Output::Pq { void commitTx() override; void rollbackTx() override; + FILE * beginBulkUpload(const char * schema, const char * table); + void endBulkUpload(const char * errormsg); + const std::string connstr; private: -- cgit v1.2.3