From 5b78629137fd85b989933b477277d9cdcbb771ae Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 10 Jan 2016 17:11:48 +0000 Subject: Simple wrappers for bulk uploading from a std::stream or FILE * --- libdbpp/connection.cpp | 30 ++++++++++++++++++++++++++++++ libdbpp/connection.h | 4 ++++ libdbpp/unittests/testUtils.cpp | 26 ++++++++++++++++++++++++++ libdbpp/unittests/util.sql | 2 ++ 4 files changed, 62 insertions(+) diff --git a/libdbpp/connection.cpp b/libdbpp/connection.cpp index 33ae543..fb6be71 100644 --- a/libdbpp/connection.cpp +++ b/libdbpp/connection.cpp @@ -150,6 +150,36 @@ DB::Connection::bulkUploadData(const char *, size_t) const throw DB::BulkUploadNotSupported(); } +size_t +DB::Connection::bulkUploadData(std::istream & in) const +{ + if (!in.good()) throw std::runtime_error("Input stream is not good"); + char buf[BUFSIZ]; + size_t total = 0; + for (std::streamsize r; (r = in.readsome(buf, sizeof(buf))) > 0; ) { + bulkUploadData(buf, r); + total += r; + } + return total; +} + +size_t +DB::Connection::bulkUploadData(FILE * in) const +{ + if (!in) throw std::runtime_error("Input file handle is null"); + char buf[BUFSIZ]; + size_t total = 0, r; + while ((r = fread(buf, 1, sizeof(buf), in)) > 0) { + bulkUploadData(buf, r); + total += r; + } + if ((int)r < 0) { + throw std::system_error(-r, std::system_category()); + } + return total; + +} + boost::optional DB::Connection::resolvePlugin(const std::type_info &, const std::string & name) { diff --git a/libdbpp/connection.h b/libdbpp/connection.h index 0c03f0b..5beb80f 100644 --- a/libdbpp/connection.h +++ b/libdbpp/connection.h @@ -132,6 +132,10 @@ namespace DB { virtual void endBulkUpload(const char *); /// Load data for the current bulk load operation. virtual size_t bulkUploadData(const char *, size_t) const; + /// Load bulk data from a file (wrapper) + size_t bulkUploadData(std::istream &) const; + /// Load bulk data from a file (wrapper) + size_t bulkUploadData(FILE *) const; /// Return the Id used in the last insert virtual int64_t insertId(); diff --git a/libdbpp/unittests/testUtils.cpp b/libdbpp/unittests/testUtils.cpp index 3108641..4c269e9 100644 --- a/libdbpp/unittests/testUtils.cpp +++ b/libdbpp/unittests/testUtils.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -102,3 +103,28 @@ BOOST_AUTO_TEST_CASE( extract ) BOOST_REQUIRE(!sel->fetch()); } +BOOST_AUTO_TEST_CASE( bulkLoadStream ) +{ + std::ifstream in((rootDir / "source.dat").string()); + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + db->beginBulkUpload("bulk1", ""); + BOOST_REQUIRE_EQUAL(56, db->bulkUploadData(in)); + db->endBulkUpload(nullptr); + db->select("SELECT COUNT(*) FROM bulk1")->forEachRow([](auto n) { + BOOST_REQUIRE_EQUAL(4, n); + }); +} + +BOOST_AUTO_TEST_CASE( bulkLoadFile ) +{ + auto f = fopen((rootDir / "source.dat").c_str(), "r"); + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + db->beginBulkUpload("bulk2", ""); + BOOST_REQUIRE_EQUAL(56, db->bulkUploadData(f)); + db->endBulkUpload(nullptr); + fclose(f); + db->select("SELECT COUNT(*) FROM bulk2")->forEachRow([](auto n) { + BOOST_REQUIRE_EQUAL(4, n); + }); +} + diff --git a/libdbpp/unittests/util.sql b/libdbpp/unittests/util.sql index 13391ea..73a9a08 100644 --- a/libdbpp/unittests/util.sql +++ b/libdbpp/unittests/util.sql @@ -8,3 +8,5 @@ CREATE TABLE foreachrow ( INSERT INTO foreachrow(a, b, c, d, e, f) VALUES(1, 4.3, 'Some text', '2015-11-07 13:39:17', '04:03:02', true); INSERT INTO foreachrow(a, b, c, f) VALUES(2, 4.3, 'Some text', false); +CREATE TABLE bulk1(a int, b int, c text, d text); +CREATE TABLE bulk2(a int, b int, c text, d text); -- cgit v1.2.3