summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/IceStormDB.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/IceStormDB.cpp')
-rw-r--r--cpp/src/IceStorm/IceStormDB.cpp125
1 files changed, 83 insertions, 42 deletions
diff --git a/cpp/src/IceStorm/IceStormDB.cpp b/cpp/src/IceStorm/IceStormDB.cpp
index 0777c842272..d6d29630fa5 100644
--- a/cpp/src/IceStorm/IceStormDB.cpp
+++ b/cpp/src/IceStorm/IceStormDB.cpp
@@ -7,14 +7,12 @@
//
// **********************************************************************
-#include <IceUtil/DisableWarnings.h>
#include <IceUtil/Options.h>
-#include <IceUtil/FileUtil.h>
#include <Ice/Application.h>
-#include <Freeze/Freeze.h>
-#include <DBTypes.h>
-#include <LLUMap.h>
-#include <SubscriberMap.h>
+#include <IceDB/IceDB.h>
+#include <IceStorm/DBTypes.h>
+#include <IcePatch2Lib/Util.h>
+#include <IceUtil/DisableWarnings.h>
using namespace std;
using namespace Ice;
@@ -55,7 +53,9 @@ Client::usage()
"-v, --version Display version.\n"
"--import FILE Import database from FILE.\n"
"--export FILE Export database to FILE.\n"
- "--dbhome DIR The database directory.\n"
+ "--dbhome DIR Source or target database environment.\n"
+ "--dbpath DIR Source or target database environment.\n"
+ "--mapsize VALUE Set LMDB map size in MB (optional, import only).\n"
"-d, --debug Print debug messages.\n"
;
}
@@ -70,6 +70,8 @@ Client::run(int argc, char* argv[])
opts.addOpt("", "import", IceUtilInternal::Options::NeedArg);
opts.addOpt("", "export", IceUtilInternal::Options::NeedArg);
opts.addOpt("", "dbhome", IceUtilInternal::Options::NeedArg);
+ opts.addOpt("", "dbpath", IceUtilInternal::Options::NeedArg);
+ opts.addOpt("", "mapsize", IceUtilInternal::Options::NeedArg);
vector<string> args;
try
@@ -101,16 +103,16 @@ Client::run(int argc, char* argv[])
return EXIT_SUCCESS;
}
- if((!opts.isSet("import") && !opts.isSet("export")) || (opts.isSet("import") && opts.isSet("export")))
+ if(!(opts.isSet("import") ^ opts.isSet("export")))
{
cerr << argv[0] << ": either --import or --export must be set" << endl;
usage();
return EXIT_FAILURE;
}
- if(!opts.isSet("dbhome"))
+ if(!(opts.isSet("dbhome") ^ opts.isSet("dbpath")))
{
- cerr << argv[0] << ": database path must be specified" << endl;
+ cerr << argv[0] << ": set the database environment directory with either --dbhome or --dbpath" << endl;
usage();
return EXIT_FAILURE;
}
@@ -118,17 +120,27 @@ Client::run(int argc, char* argv[])
bool debug = opts.isSet("debug");
bool import = opts.isSet("import");
string dbFile = opts.optArg(import ? "import" : "export");
- string dbPath = opts.optArg("dbhome");
+ string dbPath;
+ if(opts.isSet("dbhome"))
+ {
+ dbPath = opts.optArg("dbhome");
+ }
+ else
+ {
+ dbPath = opts.optArg("dbpath");
+ }
+
+ string mapSizeStr = opts.optArg("mapsize");
+ size_t mapSize = IceDB::getMapSize(atoi(mapSizeStr.c_str()));
try
{
IceStorm::AllData data;
- EncodingVersion encoding;
- encoding.major = 1;
- encoding.minor = 1;
-
- communicator()->getProperties()->setProperty("Freeze.DbEnv.IceStorm.DbHome", dbPath);
+ IceDB::IceContext dbContext;
+ dbContext.communicator = communicator();
+ dbContext.encoding.major = 1;
+ dbContext.encoding.minor = 1;
if(import)
{
@@ -156,6 +168,14 @@ Client::run(int argc, char* argv[])
fs.seekg(0, ios::end);
streampos fileSize = fs.tellg();
+
+ if(!fileSize)
+ {
+ fs.close();
+ cerr << argv[0] << ": empty input file" << endl;
+ return EXIT_FAILURE;
+ }
+
fs.seekg(0, ios::beg);
vector<Ice::Byte> buf;
@@ -167,33 +187,35 @@ Client::run(int argc, char* argv[])
string type;
int version;
- Ice::InputStreamPtr stream = Ice::wrapInputStream(communicator(), buf, encoding);
- stream->read(type);
+ Ice::InputStream stream(communicator(), dbContext.encoding, buf);
+ stream.read(type);
if(type != "IceStorm")
{
cerr << argv[0] << ": incorrect input file type: " << type << endl;
return EXIT_FAILURE;
}
- stream->read(version);
- stream->read(data);
+ stream.read(version);
+ stream.read(data);
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(communicator(), "IceStorm");
- Freeze::TransactionHolder txn(connection);
+ IceDB::Env env(dbPath, 2, mapSize);
+ IceDB::ReadWriteTxn txn(env);
if(debug)
{
cout << "Writing LLU Map:" << endl;
}
- IceStorm::LLUMap llumap(connection, "llu");
+ IceDB::Dbi<string, LogUpdate, IceDB::IceContext, Ice::OutputStream>
+ lluMap(txn, "llu", dbContext, MDB_CREATE);
+
for(StringLogUpdateDict::const_iterator p = data.llus.begin(); p != data.llus.end(); ++p)
{
if(debug)
{
cout << " KEY = " << p->first << endl;
}
- llumap.put(*p);
+ lluMap.put(txn, p->first, p->second);
}
if(debug)
@@ -201,7 +223,9 @@ Client::run(int argc, char* argv[])
cout << "Writing Subscriber Map:" << endl;
}
- IceStorm::SubscriberMap subscribers(connection, "subscribers");
+ IceDB::Dbi<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream>
+ subscriberMap(txn, "subscribers", dbContext, MDB_CREATE);
+
for(SubscriberRecordDict::const_iterator q = data.subscribers.begin(); q != data.subscribers.end(); ++q)
{
if(debug)
@@ -209,10 +233,11 @@ Client::run(int argc, char* argv[])
cout << " KEY = TOPIC(" << communicator()->identityToString(q->first.topic)
<< ") ID(" << communicator()->identityToString(q->first.id) << ")" <<endl;
}
- subscribers.put(*q);
+ subscriberMap.put(txn, q->first, q->second);
}
txn.commit();
+ env.close();
}
}
else
@@ -220,45 +245,61 @@ Client::run(int argc, char* argv[])
cout << "Exporting database from directory " << dbPath << " to file " << dbFile << endl;
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(communicator(), "IceStorm");
+ IceDB::Env env(dbPath, 2);
+ IceDB::ReadOnlyTxn txn(env);
if(debug)
{
cout << "Reading LLU Map:" << endl;
}
- IceStorm::LLUMap llumap(connection, "llu", false);
- for(IceStorm::LLUMap::const_iterator p = llumap.begin(); p != llumap.end(); ++p)
+ IceDB::Dbi<string, LogUpdate, IceDB::IceContext, Ice::OutputStream>
+ lluMap(txn, "llu", dbContext, 0);
+
+ string s;
+ LogUpdate llu;
+ IceDB::ReadOnlyCursor<string, LogUpdate, IceDB::IceContext, Ice::OutputStream> lluCursor(lluMap, txn);
+ while(lluCursor.get(s, llu, MDB_NEXT))
{
if(debug)
{
- cout << " KEY = " << p->first << endl;
+ cout << " KEY = " << s << endl;
}
- data.llus.insert(*p);
+ data.llus.insert(std::make_pair(s, llu));
}
+ lluCursor.close();
if(debug)
{
cout << "Reading Subscriber Map:" << endl;
}
- IceStorm::SubscriberMap subscribers(connection, "subscribers", false);
- for(IceStorm::SubscriberMap::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
+ IceDB::Dbi<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream>
+ subscriberMap(txn, "subscribers", dbContext, 0);
+
+ SubscriberRecordKey key;
+ SubscriberRecord record;
+ IceDB::ReadOnlyCursor<SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream>
+ subCursor(subscriberMap, txn);
+ while(subCursor.get(key, record, MDB_NEXT))
{
if(debug)
{
- cout << " KEY = TOPIC(" << communicator()->identityToString(q->first.topic)
- << ") ID(" << communicator()->identityToString(q->first.id) << ")" <<endl;
+ cout << " KEY = TOPIC(" << communicator()->identityToString(key.topic)
+ << ") ID(" << communicator()->identityToString(key.id) << ")" <<endl;
}
- data.subscribers.insert(*q);
+ data.subscribers.insert(std::make_pair(key, record));
}
+ subCursor.close();
+
+ txn.rollback();
+ env.close();
}
- Ice::OutputStreamPtr stream = Ice::createOutputStream(communicator(), encoding);
- stream->write("IceStorm");
- stream->write(ICE_INT_VERSION);
- stream->write(data);
- pair<const Ice::Byte*, const Ice::Byte*> buf = stream->finished();
+ Ice::OutputStream stream(communicator(), dbContext.encoding);
+ stream.write("IceStorm");
+ stream.write(ICE_INT_VERSION);
+ stream.write(data);
ofstream fs(dbFile.c_str(), ios::binary);
if(fs.fail())
@@ -266,7 +307,7 @@ Client::run(int argc, char* argv[])
cerr << argv[0] << ": could not open output file: " << strerror(errno) << endl;
return EXIT_FAILURE;
}
- fs.write(reinterpret_cast<const char*>(buf.first), buf.second - buf.first);
+ fs.write(reinterpret_cast<const char*>(stream.b.begin()), stream.b.size());
fs.close();
}
}