diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-04 19:27:32 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-04 19:27:32 +0100 |
commit | df70cd1da9537e0af3e8f62062a01d1ba9bd7160 (patch) | |
tree | 495ec810d799f40e0ab68458a3f9a1887f9976e7 /lib/output | |
parent | Get columns and values from cursors (diff) | |
download | mygrate-df70cd1da9537e0af3e8f62062a01d1ba9bd7160.tar.bz2 mygrate-df70cd1da9537e0af3e8f62062a01d1ba9bd7160.tar.xz mygrate-df70cd1da9537e0af3e8f62062a01d1ba9bd7160.zip |
Support bulk upload to PQ
Diffstat (limited to 'lib/output')
-rw-r--r-- | lib/output/pq/pqConn.cpp | 36 | ||||
-rw-r--r-- | lib/output/pq/pqConn.h | 4 |
2 files changed, 40 insertions, 0 deletions
diff --git a/lib/output/pq/pqConn.cpp b/lib/output/pq/pqConn.cpp index 76a3a75..53fd3db 100644 --- a/lib/output/pq/pqConn.cpp +++ b/lib/output/pq/pqConn.cpp @@ -1,6 +1,7 @@ #include "pqConn.h" #include "pqBindings.h" #include "pqStmt.h" +#include <compileTimeFormatter.h> #include <dbConn.h> #include <helpers.h> #include <libpq-fe.h> @@ -62,6 +63,41 @@ namespace MyGrate::Output::Pq { query("ROLLBACK"); } + FILE * + PqConn::beginBulkUpload(const char * schema, const char * table) + { + ResPtr res {PQexec(conn.get(), scprintf<"COPY %?.%? FROM STDIN">(schema, table).c_str()), &PQclear}; + verify<PqErr>(PQresultStatus(res.get()) == PGRES_COPY_IN, "begin copy", res.get()); + return fopencookie(this, "w", + {nullptr, + [](void * cookie, const char * buf, size_t size) -> ssize_t { + auto pq = static_cast<PqConn *>(cookie); + int rc; + while (!(rc = PQputCopyData(pq->conn.get(), buf, size))) { + sleep(1); + } + verify<PqErr>(rc == 1, "copy data", pq->conn.get()); + return size; + }, + nullptr, + [](void * cookie) -> int { + static_cast<PqConn *>(cookie)->endBulkUpload(nullptr); + return 0; + }}); + } + + void + PqConn::endBulkUpload(const char * errormsg) + { + int rc; + while (!(rc = PQputCopyEnd(this->conn.get(), errormsg))) { + sleep(1); + } + verify<PqErr>(rc == 1, "copy end rc", conn.get()); + ResPtr res {PQgetResult(conn.get()), &PQclear}; + verify<PqErr>(PQresultStatus(res.get()) == PGRES_COMMAND_OK, "end copy", res.get()); + } + void PqConn::notice_processor(void * p, const char * n) { diff --git a/lib/output/pq/pqConn.h b/lib/output/pq/pqConn.h index 3b27f97..6ee6dc0 100644 --- a/lib/output/pq/pqConn.h +++ b/lib/output/pq/pqConn.h @@ -2,6 +2,7 @@ #define MYGRATE_OUTPUT_PQ_PQCONN_H #include <cstddef> +#include <cstdio> #include <dbConn.h> #include <functional> #include <initializer_list> @@ -36,6 +37,9 @@ namespace MyGrate::Output::Pq { void commitTx() override; void rollbackTx() override; + FILE * beginBulkUpload(const char * schema, const char * table); + void endBulkUpload(const char * errormsg); + const std::string connstr; private: |