diff options
Diffstat (limited to 'cpp/src/Freeze')
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 656 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorI.h | 80 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorIteratorI.cpp | 231 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorIteratorI.h | 55 | ||||
-rw-r--r-- | cpp/src/Freeze/Index.cpp | 48 | ||||
-rw-r--r-- | cpp/src/Freeze/IndexI.cpp | 352 | ||||
-rw-r--r-- | cpp/src/Freeze/IndexI.h | 61 | ||||
-rw-r--r-- | cpp/src/Freeze/Makefile | 4 | ||||
-rw-r--r-- | cpp/src/Freeze/MapI.cpp | 29 | ||||
-rw-r--r-- | cpp/src/Freeze/SharedDb.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Freeze/SharedDbEnv.cpp | 11 | ||||
-rw-r--r-- | cpp/src/Freeze/Util.cpp | 77 | ||||
-rw-r--r-- | cpp/src/Freeze/Util.h | 66 |
13 files changed, 1175 insertions, 503 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index 0840ecab263..5024a24b5bc 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -5,7 +5,7 @@ // Billerica, MA, USA // // All Rights Reserved. -//d +// // Ice is free software; you can redistribute it and/or modify it under // the terms of the GNU General Public License version 2 as published by // the Free Software Foundation. @@ -15,208 +15,24 @@ #include <Freeze/EvictorI.h> #include <Freeze/Initialize.h> #include <IceUtil/AbstractMutex.h> -#include <sys/stat.h> +#include <Freeze/Util.h> +#include <Freeze/EvictorIteratorI.h> +#include <Freeze/IndexI.h> + #include <typeinfo> using namespace std; using namespace Freeze; using namespace Ice; -#ifdef _WIN32 -# define FREEZE_DB_MODE 0 -#else -# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) -#endif - -namespace Freeze -{ - -class EvictorIteratorI : public EvictorIterator -{ -public: - - EvictorIteratorI(EvictorI&, Int, bool); - - virtual bool hasNext(); - virtual Identity next(); - -private: - - vector<Identity>::const_iterator - nextBatch(); - - EvictorI& _evictor; - size_t _batchSize; - bool _loadServants; - vector<Identity>::const_iterator _batchIterator; - - Key _key; - Value _value; - vector<Identity> _batch; - bool _more; -}; - -} - -namespace -{ - -inline void -initializeInDbt(const vector<Byte>& v, Dbt& dbt) -{ - dbt.set_data(const_cast<Byte*>(&v[0])); - dbt.set_size(v.size()); - dbt.set_ulen(0); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -inline void -initializeOutDbt(vector<Byte>& v, Dbt& dbt) -{ - v.resize(v.capacity()); - dbt.set_data(&v[0]); - dbt.set_size(0); - dbt.set_ulen(v.size()); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -void -handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey) -{ - if(dbKey.get_size() > dbKey.get_ulen()) - { - // - // Keep the old key size in case it's used as input - // - size_t oldKeySize = key.size(); - - key.resize(dbKey.get_size()); - initializeOutDbt(key, dbKey); - dbKey.set_size(oldKeySize); - } - else - { - // - // Real problem - // - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } -} - -void -handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey, Value& value, Dbt& dbValue) -{ - bool resized = false; - if(dbKey.get_size() > dbKey.get_ulen()) - { - size_t oldKeySize = key.size(); - key.resize(dbKey.get_size()); - initializeOutDbt(key, dbKey); - dbKey.set_size(oldKeySize); - resized = true; - } - - if(dbValue.get_size() > dbValue.get_ulen()) - { - value.resize(dbValue.get_size()); - initializeOutDbt(value, dbValue); - resized = true; - } - - if(!resized) - { - // - // Real problem - // - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } -} - -inline bool startWith(Key key, Key root) -{ - if(root.size() > key.size()) - { - return false; - } - return memcmp(&root[0], &key[0], root.size()) == 0; -} - -// -// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function -// is used to create a key prefix containing only the key's identity. -// - -void -marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - v.__write(&stream); - bytes.swap(stream.b); -} - -void -marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - v.__write(&stream); - bytes.swap(stream.b); -} - -void -unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.b = bytes; - stream.i = stream.b.begin(); - v.__read(&stream); -} - -void -marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.marshalFacets(false); - stream.startWriteEncaps(); - v.__write(&stream); - stream.writePendingObjects(); - stream.endWriteEncaps(); - bytes.swap(stream.b); -} - -void -unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator) -{ - IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); - IceInternal::BasicStream stream(instance.get()); - stream.b = bytes; - stream.i = stream.b.begin(); - stream.startReadEncaps(); - v.__read(&stream); - stream.readPendingObjects(); - stream.endReadEncaps(); -} - -} - Freeze::EvictorPtr Freeze::createEvictor(const CommunicatorPtr& communicator, const string& envName, const string& dbName, + const vector<IndexPtr>& indices, bool createDb) { - return new EvictorI(communicator, envName, dbName, createDb); + return new EvictorI(communicator, envName, dbName, indices, createDb); } Freeze::EvictorPtr @@ -224,50 +40,57 @@ Freeze::createEvictor(const CommunicatorPtr& communicator, const string& envName, DbEnv& dbEnv, const string& dbName, + const vector<IndexPtr>& indices, bool createDb) { - return new EvictorI(communicator, envName, dbEnv, dbName, createDb); + return new EvictorI(communicator, envName, dbEnv, dbName, indices, createDb); } Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator, const string& envName, - const string& dbName, + const string& dbName, + const std::vector<Freeze::IndexPtr>& indices, bool createDb) : _evictorSize(10), _deactivated(false), _communicator(communicator), _dbEnv(0), _dbEnvHolder(SharedDbEnv::get(communicator, envName)), + _dbName(dbName), + _indices(indices), _trace(0), _generation(0) { _dbEnv = _dbEnvHolder.get(); - init(envName, dbName, createDb); + init(envName, createDb); } Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator, const string& envName, DbEnv& dbEnv, const string& dbName, + const std::vector<Freeze::IndexPtr>& indices, bool createDb) : _evictorSize(10), _deactivated(false), _communicator(communicator), _dbEnv(&dbEnv), + _dbName(dbName), + _indices(indices), _trace(0), _generation(0) { - init(envName, dbName, createDb); + init(envName, createDb); } void -Freeze::EvictorI::init(const string& envName, const string& dbName, bool createDb) +Freeze::EvictorI::init(const string& envName, bool createDb) { _trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor"); - string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + dbName; + string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName; // // By default, we save every minute or when the size of the modified queue @@ -293,18 +116,44 @@ Freeze::EvictorI::init(const string& envName, const string& dbName, bool createD _maxTxSize = 100; } + bool populateEmptyIndices = + (_communicator->getProperties()-> + getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0); + + DbTxn* txn = 0; try { _db.reset(new Db(_dbEnv, 0)); - u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD; + + _dbEnv->txn_begin(0, &txn, 0); + + u_int32_t flags = DB_THREAD; if(createDb) { flags |= DB_CREATE; } - _db->open(0, dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE); + _db->open(txn, _dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices); + } + DbTxn* toCommit = txn; + txn = 0; + toCommit->commit(0); } catch(const DbException& dx) { + if(txn != 0) + { + try + { + txn->abort(); + } + catch(...) + { + } + } DatabaseException ex(__FILE__, __LINE__); ex.message = dx.what(); throw ex; @@ -1145,6 +994,12 @@ Freeze::EvictorI::deactivate(const string&) try { _db->close(0); + + for(size_t i = 0; i < _indices.size(); ++i) + { + _indices[i]->_impl->close(); + } + _indices.clear(); } catch(const DbException& dx) { @@ -1483,46 +1338,61 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, EvictorStorageKey esk; unmarshal(esk, key, _communicator); - if(_trace >= 3) + if(root.size() == 0) { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "reading facet identity = \"" << esk.identity << "\" "; + if(esk.facet.size() == 0) { - out << "(main object)"; + // + // Good, we found the object + // + marshalRoot(esk.identity, root, _communicator); } else { - out << "facet = \""; - for(size_t i = 0; i < esk.facet.size(); i++) + // + // Otherwise, skip this orphan facet (could be a temporary + // inconsistency on disk) + // + + if(_trace >= 3) { - out << esk.facet[i]; - if(i != esk.facet.size() - 1) - { - out << "."; - } - else - { - out << "\""; - } + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator is skipping orphan facet \"" << esk.identity + << "\" " << esk.facet; } } } - - FacetPtr facet = new Facet(elt.get()); - facet->status = clean; - unmarshal(facet->rec, value, _communicator); - - pair<FacetMap::iterator, bool> pair; - pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); - assert(pair.second); - if(root.size() == 0) + if(root.size() != 0) { - assert(esk.facet.size() == 0); - identities.push_back(esk.identity); - marshalRoot(esk.identity, root, _communicator); - elt->mainObject = facet; + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "reading facet identity = \"" << esk.identity << "\" "; + if(esk.facet.size() == 0) + { + out << "(main object)"; + } + else + { + out << "facet = " << esk.facet; + } + } + + FacetPtr facet = new Facet(elt.get()); + facet->status = clean; + unmarshal(facet->rec, value, _communicator); + + pair<FacetMap::iterator, bool> pair; + pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); + assert(pair.second); + + if(esk.facet.size() == 0) + { + identities.push_back(esk.identity); + elt->mainObject = facet; + } } initializeOutDbt(key, dbKey); @@ -1549,10 +1419,13 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, } } } - while(rs == 0 && startWith(key, root)); + while(rs == 0 && (root.size() == 0 || startWith(key, root))); - buildFacetMap(elt->facets); - evictorElements.push_back(elt); + if(root.size() != 0) + { + buildFacetMap(elt->facets); + evictorElements.push_back(elt); + } return (rs == 0); } @@ -1573,6 +1446,40 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities) int rs = 0; do { + if(root.size() == 0) + { + EvictorStorageKey esk; + unmarshal(esk, key, _communicator); + + if(esk.facet.size() == 0) + { + // + // Good, we found the object + // + marshalRoot(esk.identity, root, _communicator); + + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator read \"" << esk.identity << "\""; + } + identities.push_back(esk.identity); + } + else + { + // + // Otherwise, skip this orphan facet (could be a temporary + // inconsistency on disk) + // + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "Iterator is skipping orphan facet \"" << esk.identity + << "\" " << esk.facet; + } + } + } + initializeOutDbt(key, dbKey); for(;;) @@ -1592,7 +1499,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities) } } } - while(rs == 0 && startWith(key, root)); + while(rs == 0 && (root.size() == 0 || startWith(key, root))); return (rs == 0); } @@ -1630,11 +1537,69 @@ Freeze::EvictorI::insert(const vector<Identity>& identities, // // Otherwise we don't insert anything // - } } +// +// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function +// is used to create a key prefix containing only the key's identity. +// + +void +Freeze::EvictorI::marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + v.__write(&stream); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + v.__write(&stream); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.b = bytes; + stream.i = stream.b.begin(); + v.__read(&stream); +} + +void +Freeze::EvictorI::marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.marshalFacets(false); + stream.startWriteEncaps(); + v.__write(&stream); + stream.writePendingObjects(); + stream.endWriteEncaps(); + bytes.swap(stream.b); +} + +void +Freeze::EvictorI::unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator) +{ + IceInternal::InstancePtr instance = IceInternal::getInstance(communicator); + IceInternal::BasicStream stream(instance.get()); + stream.b = bytes; + stream.i = stream.b.begin(); + stream.startReadEncaps(); + v.__read(&stream); + stream.readPendingObjects(); + stream.endReadEncaps(); +} + void Freeze::EvictorI::evict() @@ -1881,19 +1846,7 @@ Freeze::EvictorI::load(const Identity& ident) } else { - out << "facet = \""; - for(size_t i = 0; i < esk.facet.size(); i++) - { - out << esk.facet[i]; - if(i != esk.facet.size() - 1) - { - out << "."; - } - else - { - out << "\""; - } - } + out << "facet = " << esk.facet; } } @@ -2213,9 +2166,8 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets) if(r == facets.end()) { // - // TODO: log warning for this orphan facet + // TODO: tracing for this orphna facet // - assert(0); } else { @@ -2223,8 +2175,6 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets) } } } - - } @@ -2245,214 +2195,6 @@ Freeze::EvictorI::EvictorElement::~EvictorElement() { } -Freeze::EvictorIteratorI::EvictorIteratorI(EvictorI& evictor, Int batchSize, bool loadServants) : - _evictor(evictor), - _batchSize(static_cast<size_t>(batchSize)), - _loadServants(loadServants), - _key(1024), - _more(true) -{ - if(loadServants) - { - _value.resize(1024); - } - _batchIterator = _batch.end(); -} - - -bool -Freeze::EvictorIteratorI::hasNext() -{ - if(_batchIterator != _batch.end()) - { - return true; - } - else - { - _batchIterator = nextBatch(); - return (_batchIterator != _batch.end()); - } -} - -Identity -Freeze::EvictorIteratorI::next() -{ - if(hasNext()) - { - return *_batchIterator++; - } - else - { - throw Freeze::NoSuchElementException(__FILE__, __LINE__); - } -} - - -vector<Identity>::const_iterator -Freeze::EvictorIteratorI::nextBatch() -{ - _batch.clear(); - - if(!_more) - { - return _batch.end(); - } - - vector<EvictorI::EvictorElementPtr> evictorElements; - evictorElements.reserve(_batchSize); - - Key firstKey; - firstKey = _key; - - int loadedGeneration = 0; - - try - { - for(;;) - { - _batch.clear(); - evictorElements.clear(); - - Dbt dbKey; - initializeOutDbt(_key, dbKey); - - Dbt dbValue; - if(_loadServants) - { - initializeOutDbt(_value, dbValue); - } - else - { - dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); - } - - Dbc* dbc = 0; - try - { - // - // Move to the first record - // - u_int32_t flags = DB_NEXT; - if(_key.size() > 0) - { - // - // _key represents the next element not yet returned - // if it has been deleted, we want the one after - // - flags = DB_SET_RANGE; - - // - // Will be used as input as well - // - dbKey.set_size(firstKey.size()); - } - - if(_loadServants) - { - loadedGeneration = _evictor.currentGeneration(); - } - - _evictor.db()->cursor(0, &dbc, 0); - - for(;;) - { - try - { - _more = (dbc->get(&dbKey, &dbValue, flags) == 0); - if(_more) - { - _key.resize(dbKey.get_size()); - // - // No need to resize data as we never use it as input - // - } - break; - } - catch(const DbMemoryException& dx) - { - handleMemoryException(dx, _key, dbKey, _value, dbValue); - } - } - - while(_batch.size() < _batchSize && _more) - { - // - // Even when count is 0, we read one more record (unless we reach the end) - // - if(_loadServants) - { - _more = _evictor.load(dbc, _key, _value, _batch, evictorElements); - } - else - { - _more = _evictor.load(dbc, _key, _batch); - } - } - - Dbc* toClose = dbc; - dbc = 0; - toClose->close(); - break; // for (;;) - } - catch(const DbDeadlockException&) - { - if(dbc != 0) - { - try - { - dbc->close(); - } - catch(const DbDeadlockException&) - { - // - // Ignored - // - } - } - _key = firstKey; - // - // Retry - // - } - catch(...) - { - if(dbc != 0) - { - try - { - dbc->close(); - } - catch(const DbDeadlockException&) - { - // - // Ignored - // - } - } - throw; - } - } - } - catch(const DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } - - if(_batch.size() == 0) - { - return _batch.end(); - } - else - { - if(_loadServants) - { - _evictor.insert(_batch, evictorElements, loadedGeneration); - } - return _batch.begin(); - } -} diff --git a/cpp/src/Freeze/EvictorI.h b/cpp/src/Freeze/EvictorI.h index d8107e0d910..d244405d106 100644 --- a/cpp/src/Freeze/EvictorI.h +++ b/cpp/src/Freeze/EvictorI.h @@ -20,6 +20,7 @@ #include <Freeze/Evictor.h> #include <Freeze/SharedDbEnv.h> #include <Freeze/EvictorStorage.h> +#include <Freeze/Index.h> #include <Freeze/DB.h> #include <list> #include <vector> @@ -33,8 +34,11 @@ class EvictorI : public Evictor, public IceUtil::Monitor<IceUtil::Mutex>, publi { public: - EvictorI(const Ice::CommunicatorPtr, const std::string&, const std::string&, bool); - EvictorI(const Ice::CommunicatorPtr, const std::string&, DbEnv&, const std::string&, bool); + EvictorI(const Ice::CommunicatorPtr, const std::string&, const std::string&, + const std::vector<Freeze::IndexPtr>&, bool); + + EvictorI(const Ice::CommunicatorPtr, const std::string&, DbEnv&, const std::string&, + const std::vector<Freeze::IndexPtr>&, bool); virtual ~EvictorI(); @@ -73,6 +77,12 @@ public: Db* db() const; + DbEnv* + dbEnv() const; + + const std::string& + dbName() const; + int currentGeneration() const; @@ -122,6 +132,24 @@ public: // + // marshaling/unmarshaling functions + // + static void + marshalRoot(const Ice::Identity&, Freeze::Key&, const Ice::CommunicatorPtr&); + + static void + marshal(const Freeze::EvictorStorageKey&, Freeze::Key& bytes, const Ice::CommunicatorPtr&); + + static void + unmarshal(Freeze::EvictorStorageKey&, const Freeze::Key&, const Ice::CommunicatorPtr&); + + static void + marshal(const Freeze::ObjectRecord&, Freeze::Value&, const Ice::CommunicatorPtr&); + + static void + unmarshal(Freeze::ObjectRecord&, const Freeze::Value&, const Ice::CommunicatorPtr&); + + // // Streamed objects // struct StreamedObject @@ -172,9 +200,11 @@ public: #endif + + private: - void init(const std::string& envName, const std::string& dbName, bool createDb); + void init(const std::string& envName, bool createDb); void evict(); bool dbHasObject(const Ice::Identity&); @@ -222,8 +252,14 @@ private: DbEnv* _dbEnv; SharedDbEnvPtr _dbEnvHolder; + + std::string _dbName; + std::auto_ptr<Db> _db; ServantInitializerPtr _initializer; + + std::vector<Freeze::IndexPtr> _indices; + Ice::Int _trace; // @@ -264,6 +300,18 @@ EvictorI::db() const return _db.get(); } +inline DbEnv* +EvictorI::dbEnv() const +{ + return _dbEnv; +} + +inline const std::string& +EvictorI::dbName() const +{ + return _dbName; +} + inline int EvictorI::currentGeneration() const { @@ -271,6 +319,32 @@ EvictorI::currentGeneration() const return _generation; } +inline bool +startWith(const Key& key, const Key& root) +{ + if(root.size() > key.size()) + { + return false; + } + return memcmp(&root[0], &key[0], root.size()) == 0; +} + +inline Ice::Trace& +operator<<(Ice::Trace& os, const std::vector<std::string>& facetPath) +{ + os << '"'; + for(size_t i = 0; i < facetPath.size(); i++) + { + os << facetPath[i]; + if(i != facetPath.size() - 1) + { + os << '.'; + } + } + os << '"'; + return os; +} + } #endif diff --git a/cpp/src/Freeze/EvictorIteratorI.cpp b/cpp/src/Freeze/EvictorIteratorI.cpp new file mode 100644 index 00000000000..2d348e63b86 --- /dev/null +++ b/cpp/src/Freeze/EvictorIteratorI.cpp @@ -0,0 +1,231 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#include <Freeze/EvictorIteratorI.h> +#include <Freeze/EvictorI.h> +#include <Freeze/Util.h> + +using namespace std; +using namespace Freeze; +using namespace Ice; + + +Freeze::EvictorIteratorI::EvictorIteratorI(EvictorI& evictor, Int batchSize, bool loadServants) : + _evictor(evictor), + _batchSize(static_cast<size_t>(batchSize)), + _loadServants(loadServants), + _key(1024), + _more(true) +{ + if(loadServants) + { + _value.resize(1024); + } + _batchIterator = _batch.end(); +} + + +bool +Freeze::EvictorIteratorI::hasNext() +{ + if(_batchIterator != _batch.end()) + { + return true; + } + else + { + _batchIterator = nextBatch(); + return (_batchIterator != _batch.end()); + } +} + +Identity +Freeze::EvictorIteratorI::next() +{ + if(hasNext()) + { + return *_batchIterator++; + } + else + { + throw Freeze::NoSuchElementException(__FILE__, __LINE__); + } +} + + +vector<Identity>::const_iterator +Freeze::EvictorIteratorI::nextBatch() +{ + _batch.clear(); + + if(!_more) + { + return _batch.end(); + } + + vector<EvictorI::EvictorElementPtr> evictorElements; + evictorElements.reserve(_batchSize); + + Key firstKey; + firstKey = _key; + + int loadedGeneration = 0; + + try + { + for(;;) + { + _batch.clear(); + evictorElements.clear(); + + Dbt dbKey; + initializeOutDbt(_key, dbKey); + + Dbt dbValue; + if(_loadServants) + { + initializeOutDbt(_value, dbValue); + } + else + { + dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); + } + + Dbc* dbc = 0; + try + { + // + // Move to the first record + // + u_int32_t flags = DB_NEXT; + if(_key.size() > 0) + { + // + // _key represents the next element not yet returned + // if it has been deleted, we want the one after + // + flags = DB_SET_RANGE; + + // + // Will be used as input as well + // + dbKey.set_size(firstKey.size()); + } + + if(_loadServants) + { + loadedGeneration = _evictor.currentGeneration(); + } + + _evictor.db()->cursor(0, &dbc, 0); + + for(;;) + { + try + { + _more = (dbc->get(&dbKey, &dbValue, flags) == 0); + if(_more) + { + _key.resize(dbKey.get_size()); + // + // No need to resize data as we never use it as input + // + } + break; + } + catch(const DbMemoryException& dx) + { + handleMemoryException(dx, _key, dbKey, _value, dbValue); + } + } + + while(_batch.size() < _batchSize && _more) + { + // + // Even when count is 0, we read one more record (unless we reach the end) + // + if(_loadServants) + { + _more = _evictor.load(dbc, _key, _value, _batch, evictorElements); + } + else + { + _more = _evictor.load(dbc, _key, _batch); + } + } + + Dbc* toClose = dbc; + dbc = 0; + toClose->close(); + break; // for (;;) + } + catch(const DbDeadlockException&) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + _key = firstKey; + // + // Retry + // + } + catch(...) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + throw; + } + } + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + + if(_batch.size() == 0) + { + return _batch.end(); + } + else + { + if(_loadServants) + { + _evictor.insert(_batch, evictorElements, loadedGeneration); + } + return _batch.begin(); + } +} diff --git a/cpp/src/Freeze/EvictorIteratorI.h b/cpp/src/Freeze/EvictorIteratorI.h new file mode 100644 index 00000000000..817570c2c36 --- /dev/null +++ b/cpp/src/Freeze/EvictorIteratorI.h @@ -0,0 +1,55 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#ifndef FREEZE_EVICTOR_ITERATOR_I_H +#define FREEZE_EVICTOR_ITERATOR_I_H + +#include <Ice/Ice.h> +#include <Freeze/Evictor.h> +#include <Freeze/DB.h> +#include <vector> + +namespace Freeze +{ + +class EvictorI; + +class EvictorIteratorI : public EvictorIterator +{ +public: + + EvictorIteratorI(EvictorI&, Ice::Int, bool); + + virtual bool hasNext(); + virtual Ice::Identity next(); + +private: + + std::vector<Ice::Identity>::const_iterator + nextBatch(); + + EvictorI& _evictor; + size_t _batchSize; + bool _loadServants; + std::vector<Ice::Identity>::const_iterator _batchIterator; + + Key _key; + Value _value; + std::vector<Ice::Identity> _batch; + bool _more; +}; + +} + +#endif diff --git a/cpp/src/Freeze/Index.cpp b/cpp/src/Freeze/Index.cpp new file mode 100644 index 00000000000..db4034fce27 --- /dev/null +++ b/cpp/src/Freeze/Index.cpp @@ -0,0 +1,48 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#include <Freeze/Index.h> +#include <Freeze/IndexI.h> + +using namespace Freeze; +using namespace Ice; +using namespace std; + +Freeze::Index::~Index() +{ + delete _impl; +} + +Freeze::Index::Index(const string& name) : + _impl(new IndexI(*this, name)) +{ +} + +vector<Identity> +Freeze::Index::untypedFindFirst(const Key& bytes, Int firstN) const +{ + return _impl->untypedFindFirst(bytes, firstN); +} + +vector<Identity> +Freeze::Index::untypedFind(const Key& bytes) const +{ + return _impl->untypedFind(bytes); +} + +Int +Freeze::Index::untypedCount(const Key& bytes) const +{ + return _impl->untypedCount(bytes); +} diff --git a/cpp/src/Freeze/IndexI.cpp b/cpp/src/Freeze/IndexI.cpp new file mode 100644 index 00000000000..42d4e2115e3 --- /dev/null +++ b/cpp/src/Freeze/IndexI.cpp @@ -0,0 +1,352 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#include <Freeze/IndexI.h> +#include <Freeze/Util.h> + +using namespace Freeze; +using namespace Ice; +using namespace std; + + +static int +callback(Db* secondary, const Dbt* key, const Dbt* value, Dbt* result) +{ + void* indexObj = secondary->get_app_private(); + IndexI* index = static_cast<IndexI*>(indexObj); + assert(index != 0); + return index->secondaryKeyCreate(secondary, key, value, result); +} + + +Freeze::IndexI::IndexI(Index& index, const string& name) : + _index(index), + _name(name), + _evictor(0) +{ +} + +vector<Identity> +Freeze::IndexI::untypedFindFirst(const Key& bytes, Int firstN) const +{ + Dbt dbKey; + initializeInDbt(bytes, dbKey); + + // + // Berkeley DB 4.1.25 bug: it should not write into dbKey + // + dbKey.set_ulen(bytes.size()); + + Key pkey(1024); + Dbt pdbKey; + initializeOutDbt(pkey, pdbKey); + + Dbt dbValue; + dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); + + + Ice::CommunicatorPtr communicator = _evictor->communicator(); + _evictor->saveNow(); + + vector<Identity> identities; + + try + { + for(;;) + { + Dbc* dbc = 0; + identities.clear(); + + try + { + // + // Move to the first record + // + _db->cursor(0, &dbc, 0); + bool more; + + for(;;) + { + try + { + more = (dbc->pget(&dbKey, &pdbKey, &dbValue, DB_SET) == 0); + if(more) + { + pkey.resize(pdbKey.get_size()); + } + break; // for(;;) + } + catch(const DbMemoryException& dx) + { + handleMemoryException(dx, pkey, pdbKey); + } + } + + while((firstN <= 0 || identities.size() < static_cast<size_t>(firstN)) && more) + { + EvictorStorageKey esk; + EvictorI::unmarshal(esk, pkey, communicator); + + if(esk.facet.size() == 0) + { + identities.push_back(esk.identity); + } + // + // Else skip "orphan" facet (could be just a temporary inconsistency + // on disk) + // + for(;;) + { + try + { + more = (dbc->pget(&dbKey, &pdbKey, &dbValue, DB_NEXT_DUP) == 0); + if(more) + { + pkey.resize(pdbKey.get_size()); + } + break; // for(;;) + } + catch(const DbMemoryException& dx) + { + handleMemoryException(dx, pkey, pdbKey); + } + } + } + Dbc* toClose = dbc; + dbc = 0; + toClose->close(); + break; // for (;;) + } + catch(const DbDeadlockException& dx) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + // + // Retry + // + } + catch(...) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + throw; + } + } + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + + return identities; +} + +vector<Identity> +Freeze::IndexI::untypedFind(const Key& bytes) const +{ + return untypedFindFirst(bytes, 0); +} + +Int +Freeze::IndexI::untypedCount(const Key& bytes) const +{ + + Dbt dbKey; + initializeInDbt(bytes, dbKey); + + Dbt dbValue; + dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); + + _evictor->saveNow(); + Int result = 0; + + try + { + for(;;) + { + Dbc* dbc = 0; + + try + { + // + // Move to the first record + // + _db->cursor(0, &dbc, 0); + bool found = (dbc->get(&dbKey, &dbValue, DB_SET) == 0); + + if(found) + { + db_recno_t count = 0; + dbc->count(&count, 0); + result = static_cast<Int>(count); + } + + Dbc* toClose = dbc; + dbc = 0; + toClose->close(); + break; // for (;;) + } + catch(const DbDeadlockException& dx) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + // + // Retry + // + } + catch(...) + { + if(dbc != 0) + { + try + { + dbc->close(); + } + catch(const DbDeadlockException&) + { + // + // Ignored + // + } + } + throw; + } + } + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + + return result; +} + +void +Freeze::IndexI::associate(EvictorI* evictor, DbTxn* txn, + bool createDb, bool populateIndex) +{ + assert(txn != 0); + _evictor = evictor; + _index._communicator = evictor->communicator(); + + _db.reset(new Db(evictor->dbEnv(), 0)); + _db->set_flags(DB_DUP | DB_DUPSORT); + _db->set_app_private(this); + + u_int32_t flags = 0; + if(createDb) + { + flags = DB_CREATE; + } + _db->open(txn, (evictor->dbName() + "." + _name).c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE); + + flags = 0; + if(populateIndex) + { + flags = DB_CREATE; + } + evictor->db()->associate(txn, _db.get(), callback, flags); +} + +int +Freeze::IndexI::secondaryKeyCreate(Db* secondary, const Dbt* dbKey, + const Dbt* dbValue, Dbt* result) +{ + Ice::CommunicatorPtr communicator = _evictor->communicator(); + + EvictorStorageKey esk; + Byte* first = static_cast<Byte*>(dbKey->get_data()); + Key key(first, first + dbKey->get_size()); + EvictorI::unmarshal(esk, key, communicator); + + if(esk.facet.size() == 0) + { + ObjectRecord rec; + first = static_cast<Byte*>(dbValue->get_data()); + Value value(first, first + dbValue->get_size()); + EvictorI::unmarshal(rec, value, communicator); + + Key bytes; + if(_index.marshalKey(rec.servant, bytes)) + { + result->set_flags(DB_DBT_APPMALLOC); + void* data = malloc(bytes.size()); + memcpy(data, &bytes[0], bytes.size()); + result->set_data(data); + result->set_size(bytes.size()); + return 0; + } + } + + // + // Don't want to index this one + // + return DB_DONOTINDEX; +} + +void +Freeze::IndexI::close() +{ + if(_db.get() != 0) + { + try + { + _db->close(0); + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + _db.reset(0); + } +} diff --git a/cpp/src/Freeze/IndexI.h b/cpp/src/Freeze/IndexI.h new file mode 100644 index 00000000000..ca59018ac4b --- /dev/null +++ b/cpp/src/Freeze/IndexI.h @@ -0,0 +1,61 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#ifndef FREEZE_INDEX_I_H +#define FREEZE_INDEX_I_H + +#include <Ice/Ice.h> +#include <Freeze/Index.h> +#include <Freeze/EvictorI.h> + +namespace Freeze +{ + +class IndexI +{ +public: + + IndexI(Index&, const std::string&); + + std::vector<Ice::Identity> + untypedFindFirst(const Freeze::Key&, Ice::Int) const; + + std::vector<Ice::Identity> + untypedFind(const Freeze::Key&) const; + + Ice::Int + untypedCount(const Freeze::Key&) const; + + void + associate(EvictorI* evictor, DbTxn* txn, bool createDb, bool populateIndex); + + int + secondaryKeyCreate(Db*, const Dbt*, const Dbt*, Dbt*); + + void + close(); + + + +private: + + Index& _index; + std::string _name; + std::auto_ptr<Db> _db; + EvictorI* _evictor; +}; + +} +#endif + diff --git a/cpp/src/Freeze/Makefile b/cpp/src/Freeze/Makefile index 7d90e769150..ff9d5d34791 100644 --- a/cpp/src/Freeze/Makefile +++ b/cpp/src/Freeze/Makefile @@ -33,7 +33,11 @@ OBJS = DB.o \ TransactionI.o \ SharedDb.o \ MapI.o \ + Util.o \ EvictorI.o \ + EvictorIteratorI.o \ + Index.o \ + IndexI.o \ SharedDbEnv.o \ TransactionHolder.o diff --git a/cpp/src/Freeze/MapI.cpp b/cpp/src/Freeze/MapI.cpp index ceb12e1ee2c..b108b6c1b7c 100644 --- a/cpp/src/Freeze/MapI.cpp +++ b/cpp/src/Freeze/MapI.cpp @@ -15,40 +15,13 @@ #include <Freeze/MapI.h> #include <Freeze/Exception.h> #include <Freeze/SharedDb.h> +#include <Freeze/Util.h> #include <stdlib.h> using namespace std; using namespace Ice; using namespace Freeze; -namespace -{ - -inline void -initializeInDbt(const vector<Ice::Byte>& v, Dbt& dbt) -{ - dbt.set_data(const_cast<Ice::Byte*>(&v[0])); - dbt.set_size(v.size()); - dbt.set_ulen(0); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -inline void -initializeOutDbt(vector<Ice::Byte>& v, Dbt& dbt) -{ - v.resize(v.capacity()); - dbt.set_data(&v[0]); - dbt.set_size(0); - dbt.set_ulen(v.size()); - dbt.set_dlen(0); - dbt.set_doff(0); - dbt.set_flags(DB_DBT_USERMEM); -} - -} - // // MapHelper (from Map.h) diff --git a/cpp/src/Freeze/SharedDb.cpp b/cpp/src/Freeze/SharedDb.cpp index 143828d9fda..19e88243a29 100644 --- a/cpp/src/Freeze/SharedDb.cpp +++ b/cpp/src/Freeze/SharedDb.cpp @@ -15,18 +15,12 @@ #include <Freeze/SharedDb.h> #include <IceUtil/StaticMutex.h> #include <Freeze/Exception.h> -#include <sys/stat.h> +#include <Freeze/Util.h> using namespace std; using namespace IceUtil; using namespace Ice; -#ifdef _WIN32 -# define FREEZE_DB_MODE 0 -#else -# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) -#endif - namespace { diff --git a/cpp/src/Freeze/SharedDbEnv.cpp b/cpp/src/Freeze/SharedDbEnv.cpp index 6c72ab34549..f088c65511d 100644 --- a/cpp/src/Freeze/SharedDbEnv.cpp +++ b/cpp/src/Freeze/SharedDbEnv.cpp @@ -16,22 +16,17 @@ #include <IceUtil/StaticMutex.h> #include <IceUtil/Thread.h> #include <Freeze/Exception.h> +#include <Freeze/Util.h> + #include <cstdlib> #include <map> #include <memory> -#include <sys/stat.h> + using namespace std; using namespace IceUtil; using namespace Ice; -#ifdef _WIN32 -# define FREEZE_DB_MODE 0 -#else -# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) -#endif - - namespace Freeze { diff --git a/cpp/src/Freeze/Util.cpp b/cpp/src/Freeze/Util.cpp new file mode 100644 index 00000000000..ebaca432024 --- /dev/null +++ b/cpp/src/Freeze/Util.cpp @@ -0,0 +1,77 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#include <Freeze/Util.h> +#include <Freeze/Exception.h> + +using namespace Freeze; +using namespace Ice; +using namespace std; + +void +Freeze::handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey) +{ + if(dbKey.get_size() > dbKey.get_ulen()) + { + // + // Keep the old key size in case it's used as input + // + size_t oldKeySize = key.size(); + + key.resize(dbKey.get_size()); + initializeOutDbt(key, dbKey); + dbKey.set_size(oldKeySize); + } + else + { + // + // Real problem + // + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } +} + +void +Freeze::handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey, + Value& value, Dbt& dbValue) +{ + bool resized = false; + if(dbKey.get_size() > dbKey.get_ulen()) + { + size_t oldKeySize = key.size(); + key.resize(dbKey.get_size()); + initializeOutDbt(key, dbKey); + dbKey.set_size(oldKeySize); + resized = true; + } + + if(dbValue.get_size() > dbValue.get_ulen()) + { + value.resize(dbValue.get_size()); + initializeOutDbt(value, dbValue); + resized = true; + } + + if(!resized) + { + // + // Real problem + // + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } +} diff --git a/cpp/src/Freeze/Util.h b/cpp/src/Freeze/Util.h new file mode 100644 index 00000000000..e5ec22b758a --- /dev/null +++ b/cpp/src/Freeze/Util.h @@ -0,0 +1,66 @@ +// ********************************************************************** +// +// Copyright (c) 2003 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +#ifndef FREEZE_UTIL_H +#define FREEZE_UTIL_H + +#include <Ice/Ice.h> +#include <Freeze/DB.h> +#include <db_cxx.h> +#include <sys/stat.h> + +#ifdef _WIN32 +# define FREEZE_DB_MODE 0 +#else +# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR) +#endif + + +namespace Freeze +{ + +inline void +initializeInDbt(const std::vector<Ice::Byte>& v, Dbt& dbt) +{ + dbt.set_data(const_cast<Ice::Byte*>(&v[0])); + dbt.set_size(v.size()); + dbt.set_ulen(0); + dbt.set_dlen(0); + dbt.set_doff(0); + dbt.set_flags(DB_DBT_USERMEM); +} + +inline void +initializeOutDbt(std::vector<Ice::Byte>& v, Dbt& dbt) +{ + v.resize(v.capacity()); + dbt.set_data(&v[0]); + dbt.set_size(0); + dbt.set_ulen(v.size()); + dbt.set_dlen(0); + dbt.set_doff(0); + dbt.set_flags(DB_DBT_USERMEM); +} + +void +handleMemoryException(const DbMemoryException&, Key&, Dbt&); + +void +handleMemoryException(const DbMemoryException&, Key&, Dbt&, Value&, Dbt&); + +} + + +#endif + |