diff options
Diffstat (limited to 'cpp/src/Freeze/ObjectStore.cpp')
-rw-r--r-- | cpp/src/Freeze/ObjectStore.cpp | 646 |
1 files changed, 646 insertions, 0 deletions
diff --git a/cpp/src/Freeze/ObjectStore.cpp b/cpp/src/Freeze/ObjectStore.cpp new file mode 100644 index 00000000000..018b8fd24e3 --- /dev/null +++ b/cpp/src/Freeze/ObjectStore.cpp @@ -0,0 +1,646 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Freeze/ObjectStore.h> +#include <Freeze/EvictorI.h> +#include <Freeze/BackgroundSaveEvictorI.h> +#include <Freeze/Util.h> +#include <Freeze/Catalog.h> +#include <Freeze/TransactionI.h> +#include <Freeze/IndexI.h> + +#include <Ice/StringConverter.h> + +using namespace std; +using namespace Ice; +using namespace Freeze; + +Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& facetType, + bool createDb, EvictorIBase* evictor, + const vector<IndexPtr>& indices, + bool populateEmptyIndices) : + _facet(facet), + _evictor(evictor), + _indices(indices), + _communicator(evictor->communicator()) +{ + if(facet == "") + { + _dbName = EvictorIBase::defaultDb; + } + else + { + _dbName = facet; + } + + if(!facetType.empty()) + { + // + // Create a sample servant with this type + // + ObjectFactoryPtr factory = _communicator->findObjectFactory(facetType); + if(factory == 0) + { + throw DatabaseException(__FILE__, __LINE__, "No object factory registered for type-id '" + facetType + "'"); + } + + _sampleServant = factory->create(facetType); + } + + ConnectionPtr catalogConnection = createConnection(_communicator, evictor->dbEnv()->getEnvName()); + Catalog catalog(catalogConnection, catalogName()); + + Catalog::iterator p = catalog.find(evictor->filename()); + if(p != catalog.end()) + { + if(p->second.evictor == false) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = evictor->filename() + " is an evictor database"; + throw ex; + } + } + + DbEnv* dbEnv = evictor->dbEnv()->getEnv(); + + try + { + _db.reset(new Db(dbEnv, 0)); + + Ice::PropertiesPtr properties = evictor->communicator()->getProperties(); + string propPrefix = "Freeze.Evictor." + evictor->filename() + "."; + + int btreeMinKey = properties->getPropertyAsInt(propPrefix + _dbName + ".BtreeMinKey"); + if(btreeMinKey > 2) + { + if(evictor->trace() >= 1) + { + Trace out(evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "Setting \"" << evictor->filename() + "." + _dbName << "\"'s btree minkey to " << btreeMinKey; + } + + _db->set_bt_minkey(btreeMinKey); + } + + bool checksum = properties->getPropertyAsInt(propPrefix + "Checksum") > 0; + if(checksum) + { + if(evictor->trace() >= 1) + { + Trace out(evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "Turning checksum on for \"" << evictor->filename() << "\""; + } + + _db->set_flags(DB_CHKSUM); + } + + int pageSize = properties->getPropertyAsInt(propPrefix + "PageSize"); + if(pageSize > 0) + { + if(evictor->trace() >= 1) + { + Trace out(evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "Setting \"" << evictor->filename() << "\"'s pagesize to " << pageSize; + } + + _db->set_pagesize(pageSize); + } + + + TransactionPtr tx = catalogConnection->beginTransaction(); + DbTxn* txn = getTxn(tx); + + u_int32_t flags = DB_THREAD; + if(createDb) + { + flags |= DB_CREATE; + } + + // + // We keep _dbName as a native string here, while it might have + // been better to convert it to UTF-8, changing this isn't + // possible without potentially breaking backward compatibility + // with deployed databases. + // + _db->open(txn, Ice::nativeToUTF8(evictor->communicator(), evictor->filename()).c_str(), _dbName.c_str(), + DB_BTREE, flags, FREEZE_DB_MODE); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices); + } + + if(p == catalog.end()) + { + CatalogData catalogData; + catalogData.evictor = true; + catalog.put(Catalog::value_type(evictor->filename(), catalogData)); + } + + tx->commit(); + } + catch(const DbException& dx) + { + TransactionPtr tx = catalogConnection->currentTransaction(); + if(tx != 0) + { + try + { + tx->rollback(); + } + catch(...) + { + } + } + + if(dx.get_errno() == ENOENT) + { + NotFoundException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + else + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + } + catch(...) + { + TransactionPtr tx = catalogConnection->currentTransaction(); + if(tx != 0) + { + try + { + tx->rollback(); + } + catch(...) + { + } + } + throw; + } +} + +Freeze::ObjectStoreBase::~ObjectStoreBase() +{ + try + { + _db->close(0); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->close(); + } + _indices.clear(); + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } +} + +bool +Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPtr& transaction) const +{ + DbTxn* tx = 0; + if(transaction != 0) + { + tx = transaction->dbTxn(); + if(tx == 0) + { + throw DatabaseException(__FILE__, __LINE__, "inactive transaction"); + } + } + + Key key; + marshal(ident, key, _communicator); + Dbt dbKey; + initializeInDbt(key, dbKey); + + // + // Keep 0 length since we're not interested in the data + // + Dbt dbValue; + dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); + + for(;;) + { + try + { + int err = _db->get(tx, &dbKey, &dbValue, 0); + + if(err == 0) + { + return true; + } + else if(err == DB_NOTFOUND) + { + return false; + } + else + { + assert(0); + throw DatabaseException(__FILE__, __LINE__); + } + } + catch(const DbDeadlockException& dx) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::dbHasObject while searching \"" + << _evictor->filename() + "/" + _dbName << "\"; retrying ..."; + } + + if(tx != 0) + { + throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction); + } + // Else, try again + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + } + +} + +void +Freeze::ObjectStoreBase::save(Key& key, Value& value, Byte status, DbTxn* tx) +{ + switch(status) + { + case created: + case modified: + { + Dbt dbKey; + Dbt dbValue; + initializeInDbt(key, dbKey); + initializeInDbt(value, dbValue); + u_int32_t flags = (status == created) ? DB_NOOVERWRITE : 0; + int err = _db->put(tx, &dbKey, &dbValue, flags); + if(err != 0) + { + throw DatabaseException(__FILE__, __LINE__); + } + break; + } + + case destroyed: + { + Dbt dbKey; + initializeInDbt(key, dbKey); + int err = _db->del(tx, &dbKey, 0); + if(err != 0) + { + throw DatabaseException(__FILE__, __LINE__); + } + break; + } + default: + { + assert(0); + } + } +} + +void +Freeze::ObjectStoreBase::marshal(const Identity& ident, Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + ident.__write(&stream); + vector<Byte>(stream.b.begin(), stream.b.end()).swap(bytes); +} + +void +Freeze::ObjectStoreBase::unmarshal(Identity& ident, const Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.b.resize(bytes.size()); + memcpy(&stream.b[0], &bytes[0], bytes.size()); + stream.i = stream.b.begin(); + ident.__read(&stream); +} + +void +Freeze::ObjectStoreBase::marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.startWriteEncaps(); + v.__write(&stream); + stream.writePendingObjects(); + stream.endWriteEncaps(); + vector<Byte>(stream.b.begin(), stream.b.end()).swap(bytes); +} + +void +Freeze::ObjectStoreBase::unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.sliceObjects(false); + stream.b.resize(bytes.size()); + memcpy(&stream.b[0], &bytes[0], bytes.size()); + stream.i = stream.b.begin(); + stream.startReadEncaps(); + v.__read(&stream); + stream.readPendingObjects(); + stream.endReadEncaps(); +} + +bool +Freeze::ObjectStoreBase::load(const Identity& ident, const TransactionIPtr& transaction, ObjectRecord& rec) +{ + if(transaction == 0) + { + throw DatabaseException(__FILE__, __LINE__, "no active transaction"); + } + + DbTxn* txn = transaction->dbTxn(); + + if(txn == 0) + { + throw DatabaseException(__FILE__, __LINE__, "inactive transaction"); + } + + Key key; + marshal(ident, key, _communicator); + + Dbt dbKey; + initializeInDbt(key, dbKey); + + const size_t defaultValueSize = 4096; + Value value(defaultValueSize); + + Dbt dbValue; + initializeOutDbt(value, dbValue); + + for(;;) + { + try + { + int rs =_db->get(txn, &dbKey, &dbValue, 0); + if(rs == DB_NOTFOUND) + { + return false; + } + else if(rs != 0) + { + assert(0); + throw DatabaseException(__FILE__, __LINE__); + } + break; // for(;;) + } + catch(const DbDeadlockException& dx) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::load while searching \"" + << _evictor->filename() + "/" + _dbName << "\""; + } + throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction); + } + catch(const DbException& dx) + { + handleDbException(dx, value, dbValue, __FILE__, __LINE__); + } + } + + unmarshal(rec, value, _communicator); + _evictor->initialize(ident, _facet, rec.servant); + return true; +} + +void +Freeze::ObjectStoreBase::update(const Identity& ident, const ObjectRecord& rec, const TransactionIPtr& transaction) +{ + if(transaction == 0) + { + throw DatabaseException(__FILE__, __LINE__, "no active transaction"); + } + + DbTxn* txn = transaction->dbTxn(); + + if(txn == 0) + { + throw DatabaseException(__FILE__, __LINE__, "inactive transaction"); + } + + Key key; + marshal(ident, key, _communicator); + + Value value; + marshal(rec, value, _communicator); + + Dbt dbKey; + Dbt dbValue; + initializeInDbt(key, dbKey); + initializeInDbt(value, dbValue); + u_int32_t flags = 0; + + try + { + _db->put(txn, &dbKey, &dbValue, flags); + } + catch(const DbDeadlockException& dx) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::update while updating \"" + << _evictor->filename() + "/" + _dbName << "\""; + } + throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction); + } + catch(const DbException& dx) + { + handleDbException(dx, __FILE__, __LINE__); + } +} + +bool +Freeze::ObjectStoreBase::insert(const Identity& ident, const ObjectRecord& rec, const TransactionIPtr& transaction) +{ + DbTxn* tx = 0; + if(transaction != 0) + { + tx = transaction->dbTxn(); + if(tx == 0) + { + throw DatabaseException(__FILE__, __LINE__, "inactive transaction"); + } + } + + Key key; + marshal(ident, key, _communicator); + + Value value; + marshal(rec, value, _communicator); + + Dbt dbKey; + Dbt dbValue; + initializeInDbt(key, dbKey); + initializeInDbt(value, dbValue); + u_int32_t flags = DB_NOOVERWRITE; + if(tx == 0) + { + flags |= DB_AUTO_COMMIT; + } + + for(;;) + { + try + { + return _db->put(tx, &dbKey, &dbValue, flags) == 0; + } + catch(const DbDeadlockException& dx) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::insert while updating \"" + << _evictor->filename() + "/" + _dbName << "\""; + } + if(tx != 0) + { + throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction); + } + // + // Otherwise, try again + // + } + catch(const DbException& dx) + { + handleDbException(dx, __FILE__, __LINE__); + } + } +} + +bool +Freeze::ObjectStoreBase::remove(const Identity& ident, const TransactionIPtr& transaction) +{ + DbTxn* tx = 0; + if(transaction != 0) + { + tx = transaction->dbTxn(); + if(tx == 0) + { + throw DatabaseException(__FILE__, __LINE__, "inactive transaction"); + } + } + + Key key; + marshal(ident, key, _communicator); + + Dbt dbKey; + initializeInDbt(key, dbKey); + + for(;;) + { + try + { + return _db->del(tx, &dbKey, tx != 0 ? 0 : DB_AUTO_COMMIT) == 0; + } + catch(const DbDeadlockException& dx) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::remove while updating \"" + << _evictor->filename() + "/" + _dbName << "\""; + } + if(tx != 0) + { + throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction); + } + // + // Otherwise, try again + // + } + catch(const DbException& dx) + { + handleDbException(dx, __FILE__, __LINE__); + } + } +} + + +const string& +Freeze::ObjectStoreBase::dbName() const +{ + return _dbName; +} + +// +// Non transactional load +// +bool +Freeze::ObjectStoreBase::loadImpl(const Identity& ident, ObjectRecord& rec) +{ + Key key; + marshal(ident, key, _communicator); + + Dbt dbKey; + initializeInDbt(key, dbKey); + + const size_t defaultValueSize = 4096; + Value value(defaultValueSize); + + Dbt dbValue; + initializeOutDbt(value, dbValue); + + for(;;) + { + try + { + int rs = _db->get(0, &dbKey, &dbValue, 0); + if(rs == DB_NOTFOUND) + { + return false; + } + else if(rs != 0) + { + assert(0); + throw DatabaseException(__FILE__, __LINE__); + } + break; // for(;;) + } + catch(const DbDeadlockException&) + { + if(_evictor->deadlockWarning()) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::ObjectStoreBase::load while searching \"" + << _evictor->filename() + "/" + _dbName << "\"; retrying ..."; + } + // + // Ignored, try again + // + } + catch(const DbException& dx) + { + handleDbException(dx, value, dbValue, __FILE__, __LINE__); + } + } + + unmarshal(rec, value, _communicator); + _evictor->initialize(ident, _facet, rec.servant); + return true; +} |