diff options
Diffstat (limited to 'project2/processes/processStream.cpp')
-rw-r--r-- | project2/processes/processStream.cpp | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/project2/processes/processStream.cpp b/project2/processes/processStream.cpp new file mode 100644 index 0000000..13c2555 --- /dev/null +++ b/project2/processes/processStream.cpp @@ -0,0 +1,58 @@ +#include "iHaveParameters.h" +#include "scriptLoader.h" +#include "scripts.h" +#include "stream.h" +#include <exception> +#include <sys/wait.h> +#include <misc.h> +#include <boost/foreach.hpp> + +SimpleMessageException(SubProcessFailedToStart); +SimpleMessageException(SubProcessFailed); + +/// Project2 component to create a row set from the output of a locally executed program +class ProcessStream : public Stream, IHaveParameters { + public: + ProcessStream(ScriptNodePtr p) : + Stream(p), + IHaveParameters(p), + path(p, "command") + { + } + + void runStream(const Sink & sink) const + { + const char * callProc[parameters.size() + 2]; + callProc[0] = path(); + int pidx = 1; + BOOST_FOREACH(const Parameters::value_type & p, parameters) { + callProc[pidx++] = p.second(); + } + callProc[pidx] = NULL; + popenrw(callProc, fds); + + char buf[BUFSIZ]; + while (ssize_t r = read(fds[1], buf, BUFSIZ) != 0) { + if (r < 0) { + throw syscall_error(errno); + } + sink(buf, r); + } + + close(fds[0]); + close(fds[1]); + int status; + wait(&status); + // ignore any error if the application is still running, + // but if there is already an exception being thrown, we don't + // want to throw another. + if (status != 0 && !std::uncaught_exception()) { + throw SubProcessFailed(strerror(status)); + } + } + protected: + mutable int fds[2]; + const Variable path; +}; + +DECLARE_LOADER("processstream", ProcessStream); |