From 0a8131b39c16105e05189756a7353779633c9c3c Mon Sep 17 00:00:00 2001 From: randomdan Date: Tue, 20 Mar 2012 01:24:43 +0000 Subject: A stream interface, an RDBMS bulk load interface, a decompression layer, an implementation of CURL streams and a sqlbulkload task. --- project2/Jamfile.jam | 1 + project2/common/stream.h | 16 ++++++++++ project2/compression/Jamfile.jam | 17 ++++++++++ project2/compression/z.cpp | 67 ++++++++++++++++++++++++++++++++++++++++ project2/sql/Jamfile.jam | 3 +- project2/sql/sqlBulkLoad.cpp | 40 ++++++++++++++++++++++++ project2/url/urlStream.cpp | 24 ++++++++++++++ 7 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 project2/common/stream.h create mode 100644 project2/compression/Jamfile.jam create mode 100644 project2/compression/z.cpp create mode 100644 project2/sql/sqlBulkLoad.cpp create mode 100644 project2/url/urlStream.cpp diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam index fd4de5d..641c6a2 100644 --- a/project2/Jamfile.jam +++ b/project2/Jamfile.jam @@ -14,6 +14,7 @@ alias p2parts : : : : regex//p2regex xml//p2xml json//p2json + compression//p2compression ; project diff --git a/project2/common/stream.h b/project2/common/stream.h new file mode 100644 index 0000000..14293f0 --- /dev/null +++ b/project2/common/stream.h @@ -0,0 +1,16 @@ +#ifndef STREAM_H +#define STREAM_H + +#include "sourceObject.h" +#include + +class Stream : public SourceObject { + public: + template Stream(const X &... x) : SourceObject(x...) { } + typedef boost::function2 Sink; + virtual void runStream(const Sink &) const = 0; +}; +typedef boost::intrusive_ptr StreamPtr; + +#endif + diff --git a/project2/compression/Jamfile.jam b/project2/compression/Jamfile.jam new file mode 100644 index 0000000..99d3b75 --- /dev/null +++ b/project2/compression/Jamfile.jam @@ -0,0 +1,17 @@ +alias glibmm : : : : + "`pkg-config --cflags glibmm-2.4`" + "`pkg-config --libs glibmm-2.4`" + ; + +lib libz : : z ; + +lib p2compression : + [ glob *.cpp ] + : + ../../libmisc + glibmm + libz + ../common//p2common + ../files//p2files + ; + diff --git a/project2/compression/z.cpp b/project2/compression/z.cpp new file mode 100644 index 0000000..7d5d06b --- /dev/null +++ b/project2/compression/z.cpp @@ -0,0 +1,67 @@ +#include "stream.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "scopeObject.h" +#include "scriptStorage.h" +#include + +class DecompressStream : public Stream { + public: + DecompressStream(ScriptNodePtr p) : + Stream(p), + method(p, "method") + { + p->script->loader.addLoadTarget(p, Storer::into(&stream)); + } + + void runStream(const Sink & sink) const + { + unsigned have; + z_stream strm; + unsigned char out[BUFSIZ]; + + /* allocate inflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + int ret = inflateInit2(&strm, 16+MAX_WBITS); + if (ret != Z_OK) + throw std::runtime_error("inflateInit failed"); + ScopeObject tidy([&]{ inflateEnd(&strm); }); + + /* decompress until deflate stream ends or end of file */ + stream->runStream([&](const char * data, size_t len) -> size_t { + strm.avail_in = len; + strm.next_in = (unsigned char *)data; + + /* run inflate() on input until output buffer not full */ + do { + strm.avail_out = BUFSIZ; + strm.next_out = out; + ret = inflate(&strm, Z_NO_FLUSH); + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; /* and fall through */ + case Z_DATA_ERROR: + case Z_MEM_ERROR: + throw std::runtime_error("inflate failed"); + } + have = BUFSIZ - strm.avail_out; + sink(reinterpret_cast(out), have); + } while (strm.avail_out == 0); + + return len; }); + + if (ret != Z_STREAM_END) { + throw std::runtime_error("decompression of stream failed"); + } + } + + StreamPtr stream; + const Variable method; +}; + +DECLARE_LOADER("decompstream", DecompressStream); diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam index 3e63abb..cd6b6a5 100644 --- a/project2/sql/Jamfile.jam +++ b/project2/sql/Jamfile.jam @@ -32,8 +32,7 @@ cpp-pch pch : pch.hpp : ; lib p2sql : pch - sqlTest.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp - sqlHandleAsVariableType.cpp + [ glob *.cpp : sql-mod*.cpp ] ../../libdbpp//dbpp : yes:sql-modODBC diff --git a/project2/sql/sqlBulkLoad.cpp b/project2/sql/sqlBulkLoad.cpp new file mode 100644 index 0000000..227f06c --- /dev/null +++ b/project2/sql/sqlBulkLoad.cpp @@ -0,0 +1,40 @@ +#include +#include "task.h" +#include "scopeObject.h" +#include "stream.h" + +class SqlBulkLoad : public Task { + public: + SqlBulkLoad(ScriptNodePtr p) : + SourceObject(p), + Task(p), + dataSource(p, "datasource"), + targetTable(p, "targettable"), + extras(p, "extras") + { + p->script->loader.addLoadTarget(p, Storer::into(&stream)); + } + + void loadComplete(const CommonObjects * co) + { + db = co->dataSource(dataSource()); + } + + void execute() const + { + db->getWritable().beginBulkUpload(targetTable(), extras()); + ScopeObject tidy([]{}, + [=]{ db->getWritable().endBulkUpload(NULL); }, + [=]{ db->getWritable().endBulkUpload("Stack unwind in progress"); }); + stream->runStream(boost::bind(&DB::Connection::bulkUploadData, &db->getWritable(), _1, _2)); + } + + const Variable dataSource; + const Variable targetTable; + const Variable extras; + StreamPtr stream; + protected: + const RdbmsDataSource * db; +}; + +DECLARE_LOADER("sqlbulkload", SqlBulkLoad); diff --git a/project2/url/urlStream.cpp b/project2/url/urlStream.cpp new file mode 100644 index 0000000..3e1a439 --- /dev/null +++ b/project2/url/urlStream.cpp @@ -0,0 +1,24 @@ +#include "curlHelper.h" +#include "stream.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "../libmisc/curlsup.h" + +/// Project2 component to create a row set from the contents of a file accessible via libcurl +class UrlStream : public Stream, VariableCurlHelper { + public: + UrlStream(ScriptNodePtr p) : + Stream(p), + VariableCurlHelper(p) + { + } + + void runStream(const Sink & sink) const + { + CurlPtr c = newCurl(); + c->performRead(sink); + } +}; + +DECLARE_LOADER("urlstream", UrlStream); + -- cgit v1.2.3