From 27603486be2576b1b1311d85719d109ed59cd224 Mon Sep 17 00:00:00 2001 From: randomdan Date: Tue, 20 Mar 2012 01:24:43 +0000 Subject: A stream interface, an RDBMS bulk load interface, a decompression layer, an implementation of CURL streams and a sqlbulkload task. --- libpqpp/connection.cpp | 38 ++++++++++++++++++++++++++++++++++++++ libpqpp/connection.h | 4 ++++ 2 files changed, 42 insertions(+) diff --git a/libpqpp/connection.cpp b/libpqpp/connection.cpp index 8bfc2ff..d07b9d2 100644 --- a/libpqpp/connection.cpp +++ b/libpqpp/connection.cpp @@ -130,3 +130,41 @@ PQ::Connection::checkResultFree(PGresult * res, int expected, int alt) const PQclear(res); } +void +PQ::Connection::beginBulkUpload(const char * table, const char * extra) const +{ + char buf[BUFSIZ]; + snprintf(buf, BUFSIZ, "COPY %s FROM STDIN %s", table, extra); + checkResultFree(PQexec(conn, buf), PGRES_COPY_IN); +} + +void +PQ::Connection::endBulkUpload(const char * msg) const +{ + switch (PQputCopyEnd(conn, msg)) { + case 0:// block + sleep(1); + endBulkUpload(msg); + return; + case 1:// success + checkResultFree(PQgetResult(conn), PGRES_COMMAND_OK); + return; + default:// -1 is error + throw Error(PQerrorMessage(conn)); + } +} + +size_t +PQ::Connection::bulkUploadData(const char * data, size_t len) const +{ + switch (PQputCopyData(conn, data, len)) { + case 0:// block + sleep(1); + return bulkUploadData(data, len); + case 1:// success + return len; + default:// -1 is error + throw Error(PQerrorMessage(conn)); + } +} + diff --git a/libpqpp/connection.h b/libpqpp/connection.h index d4cf60a..59c5858 100644 --- a/libpqpp/connection.h +++ b/libpqpp/connection.h @@ -22,6 +22,10 @@ namespace PQ { DB::SelectCommand * newSelectCommand(const std::string & sql) const; DB::ModifyCommand * newModifyCommand(const std::string & sql) const; + void beginBulkUpload(const char *, const char *) const; + void endBulkUpload(const char *) const; + size_t bulkUploadData(const char *, size_t) const; + PGresult * checkResult(PGresult * res, int expected, int alternative = -1) const; void checkResultFree(PGresult * res, int expected, int alternative = -1) const; -- cgit v1.2.3