summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2012-03-20 01:24:43 +0000
committerrandomdan <randomdan@localhost>2012-03-20 01:24:43 +0000
commit409e65b49cc1d76a34d18441a4ba16ebfa26c464 (patch)
treeca77f5701731db21cf8e149740d6a2889be77315
parentAdd a task to download a URL to a file (diff)
downloadproject2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.tar.bz2
project2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.tar.xz
project2-409e65b49cc1d76a34d18441a4ba16ebfa26c464.zip
A stream interface, an RDBMS bulk load interface, a decompression layer, an implementation of CURL streams and a sqlbulkload task.
-rw-r--r--project2/Jamfile.jam1
-rw-r--r--project2/common/stream.h16
-rw-r--r--project2/compression/Jamfile.jam17
-rw-r--r--project2/compression/z.cpp67
-rw-r--r--project2/sql/Jamfile.jam3
-rw-r--r--project2/sql/sqlBulkLoad.cpp40
-rw-r--r--project2/url/urlStream.cpp24
7 files changed, 166 insertions, 2 deletions
diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam
index fd4de5d..641c6a2 100644
--- a/project2/Jamfile.jam
+++ b/project2/Jamfile.jam
@@ -14,6 +14,7 @@ alias p2parts : : : :
<library>regex//p2regex
<library>xml//p2xml
<library>json//p2json
+ <library>compression//p2compression
;
project
diff --git a/project2/common/stream.h b/project2/common/stream.h
new file mode 100644
index 0000000..14293f0
--- /dev/null
+++ b/project2/common/stream.h
@@ -0,0 +1,16 @@
+#ifndef STREAM_H
+#define STREAM_H
+
+#include "sourceObject.h"
+#include <boost/function.hpp>
+
+class Stream : public SourceObject { // Abstract source of bytes: implementations hand their content to a caller-supplied Sink.
+ public:
+ template<typename... X> Stream(const X &... x) : SourceObject(x...) { } // Forwards every constructor argument unchanged to SourceObject.
+ typedef boost::function2<size_t, const char *, size_t> Sink; // Callback taking (const char *, size_t) and returning size_t; presumably (data, length) -> bytes consumed - TODO confirm.
+ virtual void runStream(const Sink &) const = 0; // Pure virtual: each concrete stream delivers its data through the given sink.
+};
+typedef boost::intrusive_ptr<Stream> StreamPtr; // Reference-counted pointer type used to hold Stream instances.
+
+#endif
+
diff --git a/project2/compression/Jamfile.jam b/project2/compression/Jamfile.jam
new file mode 100644
index 0000000..99d3b75
--- /dev/null
+++ b/project2/compression/Jamfile.jam
@@ -0,0 +1,17 @@
+alias glibmm : : : : # Alias with usage requirements only; the flags below are obtained from pkg-config for glibmm-2.4.
+ <cflags>"`pkg-config --cflags glibmm-2.4`"
+ <linkflags>"`pkg-config --libs glibmm-2.4`"
+ ;
+
+lib libz : : <name>z ; # No sources listed: refers to a library named "z".
+
+lib p2compression : # Library built from every .cpp file found in this directory.
+ [ glob *.cpp ]
+ :
+ <include>../../libmisc
+ <library>glibmm
+ <library>libz
+ <library>../common//p2common
+ <library>../files//p2files
+ ;
+
diff --git a/project2/compression/z.cpp b/project2/compression/z.cpp
new file mode 100644
index 0000000..7d5d06b
--- /dev/null
+++ b/project2/compression/z.cpp
@@ -0,0 +1,67 @@
+#include "stream.h"
+#include "scriptLoader.h"
+#include "scripts.h"
+#include "scopeObject.h"
+#include "scriptStorage.h"
+#include <zlib.h>
+
+/// Stream that wraps a nested Stream and passes everything the nested stream
+/// produces through zlib's inflate() before handing it to the sink.
+///
+/// The inflate state is created with 16+MAX_WBITS, which asks zlib to expect a
+/// gzip wrapper. NOTE(review): the "method" variable is read from the script
+/// node but is not consulted when decompressing - confirm whether it should
+/// select the window-bits setting.
+class DecompressStream : public Stream {
+    public:
+        /// Builds the stream from its script node and registers `stream` as the
+        /// load target for the node's child element (the upstream data source).
+        DecompressStream(ScriptNodePtr p) :
+            Stream(p),
+            method(p, "method")
+        {
+            p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream));
+        }
+
+        /// Runs the nested stream, inflating each chunk it delivers and forwarding
+        /// the decompressed bytes to sink in pieces of at most BUFSIZ bytes.
+        ///
+        /// Throws std::runtime_error if the inflate state cannot be initialised,
+        /// if inflate() reports any error, or if the nested stream finishes
+        /// before inflate() has reported the end of the compressed stream.
+        void runStream(const Sink & sink) const
+        {
+            z_stream strm;
+            unsigned char out[BUFSIZ];
+
+            /* allocate inflate state */
+            strm.zalloc = Z_NULL;
+            strm.zfree = Z_NULL;
+            strm.opaque = Z_NULL;
+            strm.avail_in = 0;
+            strm.next_in = Z_NULL;
+            int ret = inflateInit2(&strm, 16+MAX_WBITS);
+            if (ret != Z_OK)
+                throw std::runtime_error("inflateInit failed");
+            // Releases the inflate state; presumably invoked when tidy leaves scope - TODO confirm against ScopeObject.
+            ScopeObject tidy([&]{ inflateEnd(&strm); });
+
+            /* decompress until deflate stream ends or end of file */
+            stream->runStream([&](const char * data, size_t len) -> size_t {
+                strm.avail_in = len;
+                // next_in is not const-qualified in every zlib build, hence the
+                // explicit const_cast; inflate() only reads the input buffer.
+                strm.next_in = reinterpret_cast<Bytef *>(const_cast<char *>(data));
+
+                /* run inflate() on input until output buffer not full */
+                do {
+                    strm.avail_out = BUFSIZ;
+                    strm.next_out = out;
+                    ret = inflate(&strm, Z_NO_FLUSH);
+                    switch (ret) {
+                        // Z_STREAM_ERROR is reported as an exception rather than
+                        // through assert(), so it is not ignored in NDEBUG builds.
+                        case Z_STREAM_ERROR:
+                        case Z_NEED_DICT:
+                        case Z_DATA_ERROR:
+                        case Z_MEM_ERROR:
+                            throw std::runtime_error("inflate failed");
+                        default:
+                            break;
+                    }
+                    const unsigned have = BUFSIZ - strm.avail_out;
+                    sink(reinterpret_cast<const char *>(out), have);
+                } while (strm.avail_out == 0);
+
+                // The whole input chunk has been handed to inflate().
+                return len;
+            });
+
+            // ret holds the result of the last inflate() call (or Z_OK from
+            // initialisation if the nested stream delivered nothing).
+            if (ret != Z_STREAM_END) {
+                throw std::runtime_error("decompression of stream failed");
+            }
+        }
+
+        /// Upstream source of compressed data, filled in by the script loader.
+        StreamPtr stream;
+        /// Value of the node's "method" setting (currently unused by runStream).
+        const Variable method;
+};
+
+DECLARE_LOADER("decompstream", DecompressStream);
diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam
index 3e63abb..cd6b6a5 100644
--- a/project2/sql/Jamfile.jam
+++ b/project2/sql/Jamfile.jam
@@ -32,8 +32,7 @@ cpp-pch pch : pch.hpp :
;
lib p2sql :
pch
- sqlTest.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp
- sqlHandleAsVariableType.cpp
+ [ glob *.cpp : sql-mod*.cpp ]
../../libdbpp//dbpp
:
<odbc>yes:<library>sql-modODBC
diff --git a/project2/sql/sqlBulkLoad.cpp b/project2/sql/sqlBulkLoad.cpp
new file mode 100644
index 0000000..227f06c
--- /dev/null
+++ b/project2/sql/sqlBulkLoad.cpp
@@ -0,0 +1,40 @@
+#include <pch.hpp>
+#include "task.h"
+#include "scopeObject.h"
+#include "stream.h"
+
+/// Task that feeds the content of a nested Stream to the bulk-upload
+/// interface of an RDBMS data source.
+///
+/// Settings read from the script node: "datasource", "targettable" and
+/// "extras"; the node's child element supplies the stream to upload.
+class SqlBulkLoad : public Task {
+    public:
+        /// Reads the task's variables from p and registers `stream` as the
+        /// load target for the node's child element.
+        SqlBulkLoad(ScriptNodePtr p) :
+            SourceObject(p),
+            Task(p),
+            dataSource(p, "datasource"),
+            targetTable(p, "targettable"),
+            extras(p, "extras"),
+            // Start from a known null value; the real pointer is assigned in loadComplete().
+            db(NULL)
+        {
+            p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream));
+        }
+
+        /// Looks up the RdbmsDataSource named by the "datasource" variable.
+        void loadComplete(const CommonObjects * co)
+        {
+            db = co->dataSource<RdbmsDataSource>(dataSource());
+        }
+
+        /// Begins a bulk upload into the target table, pushes every chunk the
+        /// stream produces to the connection, then ends the upload.
+        /// Requires loadComplete() to have been called first (db is null until then).
+        void execute() const
+        {
+            db->getWritable().beginBulkUpload(targetTable(), extras());
+            // Ends the upload with NULL or with an error message; presumably the
+            // second callback runs on normal exit and the third during stack
+            // unwinding - TODO confirm against ScopeObject.
+            ScopeObject tidy([]{},
+                    [=]{ db->getWritable().endBulkUpload(NULL); },
+                    [=]{ db->getWritable().endBulkUpload("Stack unwind in progress"); });
+            stream->runStream(boost::bind(&DB::Connection::bulkUploadData, &db->getWritable(), _1, _2));
+        }
+
+        /// Name of the data source to write to.
+        const Variable dataSource;
+        /// Table passed to beginBulkUpload.
+        const Variable targetTable;
+        /// Additional argument passed to beginBulkUpload.
+        const Variable extras;
+        /// Source of the data to upload, filled in by the script loader.
+        StreamPtr stream;
+    protected:
+        /// Data source resolved in loadComplete(); null before that.
+        const RdbmsDataSource * db;
+};
+
+DECLARE_LOADER("sqlbulkload", SqlBulkLoad);
diff --git a/project2/url/urlStream.cpp b/project2/url/urlStream.cpp
new file mode 100644
index 0000000..3e1a439
--- /dev/null
+++ b/project2/url/urlStream.cpp
@@ -0,0 +1,24 @@
+#include "curlHelper.h"
+#include "stream.h"
+#include "scriptLoader.h"
+#include "scripts.h"
+#include "../libmisc/curlsup.h"
+
+/// Project2 component that exposes content read via libcurl as a Stream
+class UrlStream : public Stream, VariableCurlHelper {
+    public:
+        /// Initialises both bases from the same script node.
+        UrlStream(ScriptNodePtr p) : Stream(p), VariableCurlHelper(p) { }
+
+        /// Obtains a new curl handle from the helper base and has it deliver
+        /// everything it reads to sink.
+        void runStream(const Sink & sink) const
+        {
+            const CurlPtr handle(newCurl());
+            handle->performRead(sink);
+        }
+};
+
+DECLARE_LOADER("urlstream", UrlStream);
+