diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/slice/Freeze/DB.ice | 276 | ||||
-rw-r--r-- | cpp/slice/Freeze/DBF.ice | 4 | ||||
-rw-r--r-- | cpp/slice/Freeze/Evictor.ice | 20 | ||||
-rw-r--r-- | cpp/src/Freeze/DBI.cpp | 669 | ||||
-rw-r--r-- | cpp/src/Freeze/DBI.h | 54 |
5 files changed, 647 insertions, 376 deletions
diff --git a/cpp/slice/Freeze/DB.ice b/cpp/slice/Freeze/DB.ice index 65ffcab9225..2b37b9fc6d4 100644 --- a/cpp/slice/Freeze/DB.ice +++ b/cpp/slice/Freeze/DB.ice @@ -11,6 +11,7 @@ #ifndef FREEZE_DB_ICE #define FREEZE_DB_ICE +#include <Freeze/DBF.ice> #include <Freeze/EvictorF.ice> /** @@ -21,12 +22,9 @@ module Freeze { -local interface DB; -local interface DBEnv; - /** * - * A database exception. + * A Freeze database exception. * **/ struct DBException @@ -41,119 +39,291 @@ struct DBException /** * - * An object representing a database. + * A database environment. Multiple databases can be created from a + * single database environment. The database environment also offers + * operations to create transactions. * * @see DBException - * @see DBEnv + * @see DBTransaction + * @see DB + * @see DBForServants + * + **/ +local interface DBEnvironment +{ + /** + * + * Get the name of the database environment. + * + * @return The name of this database environment. + * + **/ + string getName(); + + /** + * + * Open and return a basic key/value database object. If the + * database has been opened before, the previously returned + * database object is returned again. + * + * @param name The database name. + * + * @return The database object. + * + * @see DB + * @see DB::close + * + **/ + DB openDB(string name) throws DBException; + + /** + * + * Open and return a databae for identity/Servant pairs. If the + * database has been opened before, the previously returned + * database object is returned again. + * + * @param name The database name. + * + * @return The database object. + * + * @see DBForServant + * @see DBForServant::close + * + **/ + DB openDBForServant(string name) throws DBException; + + /** + * + * Start a new transaction in this database environment, and + * return the transaction object for such new transaction. + * + * @return The transaction object. + * + * @see DBTransaction + * + **/ + DBTransaction startDBTransaction() throws DBException; + + /** + * + * Close the database environment and destroy this database + * environment object. This operation also calls + * <literal>close</literal> on all databases that have been opened + * with this database environment object. Subsequent calls to + * <literal>close</literal> have no effect. + * + * @see DB::close + * + **/ + void close() throws DBException; +}; + +/** + * + * A transaction object. + * + **/ +local interface DBTransaction +{ + /** + * + * Commit a transaction. + * + **/ + void commit() throws DBException; + + /** + * + * Abort a transaction. + * + **/ + void abort() throws DBException; +}; + +/** + * + * A database key, represented as a sequence of bytes. + * + **/ +sequence<byte> Key; + +/** + * + * A database value, represented as a sequence of bytes + * + **/ +sequence<byte> Value; + +/** + * + * A basic database, which can store key/value pairs. + * + * @see DBException + * @see DBEnvironment * **/ local interface DB { /** * - * Put a Servant into the database, using a string as - * key. Typically, applications will use the identity of the Ice - * Object implemented by the Servant as key. + * Get the name of the database. + * + * @return The name of this database. * - * @param key The key under which the servant will be stored in + **/ + string getName(); + + /** + * + * Save a value in the database under a given key. + * + * @param key The key under which the value will be stored in * the database. * - * @param servant The servant to store. If the servant is null, - * this operation does nothing. + * @param servant The value to store. + * + * @param txn If true, do a transaction-protected write to the + * database. * * @see get * @see del + * @see DBTransaction * **/ - void put(string key, Object servant); + void put(Key key, Value value, bool txn) throws DBException; /** * - * Get a Servant from the database by it's key. + * Get a value from a database by it's key. * - * @param key The key under which the servant is stored in the database. + * @param key The key under which the value is stored in the database. * - * @return The Servant from the database, or null if the key does - * not exist. + * @return The value from the database, or an empty value if the + * key does not exist. * * @see put * @see del * **/ - Object get(string key); + Value get(Key key) throws DBException; /** * - * Remove a key and the corresponding Servant from the - * database. If the key does not exist, this operation will do - * nothing. + * Remove a key and the corresponding value from the database. If + * the key does not exist, this operation will do nothing. * - * @param key The key to remove. + * @param key The key to remove together with the corresponding + * value. * * @see put * @see get * **/ - void del(string key); + void del(Key key) throws DBException; /** * * Close the database and destroy this database object. Subsequent * calls to <literal>close</literal> have no effect. * - * @see DBEnv::open - * @see DBEnv::close + * @see DBEnvironment::openDB + * @see DBEnvironment::close * **/ - void close(); - - /** - * - * Create a new Evictor that uses this database. - * - * @return The new Evictor. - * - * @see Evictor - * - **/ - Evictor createEvictor(); + void close() throws DBException; }; /** * - * An object representing a database environment. + * A Servant database, which can store identity/Servant pairs. * * @see DBException - * @see DB + * @see DBEnvironment + * @see Evictor * **/ -local interface DBEnv +local interface DBForServants { /** * - * Open a database and return a database object for this - * database. If the database has been opened before, the - * previously returned database object is returned again. + * Get the name of the database. * - * @param name The database name. + * @return The name of this database. * - * @return The database object. + **/ + string getName(); + + /** * - * @see DB - * @see DB::close + * Put a Servant into the database, using the Ice Object's + * identity implemented by the Servant as a key. + * + * @param identity The identity under which the servant will be + * stored in the database. + * + * @param servant The servant to store. If the servant is null, + * this operation does nothing. + * + * @param txn If true, do a transaction-protected write to the + * database. + * + * @see getServant + * @see delServant + * @see DBTransaction * **/ - DB open(string name) throws DBException; + void putServant(string identity, Object servant, bool txn) throws DBException; /** * - * Close the database environment and destroy this database - * environment object. This operation also calls - * <literal>close</literal> on all databases that have been opened - * with this database environment object. Subsequent + * Get a Servant from the database by the Ice Object's identity + * which the Servant must implement. + * + * @param identity The identity under which the servant is stored + * in the database. + * + * @return The Servant from the database, or null if the identity + * does not exist. + * + * @see putServant + * @see delServant + * + **/ + Object getServant(string identity) throws DBException; + + /** + * + * Remove an identity and the corresponding Servant from the + * database. If the identity does not exist, this operation will + * do nothing. + * + * @param identity The identity to remove together with the + * corresponding Servant. + * + * @see putServant + * @see getServant + * + **/ + void delServant(string identity) throws DBException; + + /** + * + * Create a new Evictor that uses this database. + * + * @return The new Evictor. + * + * @see Evictor + * + **/ + Evictor createEvictor(); + + /** + * + * Close the database and destroy this database object. Subsequent * calls to <literal>close</literal> have no effect. * - * @see DB::close + * @see DBEnvironment::openDBForServants + * @see DBEnvironment::close * **/ void close() throws DBException; diff --git a/cpp/slice/Freeze/DBF.ice b/cpp/slice/Freeze/DBF.ice index 53544995457..0ede4ae95d9 100644 --- a/cpp/slice/Freeze/DBF.ice +++ b/cpp/slice/Freeze/DBF.ice @@ -14,8 +14,10 @@ module Freeze { +local interface DBEnvironment; +local interface DBTransaction; local interface DB; -local interface DBEnv; +local interface DBForServants; }; diff --git a/cpp/slice/Freeze/Evictor.ice b/cpp/slice/Freeze/Evictor.ice index b7a8ebf0f6c..ca99af4d0a1 100644 --- a/cpp/slice/Freeze/Evictor.ice +++ b/cpp/slice/Freeze/Evictor.ice @@ -12,7 +12,7 @@ #define FREEZE_EVICTOR_ICE #include <Ice/ObjectAdapter.ice> -#include <Freeze/DBF.ice> +#include <Freeze/DB.ice> module Freeze { @@ -85,10 +85,10 @@ enum EvictorPersistenceMode * evictor pattern. The Evictor is an extended Servant Locator, with * an implementation in the Freeze module. Instances of this * implementation can be created with the operation - * <literal>DB::createEvictor</literal>, and must be registered with + * <literal>DBForServants::createEvictor</literal>, and must be registered with * an Object Adapter like other Servant Locators. * - * @see DB::createEvictor + * @see DBForServants::createEvictor * @see Ice::ServantLocator * **/ @@ -96,15 +96,15 @@ local interface Evictor extends Ice::ServantLocator { /** * - * Get the database that is being used by this Evictor. This is + * Get the Servant database that is being used by this Evictor. This is * the database from which this Evictor was created. * * @return The database used by this Evictor. * - * @see DB::createEvictor + * @see DBForServants::createEvictor * **/ - DB getDB(); + DBForServants getDBForServants(); /** * @@ -120,7 +120,7 @@ local interface Evictor extends Ice::ServantLocator * @see getSize * **/ - void setSize(int sz); + void setSize(int sz) throws DBException; /** * @@ -147,7 +147,7 @@ local interface Evictor extends Ice::ServantLocator * @see getPersistenceMode * **/ - void setPersistenceMode(EvictorPersistenceMode mode); + void setPersistenceMode(EvictorPersistenceMode mode) throws DBException; /** * @@ -177,7 +177,7 @@ local interface Evictor extends Ice::ServantLocator * @see destroyObject * **/ - void createObject(string identity, Object servant); + void createObject(string identity, Object servant) throws DBException; /** * @@ -191,7 +191,7 @@ local interface Evictor extends Ice::ServantLocator * @see createObject * **/ - void destroyObject(string identity); + void destroyObject(string identity) throws DBException; /** * diff --git a/cpp/src/Freeze/DBI.cpp b/cpp/src/Freeze/DBI.cpp index c6364c0ba97..32f341628e6 100644 --- a/cpp/src/Freeze/DBI.cpp +++ b/cpp/src/Freeze/DBI.cpp @@ -25,16 +25,308 @@ using namespace Freeze; # define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) #endif -Freeze::DBI::DBI(const CommunicatorPtr& communicator, const string& name, const DBEnvIPtr& dbenvObj, ::DB_ENV* dbenv, - ::DB* db) : +Freeze::DBEnvI::DBEnvI(const CommunicatorPtr& communicator, const string& name) : _communicator(communicator), - _name(name), + _logger(communicator->getLogger()), + _trace(0), + _dbenv(0), + _name(name) +{ + _errorPrefix = "Freeze::DBEnv(\"" + _name + "\"): "; + + PropertiesPtr properties = _communicator->getProperties(); + string value; + + value = properties->getProperty("Freeze.Trace.DB"); + if (!value.empty()) + { + _trace = atoi(value.c_str()); + } + + int ret; + + ret = db_env_create(&_dbenv, 0); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "db_env_create: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } + + if (_trace >= 1) + { + ostringstream s; + s << "opening database environment \"" << _name << "\""; + _logger->trace("DB", s.str()); + } + + ret = _dbenv->open(_dbenv, _name.c_str(), + DB_CREATE | + DB_INIT_LOCK | + DB_INIT_LOG | + DB_INIT_MPOOL | + //DB_INIT_TXN | + DB_RECOVER | + DB_THREAD, + FREEZE_DB_MODE); + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "DB_ENV->open: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } +} + +Freeze::DBEnvI::~DBEnvI() +{ + if (_dbenv) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has not been closed"; + _communicator->getLogger()->warning(s.str()); + } +} + +string +Freeze::DBEnvI::getName() +{ + // No mutex lock necessary, _name is immutable + return _name; +} + +DBPtr +Freeze::DBEnvI::openDB(const string& name) +{ + JTCSyncT<JTCRecursiveMutex> sync(*this); + + if (!_dbenv) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + map<string, DBPtr>::iterator p = _dbmap.find(name); + if (p != _dbmap.end()) + { + return p->second; + } + + ::DB* db; + int ret = db_create(&db, _dbenv, 0); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "db_create: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } + + return new DBI(_communicator, this, db, name); +} + +TXNPtr +Freeze::DBEnvI::startTXN() +{ + JTCSyncT<JTCRecursiveMutex> sync(*this); + + return new TXNI(_communicator, _dbenv, _name); +} + +void +Freeze::DBEnvI::close() +{ + JTCSyncT<JTCRecursiveMutex> sync(*this); + + if (!_dbenv) + { + return; + } + + while(!_dbmap.empty()) + { + _dbmap.begin()->second->close(); + } + + if (_trace >= 1) + { + ostringstream s; + s << "closing database environment \"" << _name << "\""; + _logger->trace("DB", s.str()); + } + + int ret = _dbenv->close(_dbenv, 0); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "DB_ENV->close: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } + + _dbenv = 0; +} + +void +Freeze::DBEnvI::add(const string& name, const DBPtr& db) +{ + JTCSyncT<JTCRecursiveMutex> sync(*this); + + _dbmap[name] = db; +} + +void +Freeze::DBEnvI::remove(const string& name) +{ + JTCSyncT<JTCRecursiveMutex> sync(*this); + + _dbmap.erase(name); +} + +DBEnvPtr +Freeze::initialize(const CommunicatorPtr& communicator, const string& name) +{ + return new DBEnvI(communicator, name); +} + +Freeze::TXNI::TXNI(const CommunicatorPtr& communicator, ::DB_ENV* dbenv, const string& name) : + _communicator(communicator), + _logger(communicator->getLogger()), + _trace(0), + _tid(0), + _name(name) +{ + _errorPrefix = "Freeze::TXN(\"" + _name + "\"): "; + + PropertiesPtr properties = _communicator->getProperties(); + string value; + + value = properties->getProperty("Freeze.Trace.TXN"); + if (!value.empty()) + { + _trace = atoi(value.c_str()); + } + + if (_trace >= 1) + { + ostringstream s; + s << "starting transaction for environment \"" << _name << "\""; + _logger->trace("DB", s.str()); + } + + int ret = txn_begin(dbenv, 0, &_tid, 0); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "txn_begin: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } +} + +Freeze::TXNI::~TXNI() +{ + if (_tid) + { + ostringstream s; + s << _errorPrefix << "transaction has not been committed or aborted"; + _communicator->getLogger()->warning(s.str()); + } +} + +void +Freeze::TXNI::commit() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_tid) + { + ostringstream s; + s << _errorPrefix << "transaction has already been committed or aborted"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + if (_trace >= 1) + { + ostringstream s; + s << "committing transaction for environment \"" << _name << "\""; + _logger->trace("DB", s.str()); + } + + int ret = txn_commit(_tid, 0); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "txn_commit: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } + + _tid = 0; +} + +void +Freeze::TXNI::abort() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_tid) + { + ostringstream s; + s << _errorPrefix << "transaction has already been committed or aborted"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + if (_trace >= 1) + { + ostringstream s; + s << "aborting transaction for environment \"" << _name << "\" due to deadlock"; + _logger->trace("DB", s.str()); + } + + int ret = txn_abort(_tid); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "txn_abort: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } + + _tid = 0; +} + +Freeze::DBI::DBI(const CommunicatorPtr& communicator, const DBEnvIPtr& dbenvObj, ::DB* db, const string& name) : + _communicator(communicator), + _logger(communicator->getLogger()), + _trace(0), _dbenvObj(dbenvObj), - _dbenv(dbenv), _db(db), - _logger(communicator->getLogger()), - _trace(0) + _name(name) { + _errorPrefix = "Freeze::DB(\"" + _name + "\"): "; + PropertiesPtr properties = _communicator->getProperties(); string value; @@ -43,6 +335,24 @@ Freeze::DBI::DBI(const CommunicatorPtr& communicator, const string& name, const { _trace = atoi(value.c_str()); } + + if (_trace >= 1) + { + ostringstream s; + s << "opening database \"" << _name << "\" in environment \"" << _dbenvObj->getName() << "\""; + _logger->trace("DB", s.str()); + } + + int ret = _db->open(_db, name.c_str(), 0, DB_BTREE, DB_CREATE, FREEZE_DB_MODE); + + if (ret != 0) + { + ostringstream s; + s << _errorPrefix << "DB->open: " << db_strerror(ret); + DBException ex; + ex.message = s.str(); + throw ex; + } } Freeze::DBI::~DBI() @@ -50,27 +360,27 @@ Freeze::DBI::~DBI() if (_db) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "\"" << _name << "\" has not been closed"; + s << _errorPrefix << "\"" << _name << "\" has not been closed"; _communicator->getLogger()->warning(s.str()); } } +string +Freeze::DBI::getName() +{ + // No mutex lock necessary, _name is immutable + return _name; +} + void -Freeze::DBI::put(const string& key, const ObjectPtr& servant) +Freeze::DBI::put(const string& key, const ObjectPtr& servant, bool txn) { - // - // TODO: Is synchronization necessary here? I really don't - // understand what the Berekely DB documentation says with "free - // threaded". - // JTCSyncT<JTCMutex> sync(*this); if (!_db) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "\"" << _name << "\" has been closed"; + s << _errorPrefix << "\"" << _name << "\" has been closed"; DBException ex; ex.message = s.str(); throw ex; @@ -86,8 +396,6 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant) stream.write(servant); DBT dbKey, dbData; - DB_TXN *tid; - int ret; memset(&dbKey, 0, sizeof(dbKey)); memset(&dbData, 0, sizeof(dbData)); @@ -98,23 +406,10 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant) while (true) { - if (_trace >= 2) - { - ostringstream s; - s << "starting transaction for database \"" << _name << "\""; - _logger->trace("DB", s.str()); - } - - ret = txn_begin(_dbenv, 0, &tid, 0); - - if (ret != 0) + TXNPtr txnObj; + if (txn) { - ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "txn_begin: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; + txnObj = _dbenvObj->startTXN(); } if (_trace >= 1) @@ -124,70 +419,55 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant) _logger->trace("DB", s.str()); } - ret = _db->put(_db, tid, &dbKey, &dbData, 0); + int ret = _db->put(_db, 0, &dbKey, &dbData, 0); switch (ret) { case 0: { - // - // Everything ok, commit the transaction - // - if (_trace >= 2) + if (txnObj) { - ostringstream s; - s << "committing transaction for database \"" << _name << "\""; - _logger->trace("DB", s.str()); + // + // Everything ok, commit the transaction + // + txnObj->commit(); } - ret = txn_commit(tid, 0); - - if (ret != 0) - { - ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "txn_commit: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } return; // We're done } case DB_LOCK_DEADLOCK: { - // - // Deadlock, abort the transaction and retry - // - if (_trace >= 2) + if (txnObj) { - ostringstream s; - s << "aborting transaction for database \"" << _name << "\" due to deadlock"; - _logger->trace("DB", s.str()); + // + // Deadlock, abort the transaction and retry + // + txnObj->abort(); + break; // Repeat } - - ret = txn_abort(tid); - - if (ret != 0) + else { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "txn_abort: " << db_strerror(ret); + s << _errorPrefix << "DB->put: " << db_strerror(ret); DBException ex; ex.message = s.str(); throw ex; } - break; // Repeat } default: { - // - // Error, run recovery - // + if (txnObj) + { + // + // Error, run recovery + // + txnObj->abort(); + } + ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "DB->put: " << db_strerror(ret); + s << _errorPrefix << "DB->put: " << db_strerror(ret); DBException ex; ex.message = s.str(); throw ex; @@ -199,34 +479,22 @@ Freeze::DBI::put(const string& key, const ObjectPtr& servant) ObjectPtr Freeze::DBI::get(const string& key) { - // - // TODO: Is synchronization necessary here? I really don't - // understand what the Berekely DB documentation says with "free - // threaded". - // JTCSyncT<JTCMutex> sync(*this); if (!_db) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "\"" << _name << "\" has been closed"; + s << _errorPrefix << "\"" << _name << "\" has been closed"; DBException ex; ex.message = s.str(); throw ex; } - // - // TODO: Do I need transactions for get()? - // DBT dbKey, dbData; - int ret; - memset(&dbKey, 0, sizeof(dbKey)); memset(&dbData, 0, sizeof(dbData)); dbKey.data = const_cast<void*>(static_cast<const void*>(key.c_str())); dbKey.size = key.size(); - dbData.flags = DB_DBT_MALLOC; if (_trace >= 1) { @@ -235,7 +503,7 @@ Freeze::DBI::get(const string& key) _logger->trace("DB", s.str()); } - ret = _db->get(_db, 0, &dbKey, &dbData, 0); + int ret = _db->get(_db, 0, &dbKey, &dbData, 0); switch (ret) { @@ -244,22 +512,14 @@ Freeze::DBI::get(const string& key) // // Everything ok // + IceInternal::InstancePtr instance = IceInternal::getInstance(_communicator); + IceInternal::Stream stream(instance); + stream.b.resize(dbData.size); + stream.i = stream.b.begin(); + memcpy(stream.b.begin(), dbData.data, dbData.size); + ObjectPtr servant; - try - { - IceInternal::InstancePtr instance = IceInternal::getInstance(_communicator); - IceInternal::Stream stream(instance); - stream.b.resize(dbData.size); - stream.i = stream.b.begin(); - memcpy(stream.b.begin(), dbData.data, dbData.size); - stream.read(servant, "::Ice::Object"); - } - catch(...) - { - free(dbData.data); - throw; - } - free(dbData.data); + stream.read(servant, "::Ice::Object"); if (!servant) { @@ -280,8 +540,7 @@ Freeze::DBI::get(const string& key) default: { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "DB->get: " << db_strerror(ret); + s << _errorPrefix << "DB->get: " << db_strerror(ret); DBException ex; ex.message = s.str(); throw ex; @@ -292,29 +551,18 @@ Freeze::DBI::get(const string& key) void Freeze::DBI::del(const string& key) { - // - // TODO: Is synchronization necessary here? I really don't - // understand what the Berekely DB documentation says with "free - // threaded". - // JTCSyncT<JTCMutex> sync(*this); if (!_db) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "\"" << _name << "\" has been closed"; + s << _errorPrefix << "\"" << _name << "\" has been closed"; DBException ex; ex.message = s.str(); throw ex; } - // - // TODO: Do I need transactions for del()? - // DBT dbKey; - int ret; - memset(&dbKey, 0, sizeof(dbKey)); dbKey.data = const_cast<void*>(static_cast<const void*>(key.c_str())); dbKey.size = key.size(); @@ -326,7 +574,7 @@ Freeze::DBI::del(const string& key) _logger->trace("DB", s.str()); } - ret = _db->del(_db, 0, &dbKey, 0); + int ret = _db->del(_db, 0, &dbKey, 0); switch (ret) { @@ -341,8 +589,7 @@ Freeze::DBI::del(const string& key) default: { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "DB->del: " << db_strerror(ret); + s << _errorPrefix << "DB->del: " << db_strerror(ret); DBException ex; ex.message = s.str(); throw ex; @@ -372,8 +619,7 @@ Freeze::DBI::close() if (ret != 0) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "DB->close: " << db_strerror(ret); + s << _errorPrefix << "DB->close: " << db_strerror(ret); DBException ex; ex.message = s.str(); throw ex; @@ -381,7 +627,6 @@ Freeze::DBI::close() _dbenvObj->remove(_name); _dbenvObj = 0; - _dbenv = 0; _db = 0; } @@ -393,8 +638,7 @@ Freeze::DBI::createEvictor() if (!_db) { ostringstream s; - s << "Freeze::DB(\"" << _name << "\"): "; - s << "\"" << _name << "\" has been closed"; + s << _errorPrefix << "\"" << _name << "\" has been closed"; DBException ex; ex.message = s.str(); throw ex; @@ -402,182 +646,3 @@ Freeze::DBI::createEvictor() return new EvictorI(this, _communicator); } - -Freeze::DBEnvI::DBEnvI(const CommunicatorPtr& communicator, const string& directory) : - _communicator(communicator), - _directory(directory), - _dbenv(0), - _logger(communicator->getLogger()), - _trace(0) -{ - PropertiesPtr properties = _communicator->getProperties(); - string value; - - value = properties->getProperty("Freeze.Trace.DB"); - if (!value.empty()) - { - _trace = atoi(value.c_str()); - } - - int ret; - - ret = db_env_create(&_dbenv, 0); - - if (ret != 0) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "db_env_create: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } - - if (_trace >= 1) - { - ostringstream s; - s << "opening database environment \"" << _directory << "\""; - _logger->trace("DB", s.str()); - } - - ret = _dbenv->open(_dbenv, _directory.c_str(), - DB_CREATE | - DB_INIT_LOCK | - DB_INIT_LOG | - DB_INIT_MPOOL | - DB_INIT_TXN | - DB_RECOVER | - DB_THREAD, - FREEZE_DB_MODE); - if (ret != 0) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "DB_ENV->open: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } -} - -Freeze::DBEnvI::~DBEnvI() -{ - if (_dbenv) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "\"" << _directory << "\" has not been closed"; - _communicator->getLogger()->warning(s.str()); - } -} - -DBPtr -Freeze::DBEnvI::open(const string& name) -{ - JTCSyncT<JTCRecursiveMutex> sync(*this); - - if (!_dbenv) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "\"" << _directory << "\" has been closed"; - DBException ex; - ex.message = s.str(); - throw ex; - } - - map<string, DBPtr>::iterator p = _dbmap.find(name); - if (p != _dbmap.end()) - { - return p->second; - } - - int ret; - - ::DB* db; - ret = db_create(&db, _dbenv, 0); - - if (ret != 0) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "db_create: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } - - if (_trace >= 1) - { - ostringstream s; - s << "opening database \"" << name << "\" in environment \"" << _directory << "\""; - _logger->trace("DB", s.str()); - } - - ret = db->open(db, name.c_str(), 0, DB_BTREE, DB_CREATE | DB_THREAD, FREEZE_DB_MODE); - - if (ret != 0) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "DB->open: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } - - DBPtr dbp = new DBI(_communicator, name, this, _dbenv, db); - _dbmap[name] = dbp; - return dbp; -} - -void -Freeze::DBEnvI::close() -{ - JTCSyncT<JTCRecursiveMutex> sync(*this); - - if (!_dbenv) - { - return; - } - - while(!_dbmap.empty()) - { - _dbmap.begin()->second->close(); - } - - if (_trace >= 1) - { - ostringstream s; - s << "closing database environment \"" << _directory << "\""; - _logger->trace("DB", s.str()); - } - - int ret = _dbenv->close(_dbenv, 0); - - if (ret != 0) - { - ostringstream s; - s << "Freeze::DBEnv(\"" << _directory << "\"): "; - s << "DB_ENV->close: " << db_strerror(ret); - DBException ex; - ex.message = s.str(); - throw ex; - } - - _dbenv = 0; -} - -void -Freeze::DBEnvI::remove(const string& name) -{ - JTCSyncT<JTCRecursiveMutex> sync(*this); - - _dbmap.erase(name); -} - -DBEnvPtr -Freeze::initialize(const CommunicatorPtr& communicator, const string& directory) -{ - return new DBEnvI(communicator, directory); -} diff --git a/cpp/src/Freeze/DBI.h b/cpp/src/Freeze/DBI.h index b98aed480e8..5e03136033f 100644 --- a/cpp/src/Freeze/DBI.h +++ b/cpp/src/Freeze/DBI.h @@ -26,10 +26,11 @@ class DBI : public DB, public JTCMutex { public: - DBI(const ::Ice::CommunicatorPtr&, const std::string&, const DBEnvIPtr&, ::DB_ENV*, ::DB*); + DBI(const ::Ice::CommunicatorPtr&, const DBEnvIPtr&, ::DB*, const std::string&); virtual ~DBI(); - virtual void put(const std::string&, const ::Ice::ObjectPtr&); + virtual std::string getName(); + virtual void put(const std::string&, const ::Ice::ObjectPtr&, bool); virtual ::Ice::ObjectPtr get(const std::string&); virtual void del(const std::string&); virtual void close(); @@ -38,12 +39,14 @@ public: private: ::Ice::CommunicatorPtr _communicator; - std::string _name; - DBEnvIPtr _dbenvObj; - ::DB_ENV* _dbenv; - ::DB* _db; ::Ice::LoggerPtr _logger; int _trace; + + DBEnvIPtr _dbenvObj; + ::DB* _db; + + std::string _name; + std::string _errorPrefix; }; class DBEnvI : public DBEnv, public JTCRecursiveMutex @@ -53,19 +56,50 @@ public: DBEnvI(const ::Ice::CommunicatorPtr&, const std::string&); virtual ~DBEnvI(); - virtual DBPtr open(const std::string&); + virtual std::string getName(); + virtual DBPtr openDB(const std::string&); + virtual TXNPtr startTXN(); virtual void close(); - void remove(const std::string&); - private: + // DBI needs access to add and remove + friend class DBI; + void add(const std::string&, const DBPtr&); + void remove(const std::string&); + ::Ice::CommunicatorPtr _communicator; - std::string _directory; + ::Ice::LoggerPtr _logger; + int _trace; + ::DB_ENV* _dbenv; + + std::string _name; + std::string _errorPrefix; + std::map<std::string, DBPtr> _dbmap; +}; + +class TXNI : public TXN, public JTCMutex +{ +public: + + TXNI(const ::Ice::CommunicatorPtr&, ::DB_ENV*, const std::string&); + virtual ~TXNI(); + + virtual void commit(); + virtual void abort(); + +private: + + ::Ice::CommunicatorPtr _communicator; ::Ice::LoggerPtr _logger; int _trace; + + ::DB_TXN* _tid; + + std::string _name; + std::string _errorPrefix; }; } |