diff options
author | randomdan <randomdan@localhost> | 2012-03-20 01:24:43 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2012-03-20 01:24:43 +0000 |
commit | 409e65b49cc1d76a34d18441a4ba16ebfa26c464 (patch) | |
tree | ca77f5701731db21cf8e149740d6a2889be77315 | |
parent | Add a task to download a URL to a file (diff) | |
download | project2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.tar.bz2 project2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.tar.xz project2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.zip |
A stream interface, an RDBMS bulk load interface, a decompression layer, an implementation of CURL streams and a sqlbulkload task.
-rw-r--r-- | project2/Jamfile.jam | 1 | ||||
-rw-r--r-- | project2/common/stream.h | 16 | ||||
-rw-r--r-- | project2/compression/Jamfile.jam | 17 | ||||
-rw-r--r-- | project2/compression/z.cpp | 67 | ||||
-rw-r--r-- | project2/sql/Jamfile.jam | 3 | ||||
-rw-r--r-- | project2/sql/sqlBulkLoad.cpp | 40 | ||||
-rw-r--r-- | project2/url/urlStream.cpp | 24 |
7 files changed, 166 insertions, 2 deletions
diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam index fd4de5d..641c6a2 100644 --- a/project2/Jamfile.jam +++ b/project2/Jamfile.jam @@ -14,6 +14,7 @@ alias p2parts : : : : <library>regex//p2regex <library>xml//p2xml <library>json//p2json + <library>compression//p2compression ; project diff --git a/project2/common/stream.h b/project2/common/stream.h new file mode 100644 index 0000000..14293f0 --- /dev/null +++ b/project2/common/stream.h @@ -0,0 +1,16 @@ +#ifndef STREAM_H +#define STREAM_H + +#include "sourceObject.h" +#include <boost/function.hpp> + +class Stream : public SourceObject { + public: + template<typename... X> Stream(const X &... x) : SourceObject(x...) { } + typedef boost::function2<size_t, const char *, size_t> Sink; + virtual void runStream(const Sink &) const = 0; +}; +typedef boost::intrusive_ptr<Stream> StreamPtr; + +#endif + diff --git a/project2/compression/Jamfile.jam b/project2/compression/Jamfile.jam new file mode 100644 index 0000000..99d3b75 --- /dev/null +++ b/project2/compression/Jamfile.jam @@ -0,0 +1,17 @@ +alias glibmm : : : : + <cflags>"`pkg-config --cflags glibmm-2.4`" + <linkflags>"`pkg-config --libs glibmm-2.4`" + ; + +lib libz : : <name>z ; + +lib p2compression : + [ glob *.cpp ] + : + <include>../../libmisc + <library>glibmm + <library>libz + <library>../common//p2common + <library>../files//p2files + ; + diff --git a/project2/compression/z.cpp b/project2/compression/z.cpp new file mode 100644 index 0000000..7d5d06b --- /dev/null +++ b/project2/compression/z.cpp @@ -0,0 +1,67 @@ +#include "stream.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "scopeObject.h" +#include "scriptStorage.h" +#include <zlib.h> + +class DecompressStream : public Stream { + public: + DecompressStream(ScriptNodePtr p) : + Stream(p), + method(p, "method") + { + p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream)); + } + + void runStream(const Sink & sink) const + { + unsigned have; + z_stream strm; + unsigned char out[BUFSIZ]; + + /* allocate inflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + int ret = inflateInit2(&strm, 16+MAX_WBITS); + if (ret != Z_OK) + throw std::runtime_error("inflateInit failed"); + ScopeObject tidy([&]{ inflateEnd(&strm); }); + + /* decompress until deflate stream ends or end of file */ + stream->runStream([&](const char * data, size_t len) -> size_t { + strm.avail_in = len; + strm.next_in = (unsigned char *)data; + + /* run inflate() on input until output buffer not full */ + do { + strm.avail_out = BUFSIZ; + strm.next_out = out; + ret = inflate(&strm, Z_NO_FLUSH); + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; /* and fall through */ + case Z_DATA_ERROR: + case Z_MEM_ERROR: + throw std::runtime_error("inflate failed"); + } + have = BUFSIZ - strm.avail_out; + sink(reinterpret_cast<const char *>(out), have); + } while (strm.avail_out == 0); + + return len; }); + + if (ret != Z_STREAM_END) { + throw std::runtime_error("decompression of stream failed"); + } + } + + StreamPtr stream; + const Variable method; +}; + +DECLARE_LOADER("decompstream", DecompressStream); diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam index 3e63abb..cd6b6a5 100644 --- a/project2/sql/Jamfile.jam +++ b/project2/sql/Jamfile.jam @@ -32,8 +32,7 @@ cpp-pch pch : pch.hpp : ; lib p2sql : pch - sqlTest.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp - sqlHandleAsVariableType.cpp + [ glob *.cpp : sql-mod*.cpp ] ../../libdbpp//dbpp : <odbc>yes:<library>sql-modODBC diff --git a/project2/sql/sqlBulkLoad.cpp b/project2/sql/sqlBulkLoad.cpp new file mode 100644 index 0000000..227f06c --- /dev/null +++ b/project2/sql/sqlBulkLoad.cpp @@ -0,0 +1,40 @@ +#include <pch.hpp> +#include "task.h" +#include "scopeObject.h" +#include "stream.h" + +class SqlBulkLoad : public Task { + public: + SqlBulkLoad(ScriptNodePtr p) : + SourceObject(p), + Task(p), + dataSource(p, "datasource"), + targetTable(p, "targettable"), + extras(p, "extras") + { + p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream)); + } + + void loadComplete(const CommonObjects * co) + { + db = co->dataSource<RdbmsDataSource>(dataSource()); + } + + void execute() const + { + db->getWritable().beginBulkUpload(targetTable(), extras()); + ScopeObject tidy([]{}, + [=]{ db->getWritable().endBulkUpload(NULL); }, + [=]{ db->getWritable().endBulkUpload("Stack unwind in progress"); }); + stream->runStream(boost::bind(&DB::Connection::bulkUploadData, &db->getWritable(), _1, _2)); + } + + const Variable dataSource; + const Variable targetTable; + const Variable extras; + StreamPtr stream; + protected: + const RdbmsDataSource * db; +}; + +DECLARE_LOADER("sqlbulkload", SqlBulkLoad); diff --git a/project2/url/urlStream.cpp b/project2/url/urlStream.cpp new file mode 100644 index 0000000..3e1a439 --- /dev/null +++ b/project2/url/urlStream.cpp @@ -0,0 +1,24 @@ +#include "curlHelper.h" +#include "stream.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "../libmisc/curlsup.h" + +/// Project2 component to create a row set from the contents of a file accessible via libcurl +class UrlStream : public Stream, VariableCurlHelper { + public: + UrlStream(ScriptNodePtr p) : + Stream(p), + VariableCurlHelper(p) + { + } + + void runStream(const Sink & sink) const + { + CurlPtr c = newCurl(); + c->performRead(sink); + } +}; + +DECLARE_LOADER("urlstream", UrlStream); + |