diff options
author | randomdan <randomdan@localhost> | 2012-03-20 20:02:54 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2012-03-20 20:02:54 +0000 |
commit | 9f913dbf616b5de56fc6255d18b5a94a36ac1e1e (patch) | |
tree | 630f12359e8b442942d123017d6177c6a09b0c7b | |
parent | A stream interface, an RDBMS bulk load interface, a decompression layer, an i... (diff) | |
download | project2-9f913dbf616b5de56fc6255d18b5a94a36ac1e1e.tar.bz2 project2-9f913dbf616b5de56fc6255d18b5a94a36ac1e1e.tar.xz project2-9f913dbf616b5de56fc6255d18b5a94a36ac1e1e.zip |
Switch to the new stream style of things
-rw-r--r-- | project2/Jamfile.jam | 1 | ||||
-rw-r--r-- | project2/files/fileRows.cpp | 50 | ||||
-rw-r--r-- | project2/files/fileRows.h | 27 | ||||
-rw-r--r-- | project2/files/fileStream.cpp | 42 | ||||
-rw-r--r-- | project2/files/pch.hpp | 2 | ||||
-rw-r--r-- | project2/files/streamRows.cpp | 107 | ||||
-rw-r--r-- | project2/files/streamRows.h | 46 | ||||
-rw-r--r-- | project2/processes/Jamfile.jam | 3 | ||||
-rw-r--r-- | project2/processes/procRows.cpp | 52 | ||||
-rw-r--r-- | project2/processes/procRows.h | 21 | ||||
-rw-r--r-- | project2/processes/processStream.cpp | 58 | ||||
-rw-r--r-- | project2/streams/Jamfile.jam | 13 | ||||
-rw-r--r-- | project2/streams/streamRows.cpp | 158 | ||||
-rw-r--r-- | project2/url/Jamfile.jam | 1 | ||||
-rw-r--r-- | project2/url/urlRows.cpp | 47 | ||||
-rw-r--r-- | project2/url/urlRows.h | 25 |
16 files changed, 273 insertions, 380 deletions
diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam index 641c6a2..9969d67 100644 --- a/project2/Jamfile.jam +++ b/project2/Jamfile.jam @@ -15,6 +15,7 @@ alias p2parts : : : : <library>xml//p2xml <library>json//p2json <library>compression//p2compression + <library>streams//p2streams ; project diff --git a/project2/files/fileRows.cpp b/project2/files/fileRows.cpp deleted file mode 100644 index bb0c059..0000000 --- a/project2/files/fileRows.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include <pch.hpp> -#include "fileRows.h" -#include "logger.h" -#include "scopeObject.h" -#include "rowProcessor.h" -#include "scriptLoader.h" -#include "exceptions.h" -#include <boost/algorithm/string/predicate.hpp> - -DECLARE_LOADER("filerows", FileRows); - -FileRows::FileRows(ScriptNodePtr p) : - StreamRows(p), - path(p, "path") -{ -} - -FileRows::~FileRows() -{ -} - -void -FileRows::setFilter(const Glib::ustring &) -{ - throw NotSupported(__PRETTY_FUNCTION__); -} - -void -FileRows::execute(const Glib::ustring &, const RowProcessor * rp) const -{ - Glib::RefPtr<Glib::IOChannel> c(doOpen()); - ScopeObject so(boost::bind(&FileRows::doClose, this, c)); - c->set_encoding(encoding); - gunichar ch; - ParseState ps(this, rp); - while (c->read(ch) == Glib::IO_STATUS_NORMAL) { - this->pushChar(ch, ps); - } -} - -Glib::RefPtr<Glib::IOChannel> -FileRows::doOpen() const -{ - return Glib::IOChannel::create_from_file(path(), "r"); -} - -void -FileRows::doClose(Glib::RefPtr<Glib::IOChannel> c) const { - c->close(); -} diff --git a/project2/files/fileRows.h b/project2/files/fileRows.h deleted file mode 100644 index e58b855..0000000 --- a/project2/files/fileRows.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef FILEROWS_H -#define FILEROWS_H - -#include "streamRows.h" -#include <glibmm/iochannel.h> - -class CommonObjects; - -/// Project2 component to create a row set from the contents of a file on the local filesystem -class FileRows : public StreamRows { - public: - FileRows(ScriptNodePtr p); - ~FileRows(); - - void execute(const Glib::ustring &, const RowProcessor *) const; - virtual void setFilter(const Glib::ustring &); - - const Variable path; - - protected: - virtual Glib::RefPtr<Glib::IOChannel> doOpen() const; - virtual void doClose(Glib::RefPtr<Glib::IOChannel>) const; -}; - -#endif - - diff --git a/project2/files/fileStream.cpp b/project2/files/fileStream.cpp new file mode 100644 index 0000000..f6a1124 --- /dev/null +++ b/project2/files/fileStream.cpp @@ -0,0 +1,42 @@ +#include <pch.hpp> +#include "logger.h" +#include "scopeObject.h" +#include "stream.h" +#include "rowProcessor.h" +#include "scriptLoader.h" +#include "exceptions.h" +#include <boost/algorithm/string/predicate.hpp> + + +class FileStream : public Stream { + public: + FileStream(ScriptNodePtr p) : + Stream(p), + path(p, "path") + { + } + + void setFilter(const Glib::ustring &) + { + throw NotSupported(__PRETTY_FUNCTION__); + } + + void runStream(const Sink & sink) const + { + FILE * file = fopen(path(), "r"); + if (!file) { + throw syscall_error(errno); + } + ScopeObject so([&] { fclose(file); }); + while (!feof(file)) { + char buf[BUFSIZ]; + size_t len = fread(buf, 1, BUFSIZ, file); + if (len > 0) { + sink(buf, len); + } + } + } + + const Variable path; +}; +DECLARE_LOADER("filestream", FileStream); diff --git a/project2/files/pch.hpp b/project2/files/pch.hpp index 0721191..4b9183d 100644 --- a/project2/files/pch.hpp +++ b/project2/files/pch.hpp @@ -13,7 +13,6 @@ #include <boost/lexical_cast.hpp> #include "definedColumns.h" #include "exceptions.h" -#include "fileRows.h" #include "fsRows.h" #include <glibmm/fileutils.h> #include <glibmm/iochannel.h> @@ -25,7 +24,6 @@ #include "scriptLoader.h" #include "scripts.h" #include "scriptStorage.h" -#include "streamRows.h" #include "variables.h" #endif diff --git a/project2/files/streamRows.cpp b/project2/files/streamRows.cpp deleted file mode 100644 index 26fb55b..0000000 --- a/project2/files/streamRows.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include <pch.hpp> -#include "streamRows.h" -#include "rowProcessor.h" - -StreamRows::StreamRows(ScriptNodePtr p) : - DefinedColumns(p, "columns", boost::bind(&Column::make, _1, _2)), - RowSet(p), - fieldSep(p->value("fieldSep", ",").as<Glib::ustring>()[0]), - quoteChar(p->value("quoteChar", "\"").as<Glib::ustring>()[0]), - keepBlankRows(p->value("keepBlankRows", false)), - countBlankRows(p->value("countBlankRows", false)), - newline(p->value("newline", "\n").as<Glib::ustring>()), - newlin(newline, 0, newline.length() - 1), - encoding(p->value("encoding", "utf-8").as<std::string>()), - skipheader(p->value("skipheader", 0).as<int64_t>()) -{ -} - -StreamRows::~StreamRows() -{ -} - -void -StreamRows::pushChar(gunichar c, ParseState & ps) const -{ - if ((!ps.inQuotes) && (c == *newline.rbegin()) && (ps.tok.compare(ps.tok.length() - newlin.length(), newlin.length(), newlin) == 0)) { - if (skipheader) { - ps.skipheader -= 1; - } - else { - ps.tok.erase(ps.tok.length() - newlin.length()); - if (!ps.tok.empty()) { - *ps.curCol++ = VariableType(ps.tok); - } - if (keepBlankRows || ps.curCol != ps.fields.begin()) { - while (ps.curCol != ps.fields.end()) { - *ps.curCol++ = Null(); - } - ps.process(ps.rp); - } - else if (countBlankRows) { - ps.blankRow(); - } - ps.curCol = ps.fields.begin(); - } - ps.tok.clear(); - } - else if (c == quoteChar) { - if (ps.prevWasQuote) { - ps.tok += c; - ps.prevWasQuote = false; - ps.inQuotes = !ps.inQuotes; - } - else { - ps.prevWasQuote = ps.inQuotes; - ps.inQuotes = !ps.inQuotes; - } - } - else if ((!ps.inQuotes) && (c == fieldSep)) { - ps.prevWasQuote = false; - if (skipheader == 0) { - *ps.curCol++ = VariableType(ps.tok); - } - ps.tok.clear(); - } - else { - ps.prevWasQuote = false; - ps.tok += c; - } -} - -StreamRows::ParseState::ParseState(const StreamRows * rows, const RowProcessor * proc) : - ColumnValues(rows), - sr(rows), - rp(proc), - inQuotes(false), - prevWasQuote(false), - curCol(fields.begin()) -{ -} - -StreamRows::ParseState::~ParseState() -{ - if (!std::uncaught_exception()) { - sr->end(*this); - } -} - -void -StreamRows::end(ParseState & ps) const -{ - if (!ps.tok.empty()) { - if (skipheader == 0) { - *ps.curCol++ = VariableType(ps.tok); - } - } - if (keepBlankRows || ps.curCol != ps.fields.begin()) { - while (ps.curCol != ps.fields.end()) { - *ps.curCol++ = Null(); - } - ps.process(ps.rp); - } - else if (countBlankRows) { - ps.blankRow(); - } -} - diff --git a/project2/files/streamRows.h b/project2/files/streamRows.h deleted file mode 100644 index 95d8131..0000000 --- a/project2/files/streamRows.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef STREAMROWS_H -#define STREAMROWS_H - -#include "variables.h" -#include "definedColumns.h" - -class RowProcessor; - -/// Base class for Project2 components that create a row set based on the contents of a byte stream -class StreamRows : public DefinedColumns, public RowSet { - public: - StreamRows(ScriptNodePtr p); - ~StreamRows(); - - protected: - class ParseState : public ColumnValues { - public: - ParseState(const StreamRows *, const RowProcessor *); - ~ParseState(); - - const StreamRows * sr; - const RowProcessor * rp; - size_t skipheader; - bool inQuotes; - bool prevWasQuote; - Glib::ustring tok; - FieldValues::iterator curCol; - - friend class StreamRows; - }; - void pushChar(gunichar ch, ParseState &) const; - void end(ParseState &) const; - - public: - const gunichar fieldSep; - const gunichar quoteChar; - const bool keepBlankRows; - const bool countBlankRows; - const Glib::ustring newline; - const Glib::ustring newlin; - const std::string encoding; - const size_t skipheader; -}; - -#endif - diff --git a/project2/processes/Jamfile.jam b/project2/processes/Jamfile.jam index aef9251..cba36f2 100644 --- a/project2/processes/Jamfile.jam +++ b/project2/processes/Jamfile.jam @@ -3,11 +3,10 @@ alias glibmm : : : : <linkflags>"`pkg-config --libs glibmm-2.4`" ; lib p2processes : - procRows.cpp + [ glob *.cpp ] : <include>../../libmisc <library>glibmm <library>../common//p2common - <library>../files//p2files ; diff --git a/project2/processes/procRows.cpp b/project2/processes/procRows.cpp deleted file mode 100644 index eb41b1d..0000000 --- a/project2/processes/procRows.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include "procRows.h" -#include "scriptLoader.h" -#include "scripts.h" -#include <exception> -#include <sys/wait.h> -#include <misc.h> - -DECLARE_LOADER("procrows", ProcRows); - -SimpleMessageException(SubProcessFailedToStart); -SimpleMessageException(SubProcessFailed); - -ProcRows::ProcRows(ScriptNodePtr p) : - FileRows(p), - IHaveParameters(p) -{ -} - -ProcRows::~ProcRows() -{ -} - -Glib::RefPtr<Glib::IOChannel> -ProcRows::doOpen() const -{ - const char * callProc[parameters.size() + 2]; - callProc[0] = path(); - int pidx = 1; - BOOST_FOREACH(const Parameters::value_type & p, parameters) { - callProc[pidx++] = p.second(); - } - callProc[pidx] = NULL; - popenrw(callProc, fds); - return Glib::IOChannel::create_from_fd(fds[1]); -} - -void -ProcRows::doClose(Glib::RefPtr<Glib::IOChannel> c) const -{ - FileRows::doClose(c); - close(fds[0]); - close(fds[1]); - int status; - wait(&status); - // ignore any error if the application is still running, - // but if there is already an exception being thrown, we don't - // want to throw another. - if (status != 0 && !std::uncaught_exception()) { - throw SubProcessFailed(strerror(status)); - } -} - diff --git a/project2/processes/procRows.h b/project2/processes/procRows.h deleted file mode 100644 index b2085b8..0000000 --- a/project2/processes/procRows.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef PROCROWS_H -#define PROCROWS_H - -#include "fileRows.h" -#include "iHaveParameters.h" - -/// Project2 component to create a row set from the output of a locally executed program -class ProcRows : public FileRows, IHaveParameters { - public: - ProcRows(ScriptNodePtr p); - ~ProcRows(); - - Glib::RefPtr<Glib::IOChannel> doOpen() const; - void doClose(Glib::RefPtr<Glib::IOChannel>) const; - - protected: - mutable int fds[2]; -}; - -#endif - diff --git a/project2/processes/processStream.cpp b/project2/processes/processStream.cpp new file mode 100644 index 0000000..13c2555 --- /dev/null +++ b/project2/processes/processStream.cpp @@ -0,0 +1,58 @@ +#include "iHaveParameters.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "stream.h" +#include <exception> +#include <sys/wait.h> +#include <misc.h> +#include <boost/foreach.hpp> + +SimpleMessageException(SubProcessFailedToStart); +SimpleMessageException(SubProcessFailed); + +/// Project2 component to create a row set from the output of a locally executed program +class ProcessStream : public Stream, IHaveParameters { + public: + ProcessStream(ScriptNodePtr p) : + Stream(p), + IHaveParameters(p), + path(p, "command") + { + } + + void runStream(const Sink & sink) const + { + const char * callProc[parameters.size() + 2]; + callProc[0] = path(); + int pidx = 1; + BOOST_FOREACH(const Parameters::value_type & p, parameters) { + callProc[pidx++] = p.second(); + } + callProc[pidx] = NULL; + popenrw(callProc, fds); + + char buf[BUFSIZ]; + while (ssize_t r = read(fds[1], buf, BUFSIZ) != 0) { + if (r < 0) { + throw syscall_error(errno); + } + sink(buf, r); + } + + close(fds[0]); + close(fds[1]); + int status; + wait(&status); + // ignore any error if the application is still running, + // but if there is already an exception being thrown, we don't + // want to throw another. + if (status != 0 && !std::uncaught_exception()) { + throw SubProcessFailed(strerror(status)); + } + } + protected: + mutable int fds[2]; + const Variable path; +}; + +DECLARE_LOADER("processstream", ProcessStream); diff --git a/project2/streams/Jamfile.jam b/project2/streams/Jamfile.jam new file mode 100644 index 0000000..be72d6e --- /dev/null +++ b/project2/streams/Jamfile.jam @@ -0,0 +1,13 @@ +alias glibmm : : : : + <cflags>"`pkg-config --cflags glibmm-2.4`" + <linkflags>"`pkg-config --libs glibmm-2.4`" + ; + +lib p2streams : + [ glob *.cpp ] + : + <include>../../libmisc + <library>glibmm + <library>../common//p2common + ; + diff --git a/project2/streams/streamRows.cpp b/project2/streams/streamRows.cpp new file mode 100644 index 0000000..3bf1578 --- /dev/null +++ b/project2/streams/streamRows.cpp @@ -0,0 +1,158 @@ +#include <pch.hpp> +#include "variables.h" +#include "stream.h" +#include "definedColumns.h" +#include "rowProcessor.h" + +class RowProcessor; + +/// Base class for Project2 components that create a row set based on the contents of a byte stream +class StreamRows : public DefinedColumns, public RowSet { + public: + class ParseState : public ColumnValues { + public: + ParseState(const StreamRows * rows, const RowProcessor * proc) : + ColumnValues(rows), + sr(rows), + rp(proc), + inQuotes(false), + prevWasQuote(false), + curCol(fields.begin()) + { + } + + ~ParseState() + { + if (!std::uncaught_exception()) { + sr->end(*this); + } + } + + const StreamRows * sr; + const RowProcessor * rp; + size_t skipheader; + bool inQuotes; + bool prevWasQuote; + Glib::ustring tok; + FieldValues::iterator curCol; + + friend class StreamRows; + }; + + StreamRows(ScriptNodePtr p) : + DefinedColumns(p, "columns", boost::bind(&Column::make, _1, _2)), + RowSet(p), + fieldSep(p->value("fieldSep", ",").as<Glib::ustring>()[0]), + quoteChar(p->value("quoteChar", "\"").as<Glib::ustring>()[0]), + keepBlankRows(p->value("keepBlankRows", false)), + countBlankRows(p->value("countBlankRows", false)), + newline(p->value("newline", "\n").as<Glib::ustring>()), + newlin(newline, 0, newline.length() - 1), + encoding(p->value("encoding", "utf-8").as<std::string>()), + skipheader(p->value("skipheader", 0).as<int64_t>()), + convertRequired(encoding != "utf-8") + { + p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream)); + } + + void execute(const Glib::ustring &, const RowProcessor * rp) const + { + ParseState ps(this, rp); + stream->runStream([&](const char * bytes, size_t bytesLen) -> size_t { + size_t used = 0, len = 0; + const gchar * utf8 = this->convertRequired ? g_convert(bytes, bytesLen, "utf-8", encoding.c_str(), &used, &len, NULL) : bytes; + for (const gchar * iter = utf8; *iter; iter = g_utf8_next_char(iter)) { + this->pushChar(*iter, ps); + } + if (convertRequired) { + // We allocated it.. sooo.... + free(const_cast<gchar *>(utf8)); + return used; + } + else { + return bytesLen; + } + }); + } + + void pushChar(gunichar c, ParseState & ps) const + { + if ((!ps.inQuotes) && (c == *newline.rbegin()) && (ps.tok.compare(ps.tok.length() - newlin.length(), newlin.length(), newlin) == 0)) { + if (skipheader) { + ps.skipheader -= 1; + } + else { + ps.tok.erase(ps.tok.length() - newlin.length()); + if (!ps.tok.empty()) { + *ps.curCol++ = VariableType(ps.tok); + } + if (keepBlankRows || ps.curCol != ps.fields.begin()) { + while (ps.curCol != ps.fields.end()) { + *ps.curCol++ = Null(); + } + ps.process(ps.rp); + } + else if (countBlankRows) { + ps.blankRow(); + } + ps.curCol = ps.fields.begin(); + } + ps.tok.clear(); + } + else if (c == quoteChar) { + if (ps.prevWasQuote) { + ps.tok += c; + ps.prevWasQuote = false; + ps.inQuotes = !ps.inQuotes; + } + else { + ps.prevWasQuote = ps.inQuotes; + ps.inQuotes = !ps.inQuotes; + } + } + else if ((!ps.inQuotes) && (c == fieldSep)) { + ps.prevWasQuote = false; + if (skipheader == 0) { + *ps.curCol++ = VariableType(ps.tok); + } + ps.tok.clear(); + } + else { + ps.prevWasQuote = false; + ps.tok += c; + } + } + + protected: + void end(ParseState & ps) const + { + if (!ps.tok.empty()) { + if (skipheader == 0) { + *ps.curCol++ = VariableType(ps.tok); + } + } + if (keepBlankRows || ps.curCol != ps.fields.begin()) { + while (ps.curCol != ps.fields.end()) { + *ps.curCol++ = Null(); + } + ps.process(ps.rp); + } + else if (countBlankRows) { + ps.blankRow(); + } + } + + private: + StreamPtr stream; + const gunichar fieldSep; + const gunichar quoteChar; + const bool keepBlankRows; + const bool countBlankRows; + const Glib::ustring newline; + const Glib::ustring newlin; + const std::string encoding; + const size_t skipheader; + bool convertRequired; +}; +DECLARE_LOADER("streamrows", StreamRows); + diff --git a/project2/url/Jamfile.jam b/project2/url/Jamfile.jam index 1f48e2a..272749d 100644 --- a/project2/url/Jamfile.jam +++ b/project2/url/Jamfile.jam @@ -9,7 +9,6 @@ lib p2url : ../../libmisc/curlsup.cpp : <library>../common//p2common - <library>../files//p2files <include>../../libmisc <library>glibmm <library>curl diff --git a/project2/url/urlRows.cpp b/project2/url/urlRows.cpp deleted file mode 100644 index 84253c2..0000000 --- a/project2/url/urlRows.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "urlRows.h" -#include "rowProcessor.h" -#include "scriptLoader.h" -#include "exceptions.h" -#include "../libmisc/curlsup.h" -#include <stdexcept> -#include <queue> - -DECLARE_LOADER("urlrows", UrlRows); - -UrlRows::UrlRows(ScriptNodePtr p) : - StreamRows(p), - VariableCurlHelper(p), - convertRequired(encoding != "utf-8") -{ -} - -UrlRows::~UrlRows() -{ -} - -size_t -UrlRows::handleData(ParseState * ps, const char * bytes, size_t bytesLen) const -{ - size_t used = 0, len = 0; - const gchar * utf8 = convertRequired ? g_convert(bytes, bytesLen, "utf-8", encoding.c_str(), &used, &len, NULL) : bytes; - for (const gchar * iter = utf8; *iter; iter = g_utf8_next_char(iter)) { - this->pushChar(*iter, *ps); - } - if (convertRequired) { - // We allocated it.. sooo.... - free(const_cast<gchar *>(utf8)); - return used; - } - else { - return bytesLen; - } -} - -void -UrlRows::execute(const Glib::ustring &, const RowProcessor * rp) const -{ - ParseState ps(this, rp); - CurlPtr c = newCurl(); - c->performRead(boost::bind(&UrlRows::handleData, this, &ps, _1, _2)); -} - diff --git a/project2/url/urlRows.h b/project2/url/urlRows.h deleted file mode 100644 index 77c85ab..0000000 --- a/project2/url/urlRows.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef URLROWS_H -#define URLROWS_H - -#include <boost/intrusive_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <map> -#include "streamRows.h" -#include "curlHelper.h" - -/// Project2 component to create a row set from the contents of a file accessible via libcurl -class UrlRows : public StreamRows, VariableCurlHelper { - public: - UrlRows(ScriptNodePtr p); - ~UrlRows(); - - void execute(const Glib::ustring &, const RowProcessor *) const; - - private: - bool convertRequired; - size_t handleData(ParseState * ps, const char * bytes, size_t bytesLen) const; - -}; - -#endif - |