diff options
| -rw-r--r-- | project2/Jamfile.jam | 1 | ||||
| -rw-r--r-- | project2/files/fileRows.cpp | 50 | ||||
| -rw-r--r-- | project2/files/fileRows.h | 27 | ||||
| -rw-r--r-- | project2/files/fileStream.cpp | 42 | ||||
| -rw-r--r-- | project2/files/pch.hpp | 2 | ||||
| -rw-r--r-- | project2/files/streamRows.cpp | 107 | ||||
| -rw-r--r-- | project2/files/streamRows.h | 46 | ||||
| -rw-r--r-- | project2/processes/Jamfile.jam | 3 | ||||
| -rw-r--r-- | project2/processes/procRows.cpp | 52 | ||||
| -rw-r--r-- | project2/processes/procRows.h | 21 | ||||
| -rw-r--r-- | project2/processes/processStream.cpp | 58 | ||||
| -rw-r--r-- | project2/streams/Jamfile.jam | 13 | ||||
| -rw-r--r-- | project2/streams/streamRows.cpp | 158 | ||||
| -rw-r--r-- | project2/url/Jamfile.jam | 1 | ||||
| -rw-r--r-- | project2/url/urlRows.cpp | 47 | ||||
| -rw-r--r-- | project2/url/urlRows.h | 25 | 
16 files changed, 273 insertions, 380 deletions
| diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam index 641c6a2..9969d67 100644 --- a/project2/Jamfile.jam +++ b/project2/Jamfile.jam @@ -15,6 +15,7 @@ alias p2parts : : : :  	<library>xml//p2xml  	<library>json//p2json  	<library>compression//p2compression +	<library>streams//p2streams  	;  project diff --git a/project2/files/fileRows.cpp b/project2/files/fileRows.cpp deleted file mode 100644 index bb0c059..0000000 --- a/project2/files/fileRows.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include <pch.hpp> -#include "fileRows.h" -#include "logger.h" -#include "scopeObject.h" -#include "rowProcessor.h" -#include "scriptLoader.h" -#include "exceptions.h" -#include <boost/algorithm/string/predicate.hpp> - -DECLARE_LOADER("filerows", FileRows); - -FileRows::FileRows(ScriptNodePtr p) : -	StreamRows(p), -	path(p, "path") -{ -} - -FileRows::~FileRows() -{ -} - -void -FileRows::setFilter(const Glib::ustring &) -{ -	throw NotSupported(__PRETTY_FUNCTION__); -} - -void -FileRows::execute(const Glib::ustring &, const RowProcessor * rp) const -{ -	Glib::RefPtr<Glib::IOChannel> c(doOpen()); -	ScopeObject so(boost::bind(&FileRows::doClose, this, c)); -	c->set_encoding(encoding); -	gunichar ch; -	ParseState ps(this, rp); -	while (c->read(ch) == Glib::IO_STATUS_NORMAL) { -		this->pushChar(ch, ps); -	} -} - -Glib::RefPtr<Glib::IOChannel> -FileRows::doOpen() const -{ -	return Glib::IOChannel::create_from_file(path(), "r"); -} - -void -FileRows::doClose(Glib::RefPtr<Glib::IOChannel> c) const { -	c->close(); -} diff --git a/project2/files/fileRows.h b/project2/files/fileRows.h deleted file mode 100644 index e58b855..0000000 --- a/project2/files/fileRows.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef FILEROWS_H -#define FILEROWS_H - -#include "streamRows.h" -#include <glibmm/iochannel.h> - -class CommonObjects; - -/// Project2 component to create a row set from the contents of a file on the local filesystem -class FileRows : public StreamRows { -	public: -		FileRows(ScriptNodePtr p); -		~FileRows(); - -		void execute(const Glib::ustring &, const RowProcessor *) const; -		virtual void setFilter(const Glib::ustring &); - -		const Variable path; - -	protected: -		virtual Glib::RefPtr<Glib::IOChannel> doOpen() const; -		virtual void doClose(Glib::RefPtr<Glib::IOChannel>) const; -}; - -#endif - - diff --git a/project2/files/fileStream.cpp b/project2/files/fileStream.cpp new file mode 100644 index 0000000..f6a1124 --- /dev/null +++ b/project2/files/fileStream.cpp @@ -0,0 +1,42 @@ +#include <pch.hpp> +#include "logger.h" +#include "scopeObject.h" +#include "stream.h" +#include "rowProcessor.h" +#include "scriptLoader.h" +#include "exceptions.h" +#include <boost/algorithm/string/predicate.hpp> + + +class FileStream : public Stream { +	public: +		FileStream(ScriptNodePtr p) : +			Stream(p), +			path(p, "path") +		{ +		} + +		void setFilter(const Glib::ustring &) +		{ +			throw NotSupported(__PRETTY_FUNCTION__); +		} + +		void runStream(const Sink & sink) const +		{ +			FILE * file = fopen(path(), "r"); +			if (!file) { +				throw syscall_error(errno); +			} +			ScopeObject so([&] { fclose(file); }); +			while (!feof(file)) { +				char buf[BUFSIZ]; +				size_t len = fread(buf, 1, BUFSIZ, file); +				if (len > 0) { +					sink(buf, len); +				} +			} +		} + +		const Variable path; +}; +DECLARE_LOADER("filestream", FileStream); diff --git a/project2/files/pch.hpp b/project2/files/pch.hpp index 0721191..4b9183d 100644 --- a/project2/files/pch.hpp +++ b/project2/files/pch.hpp @@ -13,7 +13,6 @@  #include <boost/lexical_cast.hpp>  #include "definedColumns.h"  #include "exceptions.h" -#include "fileRows.h"  #include "fsRows.h"  #include <glibmm/fileutils.h>  #include <glibmm/iochannel.h> @@ -25,7 +24,6 @@  #include "scriptLoader.h"  #include "scripts.h"  #include "scriptStorage.h" -#include "streamRows.h"  #include "variables.h"  #endif diff --git a/project2/files/streamRows.cpp b/project2/files/streamRows.cpp deleted file mode 100644 index 26fb55b..0000000 --- a/project2/files/streamRows.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include <pch.hpp> -#include "streamRows.h" -#include "rowProcessor.h" - -StreamRows::StreamRows(ScriptNodePtr p) : -	DefinedColumns(p, "columns", boost::bind(&Column::make, _1, _2)), -	RowSet(p), -	fieldSep(p->value("fieldSep", ",").as<Glib::ustring>()[0]), -	quoteChar(p->value("quoteChar", "\"").as<Glib::ustring>()[0]), -	keepBlankRows(p->value("keepBlankRows", false)), -	countBlankRows(p->value("countBlankRows", false)), -	newline(p->value("newline", "\n").as<Glib::ustring>()), -	newlin(newline, 0, newline.length() - 1), -	encoding(p->value("encoding", "utf-8").as<std::string>()), -	skipheader(p->value("skipheader", 0).as<int64_t>()) -{ -} - -StreamRows::~StreamRows() -{ -} - -void -StreamRows::pushChar(gunichar c, ParseState & ps) const -{ -	if ((!ps.inQuotes) && (c == *newline.rbegin()) && (ps.tok.compare(ps.tok.length() - newlin.length(), newlin.length(), newlin) == 0)) { -		if (skipheader) { -			ps.skipheader -= 1; -		} -		else { -			ps.tok.erase(ps.tok.length() - newlin.length()); -			if (!ps.tok.empty()) { -				*ps.curCol++ = VariableType(ps.tok); -			} -			if (keepBlankRows || ps.curCol != ps.fields.begin()) { -				while (ps.curCol != ps.fields.end()) { -					*ps.curCol++ = Null(); -				} -				ps.process(ps.rp); -			} -			else if (countBlankRows) { -				ps.blankRow(); -			} -			ps.curCol = ps.fields.begin(); -		} -		ps.tok.clear(); -	} -	else if (c == quoteChar) { -		if (ps.prevWasQuote) { -			ps.tok += c; -			ps.prevWasQuote = false; -			ps.inQuotes = !ps.inQuotes; -		} -		else { -			ps.prevWasQuote = ps.inQuotes; -			ps.inQuotes = !ps.inQuotes; -		} -	} -	else if ((!ps.inQuotes) && (c == fieldSep)) { -		ps.prevWasQuote = false; -		if (skipheader == 0) { -			*ps.curCol++ = VariableType(ps.tok); -		} -		ps.tok.clear(); -	} -	else { -		ps.prevWasQuote = false; -		ps.tok += c; -	} -} - -StreamRows::ParseState::ParseState(const StreamRows * rows, const RowProcessor * proc) : -	ColumnValues(rows), -	sr(rows), -	rp(proc), -	inQuotes(false), -	prevWasQuote(false), -	curCol(fields.begin()) -{ -} - -StreamRows::ParseState::~ParseState() -{ -	if (!std::uncaught_exception()) { -		sr->end(*this); -	} -} - -void -StreamRows::end(ParseState & ps) const -{ -	if (!ps.tok.empty()) { -		if (skipheader == 0) { -			*ps.curCol++ = VariableType(ps.tok); -		} -	} -	if (keepBlankRows || ps.curCol != ps.fields.begin()) { -		while (ps.curCol != ps.fields.end()) { -			*ps.curCol++ = Null(); -		} -		ps.process(ps.rp); -	} -	else if (countBlankRows) { -		ps.blankRow(); -	} -} - diff --git a/project2/files/streamRows.h b/project2/files/streamRows.h deleted file mode 100644 index 95d8131..0000000 --- a/project2/files/streamRows.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef STREAMROWS_H -#define STREAMROWS_H - -#include "variables.h" -#include "definedColumns.h" - -class RowProcessor; - -/// Base class for Project2 components that create a row set based on the contents of a byte stream -class StreamRows : public DefinedColumns, public RowSet { -	public: -		StreamRows(ScriptNodePtr p); -		~StreamRows(); - -	protected: -		class ParseState : public ColumnValues { -			public: -				ParseState(const StreamRows *, const RowProcessor *); -				~ParseState(); - -				const StreamRows * sr; -				const RowProcessor * rp; -				size_t skipheader; -				bool inQuotes; -				bool prevWasQuote; -				Glib::ustring tok; -				FieldValues::iterator curCol; - -				friend class StreamRows; -		}; -		void pushChar(gunichar ch, ParseState &) const; -		void end(ParseState &) const; - -	public: -		const gunichar fieldSep; -		const gunichar quoteChar; -		const bool keepBlankRows; -		const bool countBlankRows; -		const Glib::ustring newline; -		const Glib::ustring newlin; -		const std::string encoding; -		const size_t skipheader; -}; - -#endif - diff --git a/project2/processes/Jamfile.jam b/project2/processes/Jamfile.jam index aef9251..cba36f2 100644 --- a/project2/processes/Jamfile.jam +++ b/project2/processes/Jamfile.jam @@ -3,11 +3,10 @@ alias glibmm : : : :  	<linkflags>"`pkg-config --libs glibmm-2.4`"  	;  lib p2processes : -	procRows.cpp +	[ glob *.cpp ]  	:  	<include>../../libmisc  	<library>glibmm  	<library>../common//p2common -	<library>../files//p2files  	; diff --git a/project2/processes/procRows.cpp b/project2/processes/procRows.cpp deleted file mode 100644 index eb41b1d..0000000 --- a/project2/processes/procRows.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include "procRows.h" -#include "scriptLoader.h" -#include "scripts.h" -#include <exception> -#include <sys/wait.h> -#include <misc.h> - -DECLARE_LOADER("procrows", ProcRows); - -SimpleMessageException(SubProcessFailedToStart); -SimpleMessageException(SubProcessFailed); - -ProcRows::ProcRows(ScriptNodePtr p) : -	FileRows(p), -	IHaveParameters(p) -{ -} - -ProcRows::~ProcRows() -{ -} - -Glib::RefPtr<Glib::IOChannel> -ProcRows::doOpen() const -{ -	const char * callProc[parameters.size() + 2]; -	callProc[0] = path(); -	int pidx = 1; -	BOOST_FOREACH(const Parameters::value_type & p, parameters) { -		callProc[pidx++] = p.second(); -	} -	callProc[pidx] = NULL; -	popenrw(callProc, fds); -	return Glib::IOChannel::create_from_fd(fds[1]); -} - -void -ProcRows::doClose(Glib::RefPtr<Glib::IOChannel> c) const -{ -	FileRows::doClose(c); -	close(fds[0]); -	close(fds[1]); -	int status; -	wait(&status); -	// ignore any error if the application is still running, -	// but if there is already an exception being thrown, we don't -	// want to throw another. -	if (status != 0 && !std::uncaught_exception()) { -		throw SubProcessFailed(strerror(status)); -	} -} - diff --git a/project2/processes/procRows.h b/project2/processes/procRows.h deleted file mode 100644 index b2085b8..0000000 --- a/project2/processes/procRows.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef PROCROWS_H -#define PROCROWS_H - -#include "fileRows.h" -#include "iHaveParameters.h" - -/// Project2 component to create a row set from the output of a locally executed program -class ProcRows : public FileRows, IHaveParameters { -	public: -		ProcRows(ScriptNodePtr p); -		~ProcRows(); - -		Glib::RefPtr<Glib::IOChannel> doOpen() const; -		void doClose(Glib::RefPtr<Glib::IOChannel>) const; - -	protected: -		mutable int fds[2]; -}; - -#endif - diff --git a/project2/processes/processStream.cpp b/project2/processes/processStream.cpp new file mode 100644 index 0000000..13c2555 --- /dev/null +++ b/project2/processes/processStream.cpp @@ -0,0 +1,58 @@ +#include "iHaveParameters.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "stream.h" +#include <exception> +#include <sys/wait.h> +#include <misc.h> +#include <boost/foreach.hpp> + +SimpleMessageException(SubProcessFailedToStart); +SimpleMessageException(SubProcessFailed); + +/// Project2 component to create a row set from the output of a locally executed program +class ProcessStream : public Stream, IHaveParameters { +	public: +		ProcessStream(ScriptNodePtr p) : +			Stream(p), +			IHaveParameters(p), +			path(p, "command") +	{ +	} + +		void runStream(const Sink & sink) const +		{ +			const char * callProc[parameters.size() + 2]; +			callProc[0] = path(); +			int pidx = 1; +			BOOST_FOREACH(const Parameters::value_type & p, parameters) { +				callProc[pidx++] = p.second(); +			} +			callProc[pidx] = NULL; +			popenrw(callProc, fds); + +			char buf[BUFSIZ]; +			while (ssize_t r = read(fds[1], buf, BUFSIZ) != 0) { +				if (r < 0) { +					throw syscall_error(errno); +				} +				sink(buf, r); +			} + +			close(fds[0]); +			close(fds[1]); +			int status; +			wait(&status); +			// ignore any error if the application is still running, +			// but if there is already an exception being thrown, we don't +			// want to throw another. +			if (status != 0 && !std::uncaught_exception()) { +				throw SubProcessFailed(strerror(status)); +			} +		} +	protected: +		mutable int fds[2]; +		const Variable path; +}; + +DECLARE_LOADER("processstream", ProcessStream); diff --git a/project2/streams/Jamfile.jam b/project2/streams/Jamfile.jam new file mode 100644 index 0000000..be72d6e --- /dev/null +++ b/project2/streams/Jamfile.jam @@ -0,0 +1,13 @@ +alias glibmm : : : : +	<cflags>"`pkg-config --cflags glibmm-2.4`" +	<linkflags>"`pkg-config --libs glibmm-2.4`" +	; + +lib p2streams : +	[ glob *.cpp ] +	: +	<include>../../libmisc +	<library>glibmm +	<library>../common//p2common +	; + diff --git a/project2/streams/streamRows.cpp b/project2/streams/streamRows.cpp new file mode 100644 index 0000000..3bf1578 --- /dev/null +++ b/project2/streams/streamRows.cpp @@ -0,0 +1,158 @@ +#include <pch.hpp> +#include "variables.h" +#include "stream.h" +#include "definedColumns.h" +#include "rowProcessor.h" + +class RowProcessor; + +/// Base class for Project2 components that create a row set based on the contents of a byte stream +class StreamRows : public DefinedColumns, public RowSet { +	public: +		class ParseState : public ColumnValues { +			public: +				ParseState(const StreamRows * rows, const RowProcessor * proc) : +					ColumnValues(rows), +					sr(rows), +					rp(proc), +					inQuotes(false), +					prevWasQuote(false), +					curCol(fields.begin()) +				{ +				} + +				~ParseState() +				{ +					if (!std::uncaught_exception()) { +						sr->end(*this); +					} +				} + +				const StreamRows * sr; +				const RowProcessor * rp; +				size_t skipheader; +				bool inQuotes; +				bool prevWasQuote; +				Glib::ustring tok; +				FieldValues::iterator curCol; + +				friend class StreamRows; +		}; + +		StreamRows(ScriptNodePtr p) : +			DefinedColumns(p, "columns", boost::bind(&Column::make, _1, _2)), +			RowSet(p), +			fieldSep(p->value("fieldSep", ",").as<Glib::ustring>()[0]), +			quoteChar(p->value("quoteChar", "\"").as<Glib::ustring>()[0]), +			keepBlankRows(p->value("keepBlankRows", false)), +			countBlankRows(p->value("countBlankRows", false)), +			newline(p->value("newline", "\n").as<Glib::ustring>()), +			newlin(newline, 0, newline.length() - 1), +			encoding(p->value("encoding", "utf-8").as<std::string>()), +			skipheader(p->value("skipheader", 0).as<int64_t>()), +			convertRequired(encoding != "utf-8") +		{ +			p->script->loader.addLoadTarget(p, Storer::into<ElementLoader>(&stream)); +		} + +		void execute(const Glib::ustring &, const RowProcessor * rp) const +		{ +			ParseState ps(this, rp); +			stream->runStream([&](const char * bytes, size_t bytesLen) -> size_t { +					size_t used = 0, len = 0; +					const gchar * utf8 = this->convertRequired ? g_convert(bytes, bytesLen, "utf-8", encoding.c_str(), &used, &len, NULL) : bytes; +					for (const gchar * iter = utf8; *iter; iter = g_utf8_next_char(iter)) { +						this->pushChar(*iter, ps); +					} +					if (convertRequired) { +						// We allocated it.. sooo.... +						free(const_cast<gchar *>(utf8)); +						return used; +					} +					else { +						return bytesLen; +					} +					}); +		} + +		void pushChar(gunichar c, ParseState & ps) const +		{ +			if ((!ps.inQuotes) && (c == *newline.rbegin()) && (ps.tok.compare(ps.tok.length() - newlin.length(), newlin.length(), newlin) == 0)) { +				if (skipheader) { +					ps.skipheader -= 1; +				} +				else { +					ps.tok.erase(ps.tok.length() - newlin.length()); +					if (!ps.tok.empty()) { +						*ps.curCol++ = VariableType(ps.tok); +					} +					if (keepBlankRows || ps.curCol != ps.fields.begin()) { +						while (ps.curCol != ps.fields.end()) { +							*ps.curCol++ = Null(); +						} +						ps.process(ps.rp); +					} +					else if (countBlankRows) { +						ps.blankRow(); +					} +					ps.curCol = ps.fields.begin(); +				} +				ps.tok.clear(); +			} +			else if (c == quoteChar) { +				if (ps.prevWasQuote) { +					ps.tok += c; +					ps.prevWasQuote = false; +					ps.inQuotes = !ps.inQuotes; +				} +				else { +					ps.prevWasQuote = ps.inQuotes; +					ps.inQuotes = !ps.inQuotes; +				} +			} +			else if ((!ps.inQuotes) && (c == fieldSep)) { +				ps.prevWasQuote = false; +				if (skipheader == 0) { +					*ps.curCol++ = VariableType(ps.tok); +				} +				ps.tok.clear(); +			} +			else { +				ps.prevWasQuote = false; +				ps.tok += c; +			} +		} + +	protected: +		void end(ParseState & ps) const +		{ +			if (!ps.tok.empty()) { +				if (skipheader == 0) { +					*ps.curCol++ = VariableType(ps.tok); +				} +			} +			if (keepBlankRows || ps.curCol != ps.fields.begin()) { +				while (ps.curCol != ps.fields.end()) { +					*ps.curCol++ = Null(); +				} +				ps.process(ps.rp); +			} +			else if (countBlankRows) { +				ps.blankRow(); +			} +		} + +	private: +		StreamPtr stream; +		const gunichar fieldSep; +		const gunichar quoteChar; +		const bool keepBlankRows; +		const bool countBlankRows; +		const Glib::ustring newline; +		const Glib::ustring newlin; +		const std::string encoding; +		const size_t skipheader; +		bool convertRequired; +}; +DECLARE_LOADER("streamrows", StreamRows); + diff --git a/project2/url/Jamfile.jam b/project2/url/Jamfile.jam index 1f48e2a..272749d 100644 --- a/project2/url/Jamfile.jam +++ b/project2/url/Jamfile.jam @@ -9,7 +9,6 @@ lib p2url :  	../../libmisc/curlsup.cpp  	:  	<library>../common//p2common -	<library>../files//p2files  	<include>../../libmisc  	<library>glibmm  	<library>curl diff --git a/project2/url/urlRows.cpp b/project2/url/urlRows.cpp deleted file mode 100644 index 84253c2..0000000 --- a/project2/url/urlRows.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "urlRows.h" -#include "rowProcessor.h" -#include "scriptLoader.h" -#include "exceptions.h" -#include "../libmisc/curlsup.h" -#include <stdexcept> -#include <queue> - -DECLARE_LOADER("urlrows", UrlRows); - -UrlRows::UrlRows(ScriptNodePtr p) : -	StreamRows(p), -	VariableCurlHelper(p), -	convertRequired(encoding != "utf-8") -{ -} - -UrlRows::~UrlRows() -{ -} - -size_t -UrlRows::handleData(ParseState * ps, const char * bytes, size_t bytesLen) const -{ -	size_t used = 0, len = 0; -	const gchar * utf8 = convertRequired ? g_convert(bytes, bytesLen, "utf-8", encoding.c_str(), &used, &len, NULL) : bytes; -	for (const gchar * iter = utf8; *iter; iter = g_utf8_next_char(iter)) { -		this->pushChar(*iter, *ps); -	} -	if (convertRequired) { -		// We allocated it.. sooo.... -		free(const_cast<gchar *>(utf8)); -		return used; -	} -	else { -		return bytesLen; -	} -} - -void -UrlRows::execute(const Glib::ustring &, const RowProcessor * rp) const -{ -	ParseState ps(this, rp); -	CurlPtr c = newCurl(); -	c->performRead(boost::bind(&UrlRows::handleData, this, &ps, _1, _2)); -} - diff --git a/project2/url/urlRows.h b/project2/url/urlRows.h deleted file mode 100644 index 77c85ab..0000000 --- a/project2/url/urlRows.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef URLROWS_H -#define URLROWS_H - -#include <boost/intrusive_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <map> -#include "streamRows.h" -#include "curlHelper.h" - -/// Project2 component to create a row set from the contents of a file accessible via libcurl -class UrlRows : public StreamRows, VariableCurlHelper { -	public: -		UrlRows(ScriptNodePtr p); -		~UrlRows(); - -		void execute(const Glib::ustring &, const RowProcessor *) const; - -	private: -		bool convertRequired; -		size_t handleData(ParseState * ps, const char * bytes, size_t bytesLen) const; - -}; - -#endif - | 
