summaryrefslogtreecommitdiff
path: root/libmysqlpp
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2012-12-06 21:58:40 +0000
committerrandomdan <randomdan@localhost>2012-12-06 21:58:40 +0000
commit886e30e3943a9a2f2755f2ec79772ac333f94a8b (patch)
treed5ecaf3f1c25c9f717d8c4d0f113350fc8441272 /libmysqlpp
parentFix error checking on mysql_ping causing db connection reuse failure; always ... (diff)
downloadlibdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.tar.bz2
libdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.tar.xz
libdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.zip
Implement a basic MySQL bulk loader
Diffstat (limited to 'libmysqlpp')
-rw-r--r--libmysqlpp/connection.cpp92
-rw-r--r--libmysqlpp/connection.h4
2 files changed, 92 insertions, 4 deletions
diff --git a/libmysqlpp/connection.cpp b/libmysqlpp/connection.cpp
index 65e7303..ab0e2e2 100644
--- a/libmysqlpp/connection.cpp
+++ b/libmysqlpp/connection.cpp
@@ -3,6 +3,7 @@
#include "selectcommand.h"
#include "modifycommand.h"
#include "reflection.h"
+#include <ucontext.h>
class Opts {
public:
@@ -135,23 +136,106 @@ MySQL::Connection::newModifyCommand(const std::string & sql) const
return new ModifyCommand(this, sql);
}
+namespace MySQL {
+ class LoadContext {
+ public:
+ LoadContext(MYSQL * c, const char * table, const char * extra) :
+ loadBuf(NULL),
+ loadBufLen(0),
+ bufOff(0),
+ conn(c)
+ {
+ static char buf[BUFSIZ];
+ int len = snprintf(buf, BUFSIZ, "LOAD DATA LOCAL INFILE 'any' INTO TABLE %s %s", table, extra);
+ mysql_send_query(conn, buf, len);
+ }
+
+ static int local_infile_init(void ** ptr, const char *, void * ctx) {
+ *ptr = ctx;
+ return 0;
+ }
+
+ static int local_infile_read(void * obj, char * buf, unsigned int bufSize) {
+ LoadContext * ctx = static_cast<LoadContext *>(obj);
+ if (ctx->loadBufLen - ctx->bufOff == 0) {
+ swapcontext(&ctx->jmpMySQL, &ctx->jmpP2);
+ if (ctx->loadBufLen - ctx->bufOff <= 0) {
+ // Nothing to do or error
+ return ctx->bufOff;
+ }
+ }
+ unsigned int copy = std::min(ctx->loadBufLen - ctx->bufOff, bufSize);
+ memcpy(buf, ctx->loadBuf + ctx->bufOff, copy);
+ ctx->bufOff += copy;
+ return copy;
+ }
+
+ static void local_infile_end(void *) {
+ }
+
+ static int local_infile_error(void *, char*, unsigned int) {
+ return 0;
+ }
+
+ static void loadLocalData(LoadContext * ctx)
+ {
+ ctx->loadReturn = mysql_read_query_result(ctx->conn);
+ }
+
+ char stack[16384];
+ ucontext_t jmpP2;
+ ucontext_t jmpMySQL;
+ const char * loadBuf;
+ unsigned int loadBufLen;
+ int bufOff;
+ MYSQL * conn;
+ int loadReturn;
+ };
+}
+
void
MySQL::Connection::beginBulkUpload(const char * table, const char * extra) const
{
- (void)table;
- (void)extra;
+ ctx = boost::shared_ptr<LoadContext>(new MySQL::LoadContext(&conn, table, extra));
+ getcontext(&ctx->jmpMySQL);
+ ctx->jmpMySQL.uc_stack.ss_sp = ctx->stack;
+ ctx->jmpMySQL.uc_stack.ss_size = sizeof(ctx->stack);
+ ctx->jmpMySQL.uc_link = &ctx->jmpP2;
+ makecontext(&ctx->jmpMySQL, (void (*)())&LoadContext::loadLocalData, 1, ctx.get());
+
+ mysql_set_local_infile_handler(&conn, LoadContext::local_infile_init, LoadContext::local_infile_read,
+ LoadContext::local_infile_end, LoadContext::local_infile_error, ctx.get());
+
+ // begin the load, context swaps back when buffer is empty
+ swapcontext(&ctx->jmpP2, &ctx->jmpMySQL);
}
void
MySQL::Connection::endBulkUpload(const char * msg) const
{
- (void)msg;
+ ctx->loadBuf = NULL;
+ ctx->loadBufLen = 0;
+ ctx->bufOff = msg ? -1 : 0;
+ // switch context with empty buffer fires finished
+ swapcontext(&ctx->jmpP2, &ctx->jmpMySQL);
+ // Check result
+ if (!msg) {
+ if (ctx->loadReturn) {
+ ctx.reset();
+ throw Error(mysql_error(&conn));
+ }
+ }
+ ctx.reset();
}
size_t
MySQL::Connection::bulkUploadData(const char * data, size_t len) const
{
- (void)data;
+ ctx->loadBuf = data;
+ ctx->loadBufLen = len;
+ ctx->bufOff = 0;
+ // switch context to load the buffered data
+ swapcontext(&ctx->jmpP2, &ctx->jmpMySQL);
return len;
}
diff --git a/libmysqlpp/connection.h b/libmysqlpp/connection.h
index 0524009..5b6fa79 100644
--- a/libmysqlpp/connection.h
+++ b/libmysqlpp/connection.h
@@ -4,8 +4,10 @@
#include "../libdbpp/connection.h"
#include "error.h"
#include <mysql.h>
+#include <boost/shared_ptr.hpp>
namespace MySQL {
+ class LoadContext;
class Connection : public DB::Connection {
public:
Connection(const std::string & info);
@@ -34,6 +36,8 @@ namespace MySQL {
mutable unsigned int txDepth;
mutable bool rolledback;
+
+ mutable boost::shared_ptr<LoadContext> ctx;
};
}