From c16e95343a9b5f68a100db4b5e0356f005d0e952 Mon Sep 17 00:00:00 2001 From: randomdan Date: Thu, 28 Mar 2013 00:12:42 +0000 Subject: A stream processing module that reads sets of name=value pairs --- project2/streams/streamNvpRows.cpp | 183 +++++++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 project2/streams/streamNvpRows.cpp diff --git a/project2/streams/streamNvpRows.cpp b/project2/streams/streamNvpRows.cpp new file mode 100644 index 0000000..895826a --- /dev/null +++ b/project2/streams/streamNvpRows.cpp @@ -0,0 +1,183 @@ +#include +#include "variables.h" +#include "scopeObject.h" +#include "stream.h" +#include "definedColumns.h" +#include "rowProcessor.h" +#include "textReader.h" +#include +#include + +class RowProcessor; + +/// Base class for Project2 components that create a row set based on the contents of a byte stream +class StreamNvpRows : public RowSet { + public: + class ParseState : public RowState { + public: + typedef boost::function Process; + + class Token { + public: + Token(const Glib::ustring cs, Process p) : + chars(cs), + process(p) + { + } + bool operator<(const Token & other) const { + return (other.chars.length() < this->chars.length() || + (other.chars.length() == this->chars.length() && this->chars < other.chars)); + } + const Glib::ustring chars; + const Process process; + mutable size_t firstMatch; + }; + + ParseState(const StreamNvpRows * rows, const RowProcessor * proc) : + sr(rows), + rp(proc), + inQuotes(false), + inValue(false), + prevWasQuote(false) + { + tokens.insert(Token(sr->newline, boost::bind(&StreamNvpRows::ParseState::newRecord, this, _1))); + tokens.insert(Token(sr->assign, boost::bind(&StreamNvpRows::ParseState::newField, this, _1))); + tokens.insert(Token(sr->fieldSep, boost::bind(&StreamNvpRows::ParseState::newValue, this, _1))); + if (!sr->quoteChar.empty()) { + tokens.insert(Token(sr->quoteChar, boost::bind(&StreamNvpRows::ParseState::quote, this, _1))); + } + } + + ~ParseState() + { + if (!std::uncaught_exception()) { + newRecord(tok.length()); + } + } + const Columns & getColumns() const { + return columns; + } + + void pushChar(gunichar c) + { + tok += c; + if (inQuotes) { + if (boost::algorithm::ends_with(tok, sr->quoteChar)) { + quote(tok.length() - sr->quoteChar.length()); + } + else { + prevWasQuote = false; + } + } + else { + if (tok.length() >= tokens.begin()->chars.length()) { + BOOST_FOREACH(auto & t, tokens) { + t.firstMatch = tok.rfind(t.chars); + if (t.firstMatch < tok.length() - tokens.begin()->chars.length()) { + t.firstMatch = -1; + } + } + Tokens::const_iterator t = std::min_element(tokens.begin(), tokens.end(), + [](const Token & a, const Token & b) { return a.firstMatch < b.firstMatch; }); + if (t->firstMatch != (size_t)-1) { + if (t->process(t->firstMatch)) { + tok = tok.substr(t->firstMatch + t->chars.length()); + } + } + } + } + } + + private: + bool newRecord(size_t start) { + if (start > 0) { + if (inValue) { + newValue(start); + } + else { + newField(start); + } + } + if (!columns.empty()) { + process(rp); + } + fields.clear(); + columns.clear(); + return true; + } + + bool newField(size_t start) { + columns.insert(new Column(columns.size(), + boost::algorithm::trim_copy_if(tok.substr(0, start), g_unichar_isspace))); + fields.push_back(Null()); + inValue = true; + return true; + } + + bool newValue(size_t start) { + fields.back() = + boost::algorithm::trim_copy_if(tok.substr(0, start), g_unichar_isspace); + inValue = false; + return true; + } + + bool quote(size_t start) { + if (prevWasQuote) { + prevWasQuote = false; + inQuotes = !inQuotes; + } + else { + prevWasQuote = inQuotes; + inQuotes = !inQuotes; + tok.erase(start, sr->quoteChar.length()); + } + return false; + } + + const StreamNvpRows * sr; + const RowProcessor * rp; + bool inQuotes; + bool inValue; + bool prevWasQuote; + Glib::ustring tok; + mutable Columns columns; + + typedef std::set Tokens; + Tokens tokens; + + }; + + StreamNvpRows(ScriptNodePtr p) : + RowSet(p), + fieldSep(p->value("fieldSep", ",").as()), + quoteChar(p->value("quoteChar", "\"").as()), + keepBlankRows(p->value("keepBlankRows", false)), + countBlankRows(p->value("countBlankRows", false)), + newline(p->value("newline", "\n").as()), + assign(p->value("assign", "=").as()), + encoding(p->value("encoding", "utf-8").as()) + { + p->script->loader.addLoadTarget(p, Storer::into(&stream)); + } + + void execute(const Glib::ustring &, const RowProcessor * rp) const + { + ParseState ps(this, rp); + TextReader::CharSink cs = boost::bind(&StreamNvpRows::ParseState::pushChar, &ps, _1); + TextReader t(encoding.c_str()); + stream->runStream(boost::bind(&TextReader::read, &t, _1, _2, cs)); + } + + private: + StreamPtr stream; + const Glib::ustring fieldSep; + const Glib::ustring quoteChar; + const bool keepBlankRows; + const bool countBlankRows; + const Glib::ustring newline; + const Glib::ustring assign; + const std::string encoding; + bool convertRequired; +}; +DECLARE_LOADER("streamnvprows", StreamNvpRows); + -- cgit v1.2.3