diff options
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 656 |
1 files changed, 199 insertions, 457 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index 0840ecab263..5024a24b5bc 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -5,7 +5,7 @@ // Billerica, MA, USA // // All Rights Reserved. -//d +// // Ice is free software; you can redistribute it and/or modify it under // the terms of the GNU General Public License version 2 as published by // the Free Software Foundation. @@ -15,208 +15,24 @@ #include <Freeze/EvictorI.h> #include <Freeze/Initialize.h> #include <IceUtil/AbstractMutex.h> -#include <sys/stat.h> +#include <Freeze/Util.h> +#include <Freeze/EvictorIteratorI.h> +#include <Freeze/IndexI.h> + #include <typeinfo> using namespace std; using namespace Freeze; using namespace Ice; -#ifdef _WIN32 -# define FREEZE_DB_MODE 0 -#else -# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) -#endif - -namespace Freeze -{ - -class EvictorIteratorI : public EvictorIterator -{ -public: - - EvictorIteratorI(EvictorI&, Int, bool); - - virtual bool hasNext(); - virtual Identity next(); - -private: - - vector<Identity>::const_iterator - nextBatch(); - - EvictorI& _evictor; - size_t _batchSize; - bool _loadServants; - vector<Identity>::const_iterator _batchIterator; - - Key _key; - Value _value; - vector<Identity> _batch; - bool _more; -}; - -} - -namespace -{ - -inline void -initializeInDbt(const vector<Byte>& v, Dbt& dbt) -{ - dbt.set_data(const_cast<Byte*>(&v[0])); - dbt.set_size(v.size()); - dbt.set_ulen(0); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -inline void -initializeOutDbt(vector<Byte>& v, Dbt& dbt) -{ - v.resize(v.capacity()); - dbt.set_data(&v[0]); - dbt.set_size(0); - dbt.set_ulen(v.size()); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -void -handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey) -{ - if(dbKey.get_size() > dbKey.get_ulen()) - { - // - // Keep the old key size in case it's used as input - // - size_t oldKeySize = key.size(); - - key.resize(dbKey.get_size()); - initializeOutDbt(key, dbKey); - dbKey.set_size(oldKeySize); - } - else - { - // - // Real problem - // - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } -} - -void -handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey, Value& value, Dbt& dbValue) -{ - bool resized = false; - if(dbKey.get_size() > dbKey.get_ulen()) - { - size_t oldKeySize = key.size(); - key.resize(dbKey.get_size()); - initializeOutDbt(key, dbKey); - dbKey.set_size(oldKeySize); - resized = true; - } - - if(dbValue.get_size() > dbValue.get_ulen()) - { - value.resize(dbValue.get_size()); - initializeOutDbt(value, dbValue); - resized = true; - } - - if(!resized) - { - // - // Real problem - // - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } -} - -inline bool startWith(Key key, Key root) -{ - if(root.size() > key.size()) - { - return false; - } - return memcmp(&root[0], &key[0], root.size()) == 0; -} - -// -// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function -// is used to create a key prefix containing only the key's identity. -// - -void -marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - v.__write(&stream); - bytes.swap(stream.b); -} - -void -marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - v.__write(&stream); - bytes.swap(stream.b); -} - -void -unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.b = bytes; - stream.i = stream.b.begin(); - v.__read(&stream); -} - -void -marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.marshalFacets(false); - stream.startWriteEncaps(); - v.__write(&stream); - stream.writePendingObjects(); - stream.endWriteEncaps(); - bytes.swap(stream.b); -} - -void -unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.b = bytes; - stream.i = stream.b.begin(); - stream.startReadEncaps(); - v.__read(&stream); - stream.readPendingObjects(); - stream.endReadEncaps(); -} - -} - Freeze::EvictorPtr Freeze::createEvictor(const CommunicatorPtr& communicator, const string& envName, const string& dbName, + const vector<IndexPtr>& indices, bool createDb) { - return new EvictorI(communicator, envName, dbName, createDb); + return new EvictorI(communicator, envName, dbName, indices, createDb); } Freeze::EvictorPtr @@ -224,50 +40,57 @@ Freeze::createEvictor(const CommunicatorPtr& communicator, const string& envName, DbEnv& dbEnv, const string& dbName, + const vector<IndexPtr>& indices, bool createDb) { - return new EvictorI(communicator, envName, dbEnv, dbName, createDb); + return new EvictorI(communicator, envName, dbEnv, dbName, indices, createDb); } Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator, const string& envName, - const string& dbName, + const string& dbName, + const std::vector<Freeze::IndexPtr>& indices, bool createDb) : _evictorSize(10), _deactivated(false), _communicator(communicator), _dbEnv(0), _dbEnvHolder(SharedDbEnv::get(communicator, envName)), + _dbName(dbName), + _indices(indices), _trace(0), _generation(0) { _dbEnv = _dbEnvHolder.get(); - init(envName, dbName, createDb); + init(envName, createDb); } Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator, const string& envName, DbEnv& dbEnv, const string& dbName, + const std::vector<Freeze::IndexPtr>& indices, bool createDb) : _evictorSize(10), _deactivated(false), _communicator(communicator), _dbEnv(&dbEnv), + _dbName(dbName), + _indices(indices), _trace(0), _generation(0) { - init(envName, dbName, createDb); + init(envName, createDb); } void -Freeze::EvictorI::init(const string& envName, const string& dbName, bool createDb) +Freeze::EvictorI::init(const string& envName, bool createDb) { _trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor"); - string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + dbName; + string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName; // // By default, we save every minute or when the size of the modified queue @@ -293,18 +116,44 @@ Freeze::EvictorI::init(const string& envName, const string& dbName, bool createD _maxTxSize = 100; } + bool populateEmptyIndices = + (_communicator->getProperties()-> + getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0); + + DbTxn* txn = 0; try { _db.reset(new Db(_dbEnv, 0)); - u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD; + + _dbEnv->txn_begin(0, &txn, 0); + + u_int32_t flags = DB_THREAD; if(createDb) { flags |= DB_CREATE; } - _db->open(0, dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE); + _db->open(txn, _dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices); + } + DbTxn* toCommit = txn; + txn = 0; + toCommit->commit(0); } catch(const DbException& dx) { + if(txn != 0) + { + try + { + txn->abort(); + } + catch(...) + { + } + } DatabaseException ex(__FILE__, __LINE__); ex.message = dx.what(); throw ex; @@ -1145,6 +994,12 @@ Freeze::EvictorI::deactivate(const string&) try { _db->close(0); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->close(); + } + _indices.clear(); } catch(const DbException& dx) { @@ -1483,46 +1338,61 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, EvictorStorageKey esk; unmarshal(esk, key, _communicator); - if(_trace >= 3) + if(root.size() == 0) { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "reading facet identity = \"" << esk.identity << "\" "; + if(esk.facet.size() == 0) { - out << "(main object)"; + // + // Good, we found the object + // + marshalRoot(esk.identity, root, _communicator); } else { - out << "facet = \""; - for(size_t i = 0; i < esk.facet.size(); i++) + // + // Otherwise, skip this orphan facet (could be a temporary + // inconsistency on disk) + // + + if(_trace >= 3) { - out << esk.facet[i]; - if(i != esk.facet.size() - 1) - { - out << "."; - } - else - { - out << "\""; - } + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator is skipping orphan facet \"" << esk.identity + << "\" " << esk.facet; } } } - - FacetPtr facet = new Facet(elt.get()); - facet->status = clean; - unmarshal(facet->rec, value, _communicator); - - pair<FacetMap::iterator, bool> pair; - pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); - assert(pair.second); - if(root.size() == 0) + if(root.size() != 0) { - assert(esk.facet.size() == 0); - identities.push_back(esk.identity); - marshalRoot(esk.identity, root, _communicator); - elt->mainObject = facet; + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "reading facet identity = \"" << esk.identity << "\" "; + if(esk.facet.size() == 0) + { + out << "(main object)"; + } + else + { + out << "facet = " << esk.facet; + } + } + + FacetPtr facet = new Facet(elt.get()); + facet->status = clean; + unmarshal(facet->rec, value, _communicator); + + pair<FacetMap::iterator, bool> pair; + pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); + assert(pair.second); + + if(esk.facet.size() == 0) + { + identities.push_back(esk.identity); + elt->mainObject = facet; + } } initializeOutDbt(key, dbKey); @@ -1549,10 +1419,13 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, } } } - while(rs == 0 && startWith(key, root)); + while(rs == 0 && (root.size() == 0 || startWith(key, root))); - buildFacetMap(elt->facets); - evictorElements.push_back(elt); + if(root.size() != 0) + { + buildFacetMap(elt->facets); + evictorElements.push_back(elt); + } return (rs == 0); } @@ -1573,6 +1446,40 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities) int rs = 0; do { + if(root.size() == 0) + { + EvictorStorageKey esk; + unmarshal(esk, key, _communicator); + + if(esk.facet.size() == 0) + { + // + // Good, we found the object + // + marshalRoot(esk.identity, root, _communicator); + + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator read \"" << esk.identity << "\""; + } + identities.push_back(esk.identity); + } + else + { + // + // Otherwise, skip this orphan facet (could be a temporary + // inconsistency on disk) + // + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator is skipping orphan facet \"" << esk.identity + << "\" " << esk.facet; + } + } + } + initializeOutDbt(key, dbKey); for(;;) @@ -1592,7 +1499,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities) } } } - while(rs == 0 && startWith(key, root)); + while(rs == 0 && (root.size() == 0 || startWith(key, root))); return (rs == 0); } @@ -1630,11 +1537,69 @@ Freeze::EvictorI::insert(const vector<Identity>& identities, // // Otherwise we don't insert anything // - } } +// +// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function +// is used to create a key prefix containing only the key's identity. +// + +void +Freeze::EvictorI::marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + v.__write(&stream); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + v.__write(&stream); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.b = bytes; + stream.i = stream.b.begin(); + v.__read(&stream); +} + +void +Freeze::EvictorI::marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.marshalFacets(false); + stream.startWriteEncaps(); + v.__write(&stream); + stream.writePendingObjects(); + stream.endWriteEncaps(); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.b = bytes; + stream.i = stream.b.begin(); + stream.startReadEncaps(); + v.__read(&stream); + stream.readPendingObjects(); + stream.endReadEncaps(); +} + void Freeze::EvictorI::evict() @@ -1881,19 +1846,7 @@ Freeze::EvictorI::load(const Identity& ident) } else { - out << "facet = \""; - for(size_t i = 0; i < esk.facet.size(); i++) - { - out << esk.facet[i]; - if(i != esk.facet.size() - 1) - { - out << "."; - } - else - { - out << "\""; - } - } + out << "facet = " << esk.facet; } } @@ -2213,9 +2166,8 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets) if(r == facets.end()) { // - // TODO: log warning for this orphan facet + // TODO: tracing for this orphna facet // - assert(0); } else { @@ -2223,8 +2175,6 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets) } } } - - } @@ -2245,214 +2195,6 @@ Freeze::EvictorI::EvictorElement::~EvictorElement() { } -Freeze::EvictorIteratorI::EvictorIteratorI(EvictorI& evictor, Int batchSize, bool loadServants) : - _evictor(evictor), - _batchSize(static_cast<size_t>(batchSize)), - _loadServants(loadServants), - _key(1024), - _more(true) -{ - if(loadServants) - { - _value.resize(1024); - } - _batchIterator = _batch.end(); -} - - -bool -Freeze::EvictorIteratorI::hasNext() -{ - if(_batchIterator != _batch.end()) - { - return true; - } - else - { - _batchIterator = nextBatch(); - return (_batchIterator != _batch.end()); - } -} - -Identity -Freeze::EvictorIteratorI::next() -{ - if(hasNext()) - { - return *_batchIterator++; - } - else - { - throw Freeze::NoSuchElementException(__FILE__, __LINE__); - } -} - - -vector<Identity>::const_iterator -Freeze::EvictorIteratorI::nextBatch() -{ - _batch.clear(); - - if(!_more) - { - return _batch.end(); - } - - vector<EvictorI::EvictorElementPtr> evictorElements; - evictorElements.reserve(_batchSize); - - Key firstKey; - firstKey = _key; - - int loadedGeneration = 0; - - try - { - for(;;) - { - _batch.clear(); - evictorElements.clear(); - - Dbt dbKey; - initializeOutDbt(_key, dbKey); - - Dbt dbValue; - if(_loadServants) - { - initializeOutDbt(_value, dbValue); - } - else - { - dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); - } - - Dbc* dbc = 0; - try - { - // - // Move to the first record - // - u_int32_t flags = DB_NEXT; - if(_key.size() > 0) - { - // - // _key represents the next element not yet returned - // if it has been deleted, we want the one after - // - flags = DB_SET_RANGE; - - // - // Will be used as input as well - // - dbKey.set_size(firstKey.size()); - } - - if(_loadServants) - { - loadedGeneration = _evictor.currentGeneration(); - } - - _evictor.db()->cursor(0, &dbc, 0); - - for(;;) - { - try - { - _more = (dbc->get(&dbKey, &dbValue, flags) == 0); - if(_more) - { - _key.resize(dbKey.get_size()); - // - // No need to resize data as we never use it as input - // - } - break; - } - catch(const DbMemoryException& dx) - { - handleMemoryException(dx, _key, dbKey, _value, dbValue); - } - } - - while(_batch.size() < _batchSize && _more) - { - // - // Even when count is 0, we read one more record (unless we reach the end) - // - if(_loadServants) - { - _more = _evictor.load(dbc, _key, _value, _batch, evictorElements); - } - else - { - _more = _evictor.load(dbc, _key, _batch); - } - } - - Dbc* toClose = dbc; - dbc = 0; - toClose->close(); - break; // for (;;) - } - catch(const DbDeadlockException&) - { - if(dbc != 0) - { - try - { - dbc->close(); - } - catch(const DbDeadlockException&) - { - // - // Ignored - // - } - } - _key = firstKey; - // - // Retry - // - } - catch(...) - { - if(dbc != 0) - { - try - { - dbc->close(); - } - catch(const DbDeadlockException&) - { - // - // Ignored - // - } - } - throw; - } - } - } - catch(const DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } - - if(_batch.size() == 0) - { - return _batch.end(); - } - else - { - if(_loadServants) - { - _evictor.insert(_batch, evictorElements, loadedGeneration); - } - return _batch.begin(); - } -} |