summaryrefslogtreecommitdiff
path: root/project2/processes/processStream.cpp
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2012-03-20 20:02:54 +0000
committerrandomdan <randomdan@localhost>2012-03-20 20:02:54 +0000
commit2f88888c5fe5d18d07eb39dd60a573f5eedee683 (patch)
tree630f12359e8b442942d123017d6177c6a09b0c7b /project2/processes/processStream.cpp
parentA stream interface, an RDBMS bulk load interface, a decompression layer, an i... (diff)
downloadproject2-2f88888c5fe5d18d07eb39dd60a573f5eedee683.tar.bz2
project2-2f88888c5fe5d18d07eb39dd60a573f5eedee683.tar.xz
project2-2f88888c5fe5d18d07eb39dd60a573f5eedee683.zip
Switch to the new stream style of things
Diffstat (limited to 'project2/processes/processStream.cpp')
-rw-r--r--project2/processes/processStream.cpp58
1 files changed, 58 insertions, 0 deletions
diff --git a/project2/processes/processStream.cpp b/project2/processes/processStream.cpp
new file mode 100644
index 0000000..13c2555
--- /dev/null
+++ b/project2/processes/processStream.cpp
@@ -0,0 +1,58 @@
+#include "iHaveParameters.h"
+#include "scriptLoader.h"
+#include "scripts.h"
+#include "stream.h"
+#include <exception>
+#include <sys/wait.h>
+#include <misc.h>
+#include <boost/foreach.hpp>
+
+SimpleMessageException(SubProcessFailedToStart);
+SimpleMessageException(SubProcessFailed);
+
+/// Project2 component to create a row set from the output of a locally executed program
+class ProcessStream : public Stream, IHaveParameters {
+ public:
+ ProcessStream(ScriptNodePtr p) :
+ Stream(p),
+ IHaveParameters(p),
+ path(p, "command")
+ {
+ }
+
+ void runStream(const Sink & sink) const
+ {
+ const char * callProc[parameters.size() + 2];
+ callProc[0] = path();
+ int pidx = 1;
+ BOOST_FOREACH(const Parameters::value_type & p, parameters) {
+ callProc[pidx++] = p.second();
+ }
+ callProc[pidx] = NULL;
+ popenrw(callProc, fds);
+
+ char buf[BUFSIZ];
+ while (ssize_t r = read(fds[1], buf, BUFSIZ) != 0) {
+ if (r < 0) {
+ throw syscall_error(errno);
+ }
+ sink(buf, r);
+ }
+
+ close(fds[0]);
+ close(fds[1]);
+ int status;
+ wait(&status);
+ // ignore any error if the application is still running,
+ // but if there is already an exception being thrown, we don't
+ // want to throw another.
+ if (status != 0 && !std::uncaught_exception()) {
+ throw SubProcessFailed(strerror(status));
+ }
+ }
+ protected:
+ mutable int fds[2];
+ const Variable path;
+};
+
+DECLARE_LOADER("processstream", ProcessStream);