summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/slice/Freeze/DB.ice276
-rw-r--r--cpp/slice/Freeze/DBF.ice4
-rw-r--r--cpp/slice/Freeze/Evictor.ice20
-rw-r--r--cpp/src/Freeze/DBI.cpp669
-rw-r--r--cpp/src/Freeze/DBI.h54
5 files changed, 647 insertions, 376 deletions
diff --git a/cpp/slice/Freeze/DB.ice b/cpp/slice/Freeze/DB.ice
index 65ffcab9225..2b37b9fc6d4 100644
--- a/cpp/slice/Freeze/DB.ice
+++ b/cpp/slice/Freeze/DB.ice
@@ -11,6 +11,7 @@
#ifndef FREEZE_DB_ICE
#define FREEZE_DB_ICE
+#include <Freeze/DBF.ice>
#include <Freeze/EvictorF.ice>
/**
@@ -21,12 +22,9 @@
module Freeze
{
-local interface DB;
-local interface DBEnv;
-
/**
*
- * A database exception.
+ * A Freeze database exception.
*
**/
struct DBException
@@ -41,119 +39,291 @@ struct DBException
/**
*
- * An object representing a database.
+ * A database environment. Multiple databases can be created from a
+ * single database environment. The database environment also offers
+ * operations to create transactions.
*
* @see DBException
- * @see DBEnv
+ * @see DBTransaction
+ * @see DB
+ * @see DBForServants
+ *
+ **/
+local interface DBEnvironment
+{
+ /**
+ *
+ * Get the name of the database environment.
+ *
+ * @return The name of this database environment.
+ *
+ **/
+ string getName();
+
+ /**
+ *
+ * Open and return a basic key/value database object. If the
+ * database has been opened before, the previously returned
+ * database object is returned again.
+ *
+ * @param name The database name.
+ *
+ * @return The database object.
+ *
+ * @see DB
+ * @see DB::close
+ *
+ **/
+ DB openDB(string name) throws DBException;
+
+ /**
+ *
+ * Open and return a databae for identity/Servant pairs. If the
+ * database has been opened before, the previously returned
+ * database object is returned again.
+ *
+ * @param name The database name.
+ *
+ * @return The database object.
+ *
+ * @see DBForServant
+ * @see DBForServant::close
+ *
+ **/
+ DB openDBForServant(string name) throws DBException;
+
+ /**
+ *
+ * Start a new transaction in this database environment, and
+ * return the transaction object for such new transaction.
+ *
+ * @return The transaction object.
+ *
+ * @see DBTransaction
+ *
+ **/
+ DBTransaction startDBTransaction() throws DBException;
+
+ /**
+ *
+ * Close the database environment and destroy this database
+ * environment object. This operation also calls
+ * <literal>close</literal> on all databases that have been opened
+ * with this database environment object. Subsequent calls to
+ * <literal>close</literal> have no effect.
+ *
+ * @see DB::close
+ *
+ **/
+ void close() throws DBException;
+};
+
+/**
+ *
+ * A transaction object.
+ *
+ **/
+local interface DBTransaction
+{
+ /**
+ *
+ * Commit a transaction.
+ *
+ **/
+ void commit() throws DBException;
+
+ /**
+ *
+ * Abort a transaction.
+ *
+ **/
+ void abort() throws DBException;
+};
+
+/**
+ *
+ * A database key, represented as a sequence of bytes.
+ *
+ **/
+sequence<byte> Key;
+
+/**
+ *
+ * A database value, represented as a sequence of bytes
+ *
+ **/
+sequence<byte> Value;
+
+/**
+ *
+ * A basic database, which can store key/value pairs.
+ *
+ * @see DBException
+ * @see DBEnvironment
*
**/
local interface DB
{
/**
*
- * Put a Servant into the database, using a string as
- * key. Typically, applications will use the identity of the Ice
- * Object implemented by the Servant as key.
+ * Get the name of the database.
+ *
+ * @return The name of this database.
*
- * @param key The key under which the servant will be stored in
+ **/
+ string getName();
+
+ /**
+ *
+ * Save a value in the database under a given key.
+ *
+ * @param key The key under which the value will be stored in
* the database.
*
- * @param servant The servant to store. If the servant is null,
- * this operation does nothing.
+ * @param servant The value to store.
+ *
+ * @param txn If true, do a transaction-protected write to the
+ * database.
*
* @see get
* @see del
+ * @see DBTransaction
*
**/
- void put(string key, Object servant);
+ void put(Key key, Value value, bool txn) throws DBException;
/**
*
- * Get a Servant from the database by it's key.
+ * Get a value from a database by it's key.
*
- * @param key The key under which the servant is stored in the database.
+ * @param key The key under which the value is stored in the database.
*
- * @return The Servant from the database, or null if the key does
- * not exist.
+ * @return The value from the database, or an empty value if the
+ * key does not exist.
*
* @see put
* @see del
*
**/
- Object get(string key);
+ Value get(Key key) throws DBException;
/**
*
- * Remove a key and the corresponding Servant from the
- * database. If the key does not exist, this operation will do
- * nothing.
+ * Remove a key and the corresponding value from the database. If
+ * the key does not exist, this operation will do nothing.
*
- * @param key The key to remove.
+ * @param key The key to remove together with the corresponding
+ * value.
*
* @see put
* @see get
*
**/
- void del(string key);
+ void del(Key key) throws DBException;
/**
*
* Close the database and destroy this database object. Subsequent
* calls to <literal>close</literal> have no effect.
*
- * @see DBEnv::open
- * @see DBEnv::close
+ * @see DBEnvironment::openDB
+ * @see DBEnvironment::close
*
**/
- void close();
-
- /**
- *
- * Create a new Evictor that uses this database.
- *
- * @return The new Evictor.
- *
- * @see Evictor
- *
- **/
- Evictor createEvictor();
+ void close() throws DBException;
};
/**
*
- * An object representing a database environment.
+ * A Servant database, which can store identity/Servant pairs.
*
* @see DBException
- * @see DB
+ * @see DBEnvironment
+ * @see Evictor
*
**/
-local interface DBEnv
+local interface DBForServants
{
/**
*
- * Open a database and return a database object for this
- * database. If the database has been opened before, the
- * previously returned database object is returned again.
+ * Get the name of the database.
*
- * @param name The database name.
+ * @return The name of this database.
*
- * @return The database object.
+ **/
+ string getName();
+
+ /**
*
- * @see DB
- * @see DB::close
+ * Put a Servant into the database, using the Ice Object's
+ * identity implemented by the Servant as a key.
+ *
+ * @param identity The identity under which the servant will be
+ * stored in the database.
+ *
+ * @param servant The servant to store. If the servant is null,
+ * this operation does nothing.
+ *
+ * @param txn If true, do a transaction-protected write to the
+ * database.
+ *
+ * @see getServant
+ * @see delServant
+ * @see DBTransaction
*
**/
- DB open(string name) throws DBException;
+ void putServant(string identity, Object servant, bool txn) throws DBException;
/**
*
- * Close the database environment and destroy this database
- * environment object. This operation also calls
- * <literal>close</literal> on all databases that have been opened
- * with this database environment object. Subsequent
+ * Get a Servant from the database by the Ice Object's identity
+ * which the Servant must implement.
+ *
+ * @param identity The identity under which the servant is stored
+ * in the database.
+ *
+ * @return The Servant from the database, or null if the identity
+ * does not exist.
+ *
+ * @see putServant
+ * @see delServant
+ *
+ **/
+ Object getServant(string identity) throws DBException;
+
+ /**
+ *
+ * Remove an identity and the corresponding Servant from the
+ * database. If the identity does not exist, this operation will
+ * do nothing.
+ *
+ * @param identity The identity to remove together with the
+ * corresponding Servant.
+ *
+ * @see putServant
+ * @see getServant
+ *
+ **/
+ void delServant(string identity) throws DBException;
+
+ /**
+ *
+ * Create a new Evictor that uses this database.
+ *
+ * @return The new Evictor.
+ *
+ * @see Evictor
+ *
+ **/
+ Evictor createEvictor();
+
+ /**
+ *
+ * Close the database and destroy this database object. Subsequent
* calls to <literal>close</literal> have no effect.
*
- * @see DB::close
+ * @see DBEnvironment::openDBForServants
+ * @see DBEnvironment::close
*
**/
void close() throws DBException;
diff --git a/cpp/slice/Freeze/DBF.ice b/cpp/slice/Freeze/DBF.ice
index 53544995457..0ede4ae95d9 100644
--- a/cpp/slice/Freeze/DBF.ice
+++ b/cpp/slice/Freeze/DBF.ice
@@ -14,8 +14,10 @@
module Freeze
{
+local interface DBEnvironment;
+local interface DBTransaction;
local interface DB;
-local interface DBEnv;
+local interface DBForServants;
};
diff --git a/cpp/slice/Freeze/Evictor.ice b/cpp/slice/Freeze/Evictor.ice
index b7a8ebf0f6c..ca99af4d0a1 100644
--- a/cpp/slice/Freeze/Evictor.ice
+++ b/cpp/slice/Freeze/Evictor.ice
@@ -12,7 +12,7 @@
#define FREEZE_EVICTOR_ICE
#include <Ice/ObjectAdapter.ice>
-#include <Freeze/DBF.ice>
+#include <Freeze/DB.ice>
module Freeze
{
@@ -85,10 +85,10 @@ enum EvictorPersistenceMode
* evictor pattern. The Evictor is an extended Servant Locator, with
* an implementation in the Freeze module. Instances of this
* implementation can be created with the operation
- * <literal>DB::createEvictor</literal>, and must be registered with
+ * <literal>DBForServants::createEvictor</literal>, and must be registered with
* an Object Adapter like other Servant Locators.
*
- * @see DB::createEvictor
+ * @see DBForServants::createEvictor
* @see Ice::ServantLocator
*
**/
@@ -96,15 +96,15 @@ local interface Evictor extends Ice::ServantLocator
{
/**
*
- * Get the database that is being used by this Evictor. This is
+ * Get the Servant database that is being used by this Evictor. This is
* the database from which this Evictor was created.
*
* @return The database used by this Evictor.
*
- * @see DB::createEvictor
+ * @see DBForServants::createEvictor
*
**/
- DB getDB();
+ DBForServants getDBForServants();
/**
*
@@ -120,7 +120,7 @@ local interface Evictor extends Ice::ServantLocator
* @see getSize
*
**/
- void setSize(int sz);
+ void setSize(int sz) throws DBException;
/**
*
@@ -147,7 +147,7 @@ local interface Evictor extends Ice::ServantLocator
* @see getPersistenceMode
*
**/
- void setPersistenceMode(EvictorPersistenceMode mode);
+ void setPersistenceMode(EvictorPersistenceMode mode) throws DBException;
/**
*
@@ -177,7 +177,7 @@ local interface Evictor extends Ice::ServantLocator
* @see destroyObject
*
**/
- void createObject(string identity, Object servant);
+ void createObject(string identity, Object servant) throws DBException;
/**
*
@@ -191,7 +191,7 @@ local interface Evictor extends Ice::ServantLocator
* @see createObject
*
**/
- void destroyObject(string identity);
+ void destroyObject(string identity) throws DBException;
/**
*
diff --git a/cpp/src/Freeze/DBI.cpp b/cpp/src/Freeze/DBI.cpp
index c6364c0ba97..32f341628e6 100644
--- a/cpp/src/Freeze/DBI.cpp
+++ b/cpp/src/Freeze/DBI.cpp
@@ -25,16 +25,308 @@ using namespace Freeze;
# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR)
#endif
-Freeze::DBI::DBI(const CommunicatorPtr& communicator, const string& name, const DBEnvIPtr& dbenvObj, ::DB_ENV* dbenv,
- ::DB* db) :
+Freeze::DBEnvI::DBEnvI(const CommunicatorPtr& communicator, const string& name) :
_communicator(communicator),
- _name(name),
+ _logger(communicator->getLogger()),
+ _trace(0),
+ _dbenv(0),
+ _name(name)
+{
+ _errorPrefix = "Freeze::DBEnv(\"" + _name + "\"): ";
+
+ PropertiesPtr properties = _communicator->getProperties();
+ string value;
+
+ value = properties->getProperty("Freeze.Trace.DB");
+ if (!value.empty())
+ {
+ _trace = atoi(value.c_str());
+ }
+
+ int ret;
+
+ ret = db_env_create(&_dbenv, 0);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "db_env_create: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "opening database environment \"" << _name << "\"";
+ _logger->trace("DB", s.str());
+ }
+
+ ret = _dbenv->open(_dbenv, _name.c_str(),
+ DB_CREATE |
+ DB_INIT_LOCK |
+ DB_INIT_LOG |
+ DB_INIT_MPOOL |
+ //DB_INIT_TXN |
+ DB_RECOVER |
+ DB_THREAD,
+ FREEZE_DB_MODE);
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "DB_ENV->open: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+}
+
+Freeze::DBEnvI::~DBEnvI()
+{
+ if (_dbenv)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has not been closed";
+ _communicator->getLogger()->warning(s.str());
+ }
+}
+
+string
+Freeze::DBEnvI::getName()
+{
+ // No mutex lock necessary, _name is immutable
+ return _name;
+}
+
+DBPtr
+Freeze::DBEnvI::openDB(const string& name)
+{
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+
+ if (!_dbenv)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ map<string, DBPtr>::iterator p = _dbmap.find(name);
+ if (p != _dbmap.end())
+ {
+ return p->second;
+ }
+
+ ::DB* db;
+ int ret = db_create(&db, _dbenv, 0);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "db_create: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ return new DBI(_communicator, this, db, name);
+}
+
+TXNPtr
+Freeze::DBEnvI::startTXN()
+{
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+
+ return new TXNI(_communicator, _dbenv, _name);
+}
+
+void
+Freeze::DBEnvI::close()
+{
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+
+ if (!_dbenv)
+ {
+ return;
+ }
+
+ while(!_dbmap.empty())
+ {
+ _dbmap.begin()->second->close();
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "closing database environment \"" << _name << "\"";
+ _logger->trace("DB", s.str());
+ }
+
+ int ret = _dbenv->close(_dbenv, 0);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "DB_ENV->close: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ _dbenv = 0;
+}
+
+void
+Freeze::DBEnvI::add(const string& name, const DBPtr& db)
+{
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+
+ _dbmap[name] = db;
+}
+
+void
+Freeze::DBEnvI::remove(const string& name)
+{
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+
+ _dbmap.erase(name);
+}
+
+DBEnvPtr
+Freeze::initialize(const CommunicatorPtr& communicator, const string& name)
+{
+ return new DBEnvI(communicator, name);
+}
+
+Freeze::TXNI::TXNI(const CommunicatorPtr& communicator, ::DB_ENV* dbenv, const string& name) :
+ _communicator(communicator),
+ _logger(communicator->getLogger()),
+ _trace(0),
+ _tid(0),
+ _name(name)
+{
+ _errorPrefix = "Freeze::TXN(\"" + _name + "\"): ";
+
+ PropertiesPtr properties = _communicator->getProperties();
+ string value;
+
+ value = properties->getProperty("Freeze.Trace.TXN");
+ if (!value.empty())
+ {
+ _trace = atoi(value.c_str());
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "starting transaction for environment \"" << _name << "\"";
+ _logger->trace("DB", s.str());
+ }
+
+ int ret = txn_begin(dbenv, 0, &_tid, 0);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "txn_begin: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+}
+
+Freeze::TXNI::~TXNI()
+{
+ if (_tid)
+ {
+ ostringstream s;
+ s << _errorPrefix << "transaction has not been committed or aborted";
+ _communicator->getLogger()->warning(s.str());
+ }
+}
+
+void
+Freeze::TXNI::commit()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_tid)
+ {
+ ostringstream s;
+ s << _errorPrefix << "transaction has already been committed or aborted";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "committing transaction for environment \"" << _name << "\"";
+ _logger->trace("DB", s.str());
+ }
+
+ int ret = txn_commit(_tid, 0);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "txn_commit: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ _tid = 0;
+}
+
+void
+Freeze::TXNI::abort()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_tid)
+ {
+ ostringstream s;
+ s << _errorPrefix << "transaction has already been committed or aborted";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "aborting transaction for environment \"" << _name << "\" due to deadlock";
+ _logger->trace("DB", s.str());
+ }
+
+ int ret = txn_abort(_tid);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "txn_abort: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ _tid = 0;
+}
+
+Freeze::DBI::DBI(const CommunicatorPtr& communicator, const DBEnvIPtr& dbenvObj, ::DB* db, const string& name) :
+ _communicator(communicator),
+ _logger(communicator->getLogger()),
+ _trace(0),
_dbenvObj(dbenvObj),
- _dbenv(dbenv),
_db(db),
- _logger(communicator->getLogger()),
- _trace(0)
+ _name(name)
{
+ _errorPrefix = "Freeze::DB(\"" + _name + "\"): ";
+
PropertiesPtr properties = _communicator->getProperties();
string value;
@@ -43,6 +335,24 @@ Freeze::DBI::DBI(const CommunicatorPtr& communicator, const string& name, const
{
_trace = atoi(value.c_str());
}
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "opening database \"" << _name << "\" in environment \"" << _dbenvObj->getName() << "\"";
+ _logger->trace("DB", s.str());
+ }
+
+ int ret = _db->open(_db, name.c_str(), 0, DB_BTREE, DB_CREATE, FREEZE_DB_MODE);
+
+ if (ret != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "DB->open: " << db_strerror(ret);
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
}
Freeze::DBI::~DBI()
@@ -50,27 +360,27 @@ Freeze::DBI::~DBI()
if (_db)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "\"" << _name << "\" has not been closed";
+ s << _errorPrefix << "\"" << _name << "\" has not been closed";
_communicator->getLogger()->warning(s.str());
}
}
+string
+Freeze::DBI::getName()
+{
+ // No mutex lock necessary, _name is immutable
+ return _name;
+}
+
void
-Freeze::DBI::put(const string& key, const ObjectPtr& servant)
+Freeze::DBI::put(const string& key, const ObjectPtr& servant, bool txn)
{
- //
- // TODO: Is synchronization necessary here? I really don't
- // understand what the Berekely DB documentation says with "free
- // threaded".
- //
JTCSyncT<JTCMutex> sync(*this);
if (!_db)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "\"" << _name << "\" has been closed";
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
DBException ex;
ex.message = s.str();
throw ex;
@@ -86,8 +396,6 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant)
stream.write(servant);
DBT dbKey, dbData;
- DB_TXN *tid;
- int ret;
memset(&dbKey, 0, sizeof(dbKey));
memset(&dbData, 0, sizeof(dbData));
@@ -98,23 +406,10 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant)
while (true)
{
- if (_trace >= 2)
- {
- ostringstream s;
- s << "starting transaction for database \"" << _name << "\"";
- _logger->trace("DB", s.str());
- }
-
- ret = txn_begin(_dbenv, 0, &tid, 0);
-
- if (ret != 0)
+ TXNPtr txnObj;
+ if (txn)
{
- ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "txn_begin: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
+ txnObj = _dbenvObj->startTXN();
}
if (_trace >= 1)
@@ -124,70 +419,55 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant)
_logger->trace("DB", s.str());
}
- ret = _db->put(_db, tid, &dbKey, &dbData, 0);
+ int ret = _db->put(_db, 0, &dbKey, &dbData, 0);
switch (ret)
{
case 0:
{
- //
- // Everything ok, commit the transaction
- //
- if (_trace >= 2)
+ if (txnObj)
{
- ostringstream s;
- s << "committing transaction for database \"" << _name << "\"";
- _logger->trace("DB", s.str());
+ //
+ // Everything ok, commit the transaction
+ //
+ txnObj->commit();
}
- ret = txn_commit(tid, 0);
-
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "txn_commit: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
return; // We're done
}
case DB_LOCK_DEADLOCK:
{
- //
- // Deadlock, abort the transaction and retry
- //
- if (_trace >= 2)
+ if (txnObj)
{
- ostringstream s;
- s << "aborting transaction for database \"" << _name << "\" due to deadlock";
- _logger->trace("DB", s.str());
+ //
+ // Deadlock, abort the transaction and retry
+ //
+ txnObj->abort();
+ break; // Repeat
}
-
- ret = txn_abort(tid);
-
- if (ret != 0)
+ else
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "txn_abort: " << db_strerror(ret);
+ s << _errorPrefix << "DB->put: " << db_strerror(ret);
DBException ex;
ex.message = s.str();
throw ex;
}
- break; // Repeat
}
default:
{
- //
- // Error, run recovery
- //
+ if (txnObj)
+ {
+ //
+ // Error, run recovery
+ //
+ txnObj->abort();
+ }
+
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "DB->put: " << db_strerror(ret);
+ s << _errorPrefix << "DB->put: " << db_strerror(ret);
DBException ex;
ex.message = s.str();
throw ex;
@@ -199,34 +479,22 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant)
ObjectPtr
Freeze::DBI::get(const string& key)
{
- //
- // TODO: Is synchronization necessary here? I really don't
- // understand what the Berekely DB documentation says with "free
- // threaded".
- //
JTCSyncT<JTCMutex> sync(*this);
if (!_db)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "\"" << _name << "\" has been closed";
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
DBException ex;
ex.message = s.str();
throw ex;
}
- //
- // TODO: Do I need transactions for get()?
- //
DBT dbKey, dbData;
- int ret;
-
memset(&dbKey, 0, sizeof(dbKey));
memset(&dbData, 0, sizeof(dbData));
dbKey.data = const_cast<void*>(static_cast<const void*>(key.c_str()));
dbKey.size = key.size();
- dbData.flags = DB_DBT_MALLOC;
if (_trace >= 1)
{
@@ -235,7 +503,7 @@ Freeze::DBI::get(const string& key)
_logger->trace("DB", s.str());
}
- ret = _db->get(_db, 0, &dbKey, &dbData, 0);
+ int ret = _db->get(_db, 0, &dbKey, &dbData, 0);
switch (ret)
{
@@ -244,22 +512,14 @@ Freeze::DBI::get(const string& key)
//
// Everything ok
//
+ IceInternal::InstancePtr instance = IceInternal::getInstance(_communicator);
+ IceInternal::Stream stream(instance);
+ stream.b.resize(dbData.size);
+ stream.i = stream.b.begin();
+ memcpy(stream.b.begin(), dbData.data, dbData.size);
+
ObjectPtr servant;
- try
- {
- IceInternal::InstancePtr instance = IceInternal::getInstance(_communicator);
- IceInternal::Stream stream(instance);
- stream.b.resize(dbData.size);
- stream.i = stream.b.begin();
- memcpy(stream.b.begin(), dbData.data, dbData.size);
- stream.read(servant, "::Ice::Object");
- }
- catch(...)
- {
- free(dbData.data);
- throw;
- }
- free(dbData.data);
+ stream.read(servant, "::Ice::Object");
if (!servant)
{
@@ -280,8 +540,7 @@ Freeze::DBI::get(const string& key)
default:
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "DB->get: " << db_strerror(ret);
+ s << _errorPrefix << "DB->get: " << db_strerror(ret);
DBException ex;
ex.message = s.str();
throw ex;
@@ -292,29 +551,18 @@ Freeze::DBI::get(const string& key)
void
Freeze::DBI::del(const string& key)
{
- //
- // TODO: Is synchronization necessary here? I really don't
- // understand what the Berekely DB documentation says with "free
- // threaded".
- //
JTCSyncT<JTCMutex> sync(*this);
if (!_db)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "\"" << _name << "\" has been closed";
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
DBException ex;
ex.message = s.str();
throw ex;
}
- //
- // TODO: Do I need transactions for del()?
- //
DBT dbKey;
- int ret;
-
memset(&dbKey, 0, sizeof(dbKey));
dbKey.data = const_cast<void*>(static_cast<const void*>(key.c_str()));
dbKey.size = key.size();
@@ -326,7 +574,7 @@ Freeze::DBI::del(const string& key)
_logger->trace("DB", s.str());
}
- ret = _db->del(_db, 0, &dbKey, 0);
+ int ret = _db->del(_db, 0, &dbKey, 0);
switch (ret)
{
@@ -341,8 +589,7 @@ Freeze::DBI::del(const string& key)
default:
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "DB->del: " << db_strerror(ret);
+ s << _errorPrefix << "DB->del: " << db_strerror(ret);
DBException ex;
ex.message = s.str();
throw ex;
@@ -372,8 +619,7 @@ Freeze::DBI::close()
if (ret != 0)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "DB->close: " << db_strerror(ret);
+ s << _errorPrefix << "DB->close: " << db_strerror(ret);
DBException ex;
ex.message = s.str();
throw ex;
@@ -381,7 +627,6 @@ Freeze::DBI::close()
_dbenvObj->remove(_name);
_dbenvObj = 0;
- _dbenv = 0;
_db = 0;
}
@@ -393,8 +638,7 @@ Freeze::DBI::createEvictor()
if (!_db)
{
ostringstream s;
- s << "Freeze::DB(\"" << _name << "\"): ";
- s << "\"" << _name << "\" has been closed";
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
DBException ex;
ex.message = s.str();
throw ex;
@@ -402,182 +646,3 @@ Freeze::DBI::createEvictor()
return new EvictorI(this, _communicator);
}
-
-Freeze::DBEnvI::DBEnvI(const CommunicatorPtr& communicator, const string& directory) :
- _communicator(communicator),
- _directory(directory),
- _dbenv(0),
- _logger(communicator->getLogger()),
- _trace(0)
-{
- PropertiesPtr properties = _communicator->getProperties();
- string value;
-
- value = properties->getProperty("Freeze.Trace.DB");
- if (!value.empty())
- {
- _trace = atoi(value.c_str());
- }
-
- int ret;
-
- ret = db_env_create(&_dbenv, 0);
-
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "db_env_create: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-
- if (_trace >= 1)
- {
- ostringstream s;
- s << "opening database environment \"" << _directory << "\"";
- _logger->trace("DB", s.str());
- }
-
- ret = _dbenv->open(_dbenv, _directory.c_str(),
- DB_CREATE |
- DB_INIT_LOCK |
- DB_INIT_LOG |
- DB_INIT_MPOOL |
- DB_INIT_TXN |
- DB_RECOVER |
- DB_THREAD,
- FREEZE_DB_MODE);
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "DB_ENV->open: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-}
-
-Freeze::DBEnvI::~DBEnvI()
-{
- if (_dbenv)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "\"" << _directory << "\" has not been closed";
- _communicator->getLogger()->warning(s.str());
- }
-}
-
-DBPtr
-Freeze::DBEnvI::open(const string& name)
-{
- JTCSyncT<JTCRecursiveMutex> sync(*this);
-
- if (!_dbenv)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "\"" << _directory << "\" has been closed";
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-
- map<string, DBPtr>::iterator p = _dbmap.find(name);
- if (p != _dbmap.end())
- {
- return p->second;
- }
-
- int ret;
-
- ::DB* db;
- ret = db_create(&db, _dbenv, 0);
-
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "db_create: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-
- if (_trace >= 1)
- {
- ostringstream s;
- s << "opening database \"" << name << "\" in environment \"" << _directory << "\"";
- _logger->trace("DB", s.str());
- }
-
- ret = db->open(db, name.c_str(), 0, DB_BTREE, DB_CREATE | DB_THREAD, FREEZE_DB_MODE);
-
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "DB->open: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-
- DBPtr dbp = new DBI(_communicator, name, this, _dbenv, db);
- _dbmap[name] = dbp;
- return dbp;
-}
-
-void
-Freeze::DBEnvI::close()
-{
- JTCSyncT<JTCRecursiveMutex> sync(*this);
-
- if (!_dbenv)
- {
- return;
- }
-
- while(!_dbmap.empty())
- {
- _dbmap.begin()->second->close();
- }
-
- if (_trace >= 1)
- {
- ostringstream s;
- s << "closing database environment \"" << _directory << "\"";
- _logger->trace("DB", s.str());
- }
-
- int ret = _dbenv->close(_dbenv, 0);
-
- if (ret != 0)
- {
- ostringstream s;
- s << "Freeze::DBEnv(\"" << _directory << "\"): ";
- s << "DB_ENV->close: " << db_strerror(ret);
- DBException ex;
- ex.message = s.str();
- throw ex;
- }
-
- _dbenv = 0;
-}
-
-void
-Freeze::DBEnvI::remove(const string& name)
-{
- JTCSyncT<JTCRecursiveMutex> sync(*this);
-
- _dbmap.erase(name);
-}
-
-DBEnvPtr
-Freeze::initialize(const CommunicatorPtr& communicator, const string& directory)
-{
- return new DBEnvI(communicator, directory);
-}
diff --git a/cpp/src/Freeze/DBI.h b/cpp/src/Freeze/DBI.h
index b98aed480e8..5e03136033f 100644
--- a/cpp/src/Freeze/DBI.h
+++ b/cpp/src/Freeze/DBI.h
@@ -26,10 +26,11 @@ class DBI : public DB, public JTCMutex
{
public:
- DBI(const ::Ice::CommunicatorPtr&, const std::string&, const DBEnvIPtr&, ::DB_ENV*, ::DB*);
+ DBI(const ::Ice::CommunicatorPtr&, const DBEnvIPtr&, ::DB*, const std::string&);
virtual ~DBI();
- virtual void put(const std::string&, const ::Ice::ObjectPtr&);
+ virtual std::string getName();
+ virtual void put(const std::string&, const ::Ice::ObjectPtr&, bool);
virtual ::Ice::ObjectPtr get(const std::string&);
virtual void del(const std::string&);
virtual void close();
@@ -38,12 +39,14 @@ public:
private:
::Ice::CommunicatorPtr _communicator;
- std::string _name;
- DBEnvIPtr _dbenvObj;
- ::DB_ENV* _dbenv;
- ::DB* _db;
::Ice::LoggerPtr _logger;
int _trace;
+
+ DBEnvIPtr _dbenvObj;
+ ::DB* _db;
+
+ std::string _name;
+ std::string _errorPrefix;
};
class DBEnvI : public DBEnv, public JTCRecursiveMutex
@@ -53,19 +56,50 @@ public:
DBEnvI(const ::Ice::CommunicatorPtr&, const std::string&);
virtual ~DBEnvI();
- virtual DBPtr open(const std::string&);
+ virtual std::string getName();
+ virtual DBPtr openDB(const std::string&);
+ virtual TXNPtr startTXN();
virtual void close();
- void remove(const std::string&);
-
private:
+ // DBI needs access to add and remove
+ friend class DBI;
+ void add(const std::string&, const DBPtr&);
+ void remove(const std::string&);
+
::Ice::CommunicatorPtr _communicator;
- std::string _directory;
+ ::Ice::LoggerPtr _logger;
+ int _trace;
+
::DB_ENV* _dbenv;
+
+ std::string _name;
+ std::string _errorPrefix;
+
std::map<std::string, DBPtr> _dbmap;
+};
+
+class TXNI : public TXN, public JTCMutex
+{
+public:
+
+ TXNI(const ::Ice::CommunicatorPtr&, ::DB_ENV*, const std::string&);
+ virtual ~TXNI();
+
+ virtual void commit();
+ virtual void abort();
+
+private:
+
+ ::Ice::CommunicatorPtr _communicator;
::Ice::LoggerPtr _logger;
int _trace;
+
+ ::DB_TXN* _tid;
+
+ std::string _name;
+ std::string _errorPrefix;
};
}