summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2012-03-20 01:24:43 +0000
committerrandomdan <randomdan@localhost>2012-03-20 01:24:43 +0000
commit27603486be2576b1b1311d85719d109ed59cd224 (patch)
tree4b097d215d7b7e12661659226d4c48216e63de37
parentFix double free on cursor error (diff)
downloadlibdbpp-postgresql-27603486be2576b1b1311d85719d109ed59cd224.tar.bz2
libdbpp-postgresql-27603486be2576b1b1311d85719d109ed59cd224.tar.xz
libdbpp-postgresql-27603486be2576b1b1311d85719d109ed59cd224.zip
A stream interface, an RDBMS bulk load interface, a decompression layer, an implementation of CURL streams and a sqlbulkload task.
-rw-r--r--libpqpp/connection.cpp38
-rw-r--r--libpqpp/connection.h4
2 files changed, 42 insertions, 0 deletions
diff --git a/libpqpp/connection.cpp b/libpqpp/connection.cpp
index 8bfc2ff..d07b9d2 100644
--- a/libpqpp/connection.cpp
+++ b/libpqpp/connection.cpp
@@ -130,3 +130,41 @@ PQ::Connection::checkResultFree(PGresult * res, int expected, int alt) const
PQclear(res);
}
+void
+PQ::Connection::beginBulkUpload(const char * table, const char * extra) const
+{
+ char buf[BUFSIZ];
+ snprintf(buf, BUFSIZ, "COPY %s FROM STDIN %s", table, extra);
+ checkResultFree(PQexec(conn, buf), PGRES_COPY_IN);
+}
+
+void
+PQ::Connection::endBulkUpload(const char * msg) const
+{
+ switch (PQputCopyEnd(conn, msg)) {
+ case 0:// block
+ sleep(1);
+ endBulkUpload(msg);
+ return;
+ case 1:// success
+ checkResultFree(PQgetResult(conn), PGRES_COMMAND_OK);
+ return;
+ default:// -1 is error
+ throw Error(PQerrorMessage(conn));
+ }
+}
+
+size_t
+PQ::Connection::bulkUploadData(const char * data, size_t len) const
+{
+ switch (PQputCopyData(conn, data, len)) {
+ case 0:// block
+ sleep(1);
+ return bulkUploadData(data, len);
+ case 1:// success
+ return len;
+ default:// -1 is error
+ throw Error(PQerrorMessage(conn));
+ }
+}
+
diff --git a/libpqpp/connection.h b/libpqpp/connection.h
index d4cf60a..59c5858 100644
--- a/libpqpp/connection.h
+++ b/libpqpp/connection.h
@@ -22,6 +22,10 @@ namespace PQ {
DB::SelectCommand * newSelectCommand(const std::string & sql) const;
DB::ModifyCommand * newModifyCommand(const std::string & sql) const;
+ void beginBulkUpload(const char *, const char *) const;
+ void endBulkUpload(const char *) const;
+ size_t bulkUploadData(const char *, size_t) const;
+
PGresult * checkResult(PGresult * res, int expected, int alternative = -1) const;
void checkResultFree(PGresult * res, int expected, int alternative = -1) const;