diff options
Diffstat (limited to 'cpp/src/IceStorm/IceStormDB.cpp')
-rw-r--r-- | cpp/src/IceStorm/IceStormDB.cpp | 125 |
1 files changed, 83 insertions, 42 deletions
diff --git a/cpp/src/IceStorm/IceStormDB.cpp b/cpp/src/IceStorm/IceStormDB.cpp index 0777c842272..d6d29630fa5 100644 --- a/cpp/src/IceStorm/IceStormDB.cpp +++ b/cpp/src/IceStorm/IceStormDB.cpp @@ -7,14 +7,12 @@ // // ********************************************************************** -#include <IceUtil/DisableWarnings.h> #include <IceUtil/Options.h> -#include <IceUtil/FileUtil.h> #include <Ice/Application.h> -#include <Freeze/Freeze.h> -#include <DBTypes.h> -#include <LLUMap.h> -#include <SubscriberMap.h> +#include <IceDB/IceDB.h> +#include <IceStorm/DBTypes.h> +#include <IcePatch2Lib/Util.h> +#include <IceUtil/DisableWarnings.h> using namespace std; using namespace Ice; @@ -55,7 +53,9 @@ Client::usage() "-v, --version Display version.\n" "--import FILE Import database from FILE.\n" "--export FILE Export database to FILE.\n" - "--dbhome DIR The database directory.\n" + "--dbhome DIR Source or target database environment.\n" + "--dbpath DIR Source or target database environment.\n" + "--mapsize VALUE Set LMDB map size in MB (optional, import only).\n" "-d, --debug Print debug messages.\n" ; } @@ -70,6 +70,8 @@ Client::run(int argc, char* argv[]) opts.addOpt("", "import", IceUtilInternal::Options::NeedArg); opts.addOpt("", "export", IceUtilInternal::Options::NeedArg); opts.addOpt("", "dbhome", IceUtilInternal::Options::NeedArg); + opts.addOpt("", "dbpath", IceUtilInternal::Options::NeedArg); + opts.addOpt("", "mapsize", IceUtilInternal::Options::NeedArg); vector<string> args; try @@ -101,16 +103,16 @@ Client::run(int argc, char* argv[]) return EXIT_SUCCESS; } - if((!opts.isSet("import") && !opts.isSet("export")) || (opts.isSet("import") && opts.isSet("export"))) + if(!(opts.isSet("import") ^ opts.isSet("export"))) { cerr << argv[0] << ": either --import or --export must be set" << endl; usage(); return EXIT_FAILURE; } - if(!opts.isSet("dbhome")) + if(!(opts.isSet("dbhome") ^ opts.isSet("dbpath"))) { - cerr << argv[0] << ": database path must be specified" << endl; + cerr << argv[0] << ": set the database environment directory with either --dbhome or --dbpath" << endl; usage(); return EXIT_FAILURE; } @@ -118,17 +120,27 @@ Client::run(int argc, char* argv[]) bool debug = opts.isSet("debug"); bool import = opts.isSet("import"); string dbFile = opts.optArg(import ? "import" : "export"); - string dbPath = opts.optArg("dbhome"); + string dbPath; + if(opts.isSet("dbhome")) + { + dbPath = opts.optArg("dbhome"); + } + else + { + dbPath = opts.optArg("dbpath"); + } + + string mapSizeStr = opts.optArg("mapsize"); + size_t mapSize = IceDB::getMapSize(atoi(mapSizeStr.c_str())); try { IceStorm::AllData data; - EncodingVersion encoding; - encoding.major = 1; - encoding.minor = 1; - - communicator()->getProperties()->setProperty("Freeze.DbEnv.IceStorm.DbHome", dbPath); + IceDB::IceContext dbContext; + dbContext.communicator = communicator(); + dbContext.encoding.major = 1; + dbContext.encoding.minor = 1; if(import) { @@ -156,6 +168,14 @@ Client::run(int argc, char* argv[]) fs.seekg(0, ios::end); streampos fileSize = fs.tellg(); + + if(!fileSize) + { + fs.close(); + cerr << argv[0] << ": empty input file" << endl; + return EXIT_FAILURE; + } + fs.seekg(0, ios::beg); vector<Ice::Byte> buf; @@ -167,33 +187,35 @@ Client::run(int argc, char* argv[]) string type; int version; - Ice::InputStreamPtr stream = Ice::wrapInputStream(communicator(), buf, encoding); - stream->read(type); + Ice::InputStream stream(communicator(), dbContext.encoding, buf); + stream.read(type); if(type != "IceStorm") { cerr << argv[0] << ": incorrect input file type: " << type << endl; return EXIT_FAILURE; } - stream->read(version); - stream->read(data); + stream.read(version); + stream.read(data); { - Freeze::ConnectionPtr connection = Freeze::createConnection(communicator(), "IceStorm"); - Freeze::TransactionHolder txn(connection); + IceDB::Env env(dbPath, 2, mapSize); + IceDB::ReadWriteTxn txn(env); if(debug) { cout << "Writing LLU Map:" << endl; } - IceStorm::LLUMap llumap(connection, "llu"); + IceDB::Dbi<string, LogUpdate, IceDB::IceContext, Ice::OutputStream> + lluMap(txn, "llu", dbContext, MDB_CREATE); + for(StringLogUpdateDict::const_iterator p = data.llus.begin(); p != data.llus.end(); ++p) { if(debug) { cout << " KEY = " << p->first << endl; } - llumap.put(*p); + lluMap.put(txn, p->first, p->second); } if(debug) @@ -201,7 +223,9 @@ Client::run(int argc, char* argv[]) cout << "Writing Subscriber Map:" << endl; } - IceStorm::SubscriberMap subscribers(connection, "subscribers"); + IceDB::Dbi<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream> + subscriberMap(txn, "subscribers", dbContext, MDB_CREATE); + for(SubscriberRecordDict::const_iterator q = data.subscribers.begin(); q != data.subscribers.end(); ++q) { if(debug) @@ -209,10 +233,11 @@ Client::run(int argc, char* argv[]) cout << " KEY = TOPIC(" << communicator()->identityToString(q->first.topic) << ") ID(" << communicator()->identityToString(q->first.id) << ")" <<endl; } - subscribers.put(*q); + subscriberMap.put(txn, q->first, q->second); } txn.commit(); + env.close(); } } else @@ -220,45 +245,61 @@ Client::run(int argc, char* argv[]) cout << "Exporting database from directory " << dbPath << " to file " << dbFile << endl; { - Freeze::ConnectionPtr connection = Freeze::createConnection(communicator(), "IceStorm"); + IceDB::Env env(dbPath, 2); + IceDB::ReadOnlyTxn txn(env); if(debug) { cout << "Reading LLU Map:" << endl; } - IceStorm::LLUMap llumap(connection, "llu", false); - for(IceStorm::LLUMap::const_iterator p = llumap.begin(); p != llumap.end(); ++p) + IceDB::Dbi<string, LogUpdate, IceDB::IceContext, Ice::OutputStream> + lluMap(txn, "llu", dbContext, 0); + + string s; + LogUpdate llu; + IceDB::ReadOnlyCursor<string, LogUpdate, IceDB::IceContext, Ice::OutputStream> lluCursor(lluMap, txn); + while(lluCursor.get(s, llu, MDB_NEXT)) { if(debug) { - cout << " KEY = " << p->first << endl; + cout << " KEY = " << s << endl; } - data.llus.insert(*p); + data.llus.insert(std::make_pair(s, llu)); } + lluCursor.close(); if(debug) { cout << "Reading Subscriber Map:" << endl; } - IceStorm::SubscriberMap subscribers(connection, "subscribers", false); - for(IceStorm::SubscriberMap::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) + IceDB::Dbi<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream> + subscriberMap(txn, "subscribers", dbContext, 0); + + SubscriberRecordKey key; + SubscriberRecord record; + IceDB::ReadOnlyCursor<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream> + subCursor(subscriberMap, txn); + while(subCursor.get(key, record, MDB_NEXT)) { if(debug) { - cout << " KEY = TOPIC(" << communicator()->identityToString(q->first.topic) - << ") ID(" << communicator()->identityToString(q->first.id) << ")" <<endl; + cout << " KEY = TOPIC(" << communicator()->identityToString(key.topic) + << ") ID(" << communicator()->identityToString(key.id) << ")" <<endl; } - data.subscribers.insert(*q); + data.subscribers.insert(std::make_pair(key, record)); } + subCursor.close(); + + txn.rollback(); + env.close(); } - Ice::OutputStreamPtr stream = Ice::createOutputStream(communicator(), encoding); - stream->write("IceStorm"); - stream->write(ICE_INT_VERSION); - stream->write(data); - pair<const Ice::Byte*, const Ice::Byte*> buf = stream->finished(); + Ice::OutputStream stream(communicator(), dbContext.encoding); + stream.write("IceStorm"); + stream.write(ICE_INT_VERSION); + stream.write(data); ofstream fs(dbFile.c_str(), ios::binary); if(fs.fail()) @@ -266,7 +307,7 @@ Client::run(int argc, char* argv[]) cerr << argv[0] << ": could not open output file: " << strerror(errno) << endl; return EXIT_FAILURE; } - fs.write(reinterpret_cast<const char*>(buf.first), buf.second - buf.first); + fs.write(reinterpret_cast<const char*>(stream.b.begin()), stream.b.size()); fs.close(); } } |