diff options
author | Bernard Normier <bernard@zeroc.com> | 2007-12-12 12:03:04 -0500 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2007-12-12 12:03:04 -0500 |
commit | 28c22de9b812daeffa630656818da6ec5411a7da (patch) | |
tree | a3b5142c2144ead6aafb62dd71baf0657296e014 /cpp/src/Freeze/SharedDbEnv.cpp | |
parent | Fixed bug #2546 (diff) | |
download | ice-28c22de9b812daeffa630656818da6ec5411a7da.tar.bz2 ice-28c22de9b812daeffa630656818da6ec5411a7da.tar.xz ice-28c22de9b812daeffa630656818da6ec5411a7da.zip |
Fixed bug #2557 (closing database within transaction)
Diffstat (limited to 'cpp/src/Freeze/SharedDbEnv.cpp')
-rw-r--r-- | cpp/src/Freeze/SharedDbEnv.cpp | 239 |
1 files changed, 170 insertions, 69 deletions
diff --git a/cpp/src/Freeze/SharedDbEnv.cpp b/cpp/src/Freeze/SharedDbEnv.cpp index 4ab0e365a6e..956dedc94c4 100644 --- a/cpp/src/Freeze/SharedDbEnv.cpp +++ b/cpp/src/Freeze/SharedDbEnv.cpp @@ -10,19 +10,21 @@ #include <Freeze/SharedDbEnv.h> #include <Freeze/Exception.h> #include <Freeze/Util.h> -#include <Freeze/SharedDb.h> +#include <Freeze/MapDb.h> #include <Freeze/TransactionalEvictorContext.h> +#include <Freeze/Catalog.h> +#include <Freeze/CatalogIndexList.h> #include <IceUtil/IceUtil.h> #include <cstdlib> -#include <map> #include <memory> using namespace std; using namespace IceUtil; using namespace Ice; +using namespace Freeze; namespace Freeze { @@ -64,15 +66,15 @@ operator<(const MapKey& lhs, const MapKey& rhs) } #if DB_VERSION_MAJOR != 4 - #error Freeze requires DB 4.x +#error Freeze requires DB 4.x #endif #if DB_VERSION_MINOR < 3 void dbErrCallback(const char* prefix, char* msg) #else -void -dbErrCallback(const ::DbEnv* ignored, const char* prefix, const char* msg) + void + dbErrCallback(const ::DbEnv* ignored, const char* prefix, const char* msg) #endif { const Freeze::SharedDbEnv* env = reinterpret_cast<const Freeze::SharedDbEnv*>(prefix); @@ -129,42 +131,86 @@ Freeze::SharedDbEnv::get(const CommunicatorPtr& communicator, const string& envN Freeze::SharedDbEnv::~SharedDbEnv() { - if(_trace >= 1) + cleanup(); +} + + +Freeze::MapDb* +Freeze::SharedDbEnv::getSharedMapDb(const string& dbName, + const string& key, + const string& value, + const KeyCompareBasePtr& keyCompare, + const vector<MapIndexBasePtr>& indices, + bool createDb) +{ + // + // We don't want to lock to retrieve the catalog or catalog index + // + + if(dbName == _catalog->dbName()) { - Trace out(_communicator->getLogger(), "Freeze.DbEnv"); - out << "closing database environment \"" << _envName << "\""; + _catalog->checkTypes(key, value); + return _catalog; + } + else if(dbName == _catalogIndexList->dbName()) + { + _catalogIndexList->checkTypes(key, value); + return _catalogIndexList; } + + IceUtil::Mutex::Lock lock(_mutex); + + SharedDbMap::iterator p = _sharedDbMap.find(dbName); + if(p != _sharedDbMap.end()) + { + MapDb* db = p->second; + db->checkTypes(key, value); + db->connectIndices(indices); + return db; + } + // - // Release catalogs (to close it) + // key not found, let's create and open a new Db // - _catalog = 0; - _catalogIndexList = 0; // - // First terminate checkpointing thread + // Since we're going to put this SharedDb in the map no matter + // what, we use our own transaction and connection to do so // - if(_thread != 0) - { - _thread->terminate(); - _thread = 0; - } - if(_envHolder.get() != 0) + ConnectionIPtr insertConnection = new ConnectionI(this); + + auto_ptr<MapDb> result(new MapDb(insertConnection, dbName, key, value, + keyCompare, indices, createDb)); + + // + // Insert it into the map + // + pair<SharedDbMap::iterator, bool> insertResult; + insertResult = _sharedDbMap.insert(SharedDbMap::value_type(dbName, result.get())); + assert(insertResult.second); + + return result.release(); +} + + + +void +Freeze::SharedDbEnv::removeSharedMapDb(const string& dbName) +{ + IceUtil::Mutex::Lock lock(_mutex); + + SharedDbMap::iterator p = _sharedDbMap.find(dbName); + if(p != _sharedDbMap.end()) { - try - { - _envHolder->close(0); - } - catch(const ::DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } + MapDb* db = p->second; + _sharedDbMap.erase(p); + delete db; } } + void Freeze::SharedDbEnv::__incRef() { IceUtil::StaticMutex::Lock lock(_refCountMutex); @@ -305,7 +351,7 @@ Freeze::SharedDbEnv::setCurrentTransaction(const Freeze::TransactionPtr& tx) if(conn->dbEnv().get() != this) { throw DatabaseException(__FILE__, __LINE__, "the given transaction is bound to environment '" + - conn->dbEnv()->_envName + "'"); + conn->dbEnv()->_envName + "'"); } } @@ -363,6 +409,8 @@ Freeze::SharedDbEnv::SharedDbEnv(const std::string& envName, _env(env), _envName(envName), _communicator(communicator), + _catalog(0), + _catalogIndexList(0), _refCount(0) { Ice::PropertiesPtr properties = _communicator->getProperties(); @@ -383,39 +431,40 @@ Freeze::SharedDbEnv::SharedDbEnv(const std::string& envName, _trace = properties->getPropertyAsInt("Freeze.Trace.DbEnv"); - if(_env == 0) + try { - _envHolder.reset(new DbEnv(0)); - _env = _envHolder.get(); - - if(_trace >= 1) - { - Trace out(_communicator->getLogger(), "Freeze.DbEnv"); - out << "opening database environment \"" << envName << "\""; - } - - string propertyPrefix = string("Freeze.DbEnv.") + envName; - - try + if(_env == 0) { + _envHolder.reset(new DbEnv(0)); + _env = _envHolder.get(); + + if(_trace >= 1) + { + Trace out(_communicator->getLogger(), "Freeze.DbEnv"); + out << "opening database environment \"" << envName << "\""; + } + + string propertyPrefix = string("Freeze.DbEnv.") + envName; + + _env->set_errpfx(reinterpret_cast<char*>(this)); - + _env->set_errcall(dbErrCallback); - + #ifdef _WIN32 // // Berkeley DB may use a different C++ runtime // _env->set_alloc(::malloc, ::realloc, ::free); #endif - + // // Deadlock detection // _env->set_lk_detect(DB_LOCK_YOUNGEST); - + u_int32_t flags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN; - + if(properties->getPropertyAsInt(propertyPrefix + ".DbRecoverFatal") != 0) { flags |= DB_RECOVER_FATAL | DB_CREATE; @@ -424,19 +473,19 @@ Freeze::SharedDbEnv::SharedDbEnv(const std::string& envName, { flags |= DB_RECOVER | DB_CREATE; } - + if(properties->getPropertyAsIntWithDefault(propertyPrefix + ".DbPrivate", 1) != 0) { flags |= DB_PRIVATE; } - - + + // // Auto delete // bool autoDelete = (properties->getPropertyAsIntWithDefault( propertyPrefix + ".OldLogsAutoDelete", 1) != 0); - + if(autoDelete) { _env->set_flags(DB_LOG_AUTOREMOVE, 1); @@ -451,33 +500,85 @@ Freeze::SharedDbEnv::SharedDbEnv(const std::string& envName, propertyPrefix + ".DbHome", envName); _env->open(dbHome.c_str(), flags, FREEZE_DB_MODE); - } - catch(const ::DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } + + // + // Default checkpoint period is every 120 seconds + // + Int checkpointPeriod = properties->getPropertyAsIntWithDefault( + propertyPrefix + ".CheckpointPeriod", 120); + Int kbyte = properties->getPropertyAsIntWithDefault(propertyPrefix + ".PeriodicCheckpointMinSize", 0); + if(checkpointPeriod > 0) + { + _thread = new CheckpointThread(*this, Time::seconds(checkpointPeriod), kbyte, _trace); + } + } + // - // Default checkpoint period is every 120 seconds + // Get catalogs // - Int checkpointPeriod = properties->getPropertyAsIntWithDefault( - propertyPrefix + ".CheckpointPeriod", 120); - Int kbyte = properties->getPropertyAsIntWithDefault(propertyPrefix + ".PeriodicCheckpointMinSize", 0); - - if(checkpointPeriod > 0) - { - _thread = new CheckpointThread(*this, Time::seconds(checkpointPeriod), kbyte, _trace); - } + _catalog = new MapDb(_communicator, catalogName(), CatalogKeyCodec::typeId(), CatalogValueCodec::typeId(), _env); + _catalogIndexList = new MapDb(_communicator, catalogIndexListName(), + CatalogIndexListKeyCodec::typeId(), CatalogIndexListValueCodec::typeId(), _env); + + } + catch(const ::DbException& dx) + { + cleanup(); + throw DatabaseException(__FILE__, __LINE__, dx.what()); + } +} + +void +Freeze::SharedDbEnv::cleanup() +{ + if(_trace >= 1) + { + Trace out(_communicator->getLogger(), "Freeze.DbEnv"); + out << "closing database environment \"" << _envName << "\""; + } + + // + // Close & destroy all MapDbs + // + for(SharedDbMap::iterator p = _sharedDbMap.begin(); p != _sharedDbMap.end(); ++p) + { + delete p->second; + } + + // + // Same for catalogs + // + delete _catalog; + delete _catalogIndexList; + + // + // Then terminate checkpointing thread + // + if(_thread != 0) + { + _thread->terminate(); + _thread = 0; } // - // Get catalogs + // And finally close env // - SharedDb::openCatalogs(*this, _catalog, _catalogIndexList); + + if(_envHolder.get() != 0) + { + try + { + _envHolder->close(0); + } + catch(const ::DbException& dx) + { + throw DatabaseException(__FILE__, __LINE__, dx.what()); + } + } } + Freeze::CheckpointThread::CheckpointThread(SharedDbEnv& dbEnv, const Time& checkpointPeriod, Int kbyte, Int trace) : _dbEnv(dbEnv), _done(false), |