diff options
author | randomdan <randomdan@localhost> | 2012-12-06 21:58:40 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2012-12-06 21:58:40 +0000 |
commit | 886e30e3943a9a2f2755f2ec79772ac333f94a8b (patch) | |
tree | d5ecaf3f1c25c9f717d8c4d0f113350fc8441272 /libmysqlpp | |
parent | Fix error checking on mysql_ping causing db connection reuse failure; always ... (diff) | |
download | libdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.tar.bz2 libdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.tar.xz libdbpp-mysql-886e30e3943a9a2f2755f2ec79772ac333f94a8b.zip |
Implement a basic MySQL bulk loader
Diffstat (limited to 'libmysqlpp')
-rw-r--r-- | libmysqlpp/connection.cpp | 92 | ||||
-rw-r--r-- | libmysqlpp/connection.h | 4 |
2 files changed, 92 insertions, 4 deletions
diff --git a/libmysqlpp/connection.cpp b/libmysqlpp/connection.cpp index 65e7303..ab0e2e2 100644 --- a/libmysqlpp/connection.cpp +++ b/libmysqlpp/connection.cpp @@ -3,6 +3,7 @@ #include "selectcommand.h" #include "modifycommand.h" #include "reflection.h" +#include <ucontext.h> class Opts { public: @@ -135,23 +136,106 @@ MySQL::Connection::newModifyCommand(const std::string & sql) const return new ModifyCommand(this, sql); } +namespace MySQL { + class LoadContext { + public: + LoadContext(MYSQL * c, const char * table, const char * extra) : + loadBuf(NULL), + loadBufLen(0), + bufOff(0), + conn(c) + { + static char buf[BUFSIZ]; + int len = snprintf(buf, BUFSIZ, "LOAD DATA LOCAL INFILE 'any' INTO TABLE %s %s", table, extra); + mysql_send_query(conn, buf, len); + } + + static int local_infile_init(void ** ptr, const char *, void * ctx) { + *ptr = ctx; + return 0; + } + + static int local_infile_read(void * obj, char * buf, unsigned int bufSize) { + LoadContext * ctx = static_cast<LoadContext *>(obj); + if (ctx->loadBufLen - ctx->bufOff == 0) { + swapcontext(&ctx->jmpMySQL, &ctx->jmpP2); + if (ctx->loadBufLen - ctx->bufOff <= 0) { + // Nothing to do or error + return ctx->bufOff; + } + } + unsigned int copy = std::min(ctx->loadBufLen - ctx->bufOff, bufSize); + memcpy(buf, ctx->loadBuf + ctx->bufOff, copy); + ctx->bufOff += copy; + return copy; + } + + static void local_infile_end(void *) { + } + + static int local_infile_error(void *, char*, unsigned int) { + return 0; + } + + static void loadLocalData(LoadContext * ctx) + { + ctx->loadReturn = mysql_read_query_result(ctx->conn); + } + + char stack[16384]; + ucontext_t jmpP2; + ucontext_t jmpMySQL; + const char * loadBuf; + unsigned int loadBufLen; + int bufOff; + MYSQL * conn; + int loadReturn; + }; +} + void MySQL::Connection::beginBulkUpload(const char * table, const char * extra) const { - (void)table; - (void)extra; + ctx = boost::shared_ptr<LoadContext>(new MySQL::LoadContext(&conn, table, extra)); + getcontext(&ctx->jmpMySQL); + ctx->jmpMySQL.uc_stack.ss_sp = ctx->stack; + ctx->jmpMySQL.uc_stack.ss_size = sizeof(ctx->stack); + ctx->jmpMySQL.uc_link = &ctx->jmpP2; + makecontext(&ctx->jmpMySQL, (void (*)())&LoadContext::loadLocalData, 1, ctx.get()); + + mysql_set_local_infile_handler(&conn, LoadContext::local_infile_init, LoadContext::local_infile_read, + LoadContext::local_infile_end, LoadContext::local_infile_error, ctx.get()); + + // begin the load, context swaps back when buffer is empty + swapcontext(&ctx->jmpP2, &ctx->jmpMySQL); } void MySQL::Connection::endBulkUpload(const char * msg) const { - (void)msg; + ctx->loadBuf = NULL; + ctx->loadBufLen = 0; + ctx->bufOff = msg ? -1 : 0; + // switch context with empty buffer fires finished + swapcontext(&ctx->jmpP2, &ctx->jmpMySQL); + // Check result + if (!msg) { + if (ctx->loadReturn) { + ctx.reset(); + throw Error(mysql_error(&conn)); + } + } + ctx.reset(); } size_t MySQL::Connection::bulkUploadData(const char * data, size_t len) const { - (void)data; + ctx->loadBuf = data; + ctx->loadBufLen = len; + ctx->bufOff = 0; + // switch context to load the buffered data + swapcontext(&ctx->jmpP2, &ctx->jmpMySQL); return len; } diff --git a/libmysqlpp/connection.h b/libmysqlpp/connection.h index 0524009..5b6fa79 100644 --- a/libmysqlpp/connection.h +++ b/libmysqlpp/connection.h @@ -4,8 +4,10 @@ #include "../libdbpp/connection.h" #include "error.h" #include <mysql.h> +#include <boost/shared_ptr.hpp> namespace MySQL { + class LoadContext; class Connection : public DB::Connection { public: Connection(const std::string & info); @@ -34,6 +36,8 @@ namespace MySQL { mutable unsigned int txDepth; mutable bool rolledback; + + mutable boost::shared_ptr<LoadContext> ctx; }; } |