summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/EvictorI.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2003-10-21 20:40:01 +0000
committerBernard Normier <bernard@zeroc.com>2003-10-21 20:40:01 +0000
commit2cb10f33a1cc8d3d94450fa45cc8f63ba32c137f (patch)
treeee83ca7fa800f497d1e7f340c5d1737545235870 /cpp/src/Freeze/EvictorI.cpp
parentFixed code generation bug. (diff)
downloadice-2cb10f33a1cc8d3d94450fa45cc8f63ba32c137f.tar.bz2
ice-2cb10f33a1cc8d3d94450fa45cc8f63ba32c137f.tar.xz
ice-2cb10f33a1cc8d3d94450fa45cc8f63ba32c137f.zip
Added Freeze evictor indices
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r--cpp/src/Freeze/EvictorI.cpp656
1 files changed, 199 insertions, 457 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp
index 0840ecab263..5024a24b5bc 100644
--- a/cpp/src/Freeze/EvictorI.cpp
+++ b/cpp/src/Freeze/EvictorI.cpp
@@ -5,7 +5,7 @@
// Billerica, MA, USA
//
// All Rights Reserved.
-//d
+//
// Ice is free software; you can redistribute it and/or modify it under
// the terms of the GNU General Public License version 2 as published by
// the Free Software Foundation.
@@ -15,208 +15,24 @@
#include <Freeze/EvictorI.h>
#include <Freeze/Initialize.h>
#include <IceUtil/AbstractMutex.h>
-#include <sys/stat.h>
+#include <Freeze/Util.h>
+#include <Freeze/EvictorIteratorI.h>
+#include <Freeze/IndexI.h>
+
#include <typeinfo>
using namespace std;
using namespace Freeze;
using namespace Ice;
-#ifdef _WIN32
-# define FREEZE_DB_MODE 0
-#else
-# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR)
-#endif
-
-namespace Freeze
-{
-
-class EvictorIteratorI : public EvictorIterator
-{
-public:
-
- EvictorIteratorI(EvictorI&, Int, bool);
-
- virtual bool hasNext();
- virtual Identity next();
-
-private:
-
- vector<Identity>::const_iterator
- nextBatch();
-
- EvictorI& _evictor;
- size_t _batchSize;
- bool _loadServants;
- vector<Identity>::const_iterator _batchIterator;
-
- Key _key;
- Value _value;
- vector<Identity> _batch;
- bool _more;
-};
-
-}
-
-namespace
-{
-
-inline void
-initializeInDbt(const vector<Byte>& v, Dbt& dbt)
-{
- dbt.set_data(const_cast<Byte*>(&v[0]));
- dbt.set_size(v.size());
- dbt.set_ulen(0);
- dbt.set_dlen(0);
- dbt.set_doff(0);
- dbt.set_flags(DB_DBT_USERMEM);
-}
-
-inline void
-initializeOutDbt(vector<Byte>& v, Dbt& dbt)
-{
- v.resize(v.capacity());
- dbt.set_data(&v[0]);
- dbt.set_size(0);
- dbt.set_ulen(v.size());
- dbt.set_dlen(0);
- dbt.set_doff(0);
- dbt.set_flags(DB_DBT_USERMEM);
-}
-
-void
-handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey)
-{
- if(dbKey.get_size() > dbKey.get_ulen())
- {
- //
- // Keep the old key size in case it's used as input
- //
- size_t oldKeySize = key.size();
-
- key.resize(dbKey.get_size());
- initializeOutDbt(key, dbKey);
- dbKey.set_size(oldKeySize);
- }
- else
- {
- //
- // Real problem
- //
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
-}
-
-void
-handleMemoryException(const DbMemoryException& dx, Key& key, Dbt& dbKey, Value& value, Dbt& dbValue)
-{
- bool resized = false;
- if(dbKey.get_size() > dbKey.get_ulen())
- {
- size_t oldKeySize = key.size();
- key.resize(dbKey.get_size());
- initializeOutDbt(key, dbKey);
- dbKey.set_size(oldKeySize);
- resized = true;
- }
-
- if(dbValue.get_size() > dbValue.get_ulen())
- {
- value.resize(dbValue.get_size());
- initializeOutDbt(value, dbValue);
- resized = true;
- }
-
- if(!resized)
- {
- //
- // Real problem
- //
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
-}
-
-inline bool startWith(Key key, Key root)
-{
- if(root.size() > key.size())
- {
- return false;
- }
- return memcmp(&root[0], &key[0], root.size()) == 0;
-}
-
-//
-// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function
-// is used to create a key prefix containing only the key's identity.
-//
-
-void
-marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- v.__write(&stream);
- bytes.swap(stream.b);
-}
-
-void
-marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- v.__write(&stream);
- bytes.swap(stream.b);
-}
-
-void
-unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.b = bytes;
- stream.i = stream.b.begin();
- v.__read(&stream);
-}
-
-void
-marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.marshalFacets(false);
- stream.startWriteEncaps();
- v.__write(&stream);
- stream.writePendingObjects();
- stream.endWriteEncaps();
- bytes.swap(stream.b);
-}
-
-void
-unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.b = bytes;
- stream.i = stream.b.begin();
- stream.startReadEncaps();
- v.__read(&stream);
- stream.readPendingObjects();
- stream.endReadEncaps();
-}
-
-}
-
Freeze::EvictorPtr
Freeze::createEvictor(const CommunicatorPtr& communicator,
const string& envName,
const string& dbName,
+ const vector<IndexPtr>& indices,
bool createDb)
{
- return new EvictorI(communicator, envName, dbName, createDb);
+ return new EvictorI(communicator, envName, dbName, indices, createDb);
}
Freeze::EvictorPtr
@@ -224,50 +40,57 @@ Freeze::createEvictor(const CommunicatorPtr& communicator,
const string& envName,
DbEnv& dbEnv,
const string& dbName,
+ const vector<IndexPtr>& indices,
bool createDb)
{
- return new EvictorI(communicator, envName, dbEnv, dbName, createDb);
+ return new EvictorI(communicator, envName, dbEnv, dbName, indices, createDb);
}
Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator,
const string& envName,
- const string& dbName,
+ const string& dbName,
+ const std::vector<Freeze::IndexPtr>& indices,
bool createDb) :
_evictorSize(10),
_deactivated(false),
_communicator(communicator),
_dbEnv(0),
_dbEnvHolder(SharedDbEnv::get(communicator, envName)),
+ _dbName(dbName),
+ _indices(indices),
_trace(0),
_generation(0)
{
_dbEnv = _dbEnvHolder.get();
- init(envName, dbName, createDb);
+ init(envName, createDb);
}
Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator,
const string& envName,
DbEnv& dbEnv,
const string& dbName,
+ const std::vector<Freeze::IndexPtr>& indices,
bool createDb) :
_evictorSize(10),
_deactivated(false),
_communicator(communicator),
_dbEnv(&dbEnv),
+ _dbName(dbName),
+ _indices(indices),
_trace(0),
_generation(0)
{
- init(envName, dbName, createDb);
+ init(envName, createDb);
}
void
-Freeze::EvictorI::init(const string& envName, const string& dbName, bool createDb)
+Freeze::EvictorI::init(const string& envName, bool createDb)
{
_trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor");
- string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + dbName;
+ string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName;
//
// By default, we save every minute or when the size of the modified queue
@@ -293,18 +116,44 @@ Freeze::EvictorI::init(const string& envName, const string& dbName, bool createD
_maxTxSize = 100;
}
+ bool populateEmptyIndices =
+ (_communicator->getProperties()->
+ getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0);
+
+ DbTxn* txn = 0;
try
{
_db.reset(new Db(_dbEnv, 0));
- u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD;
+
+ _dbEnv->txn_begin(0, &txn, 0);
+
+ u_int32_t flags = DB_THREAD;
if(createDb)
{
flags |= DB_CREATE;
}
- _db->open(0, dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE);
+ _db->open(txn, _dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE);
+
+ for(size_t i = 0; i < _indices.size(); ++i)
+ {
+ _indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices);
+ }
+ DbTxn* toCommit = txn;
+ txn = 0;
+ toCommit->commit(0);
}
catch(const DbException& dx)
{
+ if(txn != 0)
+ {
+ try
+ {
+ txn->abort();
+ }
+ catch(...)
+ {
+ }
+ }
DatabaseException ex(__FILE__, __LINE__);
ex.message = dx.what();
throw ex;
@@ -1145,6 +994,12 @@ Freeze::EvictorI::deactivate(const string&)
try
{
_db->close(0);
+
+ for(size_t i = 0; i < _indices.size(); ++i)
+ {
+ _indices[i]->_impl->close();
+ }
+ _indices.clear();
}
catch(const DbException& dx)
{
@@ -1483,46 +1338,61 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value,
EvictorStorageKey esk;
unmarshal(esk, key, _communicator);
- if(_trace >= 3)
+ if(root.size() == 0)
{
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "reading facet identity = \"" << esk.identity << "\" ";
+
if(esk.facet.size() == 0)
{
- out << "(main object)";
+ //
+ // Good, we found the object
+ //
+ marshalRoot(esk.identity, root, _communicator);
}
else
{
- out << "facet = \"";
- for(size_t i = 0; i < esk.facet.size(); i++)
+ //
+ // Otherwise, skip this orphan facet (could be a temporary
+ // inconsistency on disk)
+ //
+
+ if(_trace >= 3)
{
- out << esk.facet[i];
- if(i != esk.facet.size() - 1)
- {
- out << ".";
- }
- else
- {
- out << "\"";
- }
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "Iterator is skipping orphan facet \"" << esk.identity
+ << "\" " << esk.facet;
}
}
}
-
- FacetPtr facet = new Facet(elt.get());
- facet->status = clean;
- unmarshal(facet->rec, value, _communicator);
-
- pair<FacetMap::iterator, bool> pair;
- pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet));
- assert(pair.second);
- if(root.size() == 0)
+ if(root.size() != 0)
{
- assert(esk.facet.size() == 0);
- identities.push_back(esk.identity);
- marshalRoot(esk.identity, root, _communicator);
- elt->mainObject = facet;
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "reading facet identity = \"" << esk.identity << "\" ";
+ if(esk.facet.size() == 0)
+ {
+ out << "(main object)";
+ }
+ else
+ {
+ out << "facet = " << esk.facet;
+ }
+ }
+
+ FacetPtr facet = new Facet(elt.get());
+ facet->status = clean;
+ unmarshal(facet->rec, value, _communicator);
+
+ pair<FacetMap::iterator, bool> pair;
+ pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet));
+ assert(pair.second);
+
+ if(esk.facet.size() == 0)
+ {
+ identities.push_back(esk.identity);
+ elt->mainObject = facet;
+ }
}
initializeOutDbt(key, dbKey);
@@ -1549,10 +1419,13 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value,
}
}
}
- while(rs == 0 && startWith(key, root));
+ while(rs == 0 && (root.size() == 0 || startWith(key, root)));
- buildFacetMap(elt->facets);
- evictorElements.push_back(elt);
+ if(root.size() != 0)
+ {
+ buildFacetMap(elt->facets);
+ evictorElements.push_back(elt);
+ }
return (rs == 0);
}
@@ -1573,6 +1446,40 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities)
int rs = 0;
do
{
+ if(root.size() == 0)
+ {
+ EvictorStorageKey esk;
+ unmarshal(esk, key, _communicator);
+
+ if(esk.facet.size() == 0)
+ {
+ //
+ // Good, we found the object
+ //
+ marshalRoot(esk.identity, root, _communicator);
+
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "Iterator read \"" << esk.identity << "\"";
+ }
+ identities.push_back(esk.identity);
+ }
+ else
+ {
+ //
+ // Otherwise, skip this orphan facet (could be a temporary
+ // inconsistency on disk)
+ //
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "Iterator is skipping orphan facet \"" << esk.identity
+ << "\" " << esk.facet;
+ }
+ }
+ }
+
initializeOutDbt(key, dbKey);
for(;;)
@@ -1592,7 +1499,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities)
}
}
}
- while(rs == 0 && startWith(key, root));
+ while(rs == 0 && (root.size() == 0 || startWith(key, root)));
return (rs == 0);
}
@@ -1630,11 +1537,69 @@ Freeze::EvictorI::insert(const vector<Identity>& identities,
//
// Otherwise we don't insert anything
//
-
}
}
+//
+// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function
+// is used to create a key prefix containing only the key's identity.
+//
+
+void
+Freeze::EvictorI::marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get());
+ v.__write(&stream);
+ bytes.swap(stream.b);
+}
+
+void
+Freeze::EvictorI::marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get());
+ v.__write(&stream);
+ bytes.swap(stream.b);
+}
+
+void
+Freeze::EvictorI::unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get());
+ stream.b = bytes;
+ stream.i = stream.b.begin();
+ v.__read(&stream);
+}
+
+void
+Freeze::EvictorI::marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get());
+ stream.marshalFacets(false);
+ stream.startWriteEncaps();
+ v.__write(&stream);
+ stream.writePendingObjects();
+ stream.endWriteEncaps();
+ bytes.swap(stream.b);
+}
+
+void
+Freeze::EvictorI::unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get());
+ stream.b = bytes;
+ stream.i = stream.b.begin();
+ stream.startReadEncaps();
+ v.__read(&stream);
+ stream.readPendingObjects();
+ stream.endReadEncaps();
+}
+
void
Freeze::EvictorI::evict()
@@ -1881,19 +1846,7 @@ Freeze::EvictorI::load(const Identity& ident)
}
else
{
- out << "facet = \"";
- for(size_t i = 0; i < esk.facet.size(); i++)
- {
- out << esk.facet[i];
- if(i != esk.facet.size() - 1)
- {
- out << ".";
- }
- else
- {
- out << "\"";
- }
- }
+ out << "facet = " << esk.facet;
}
}
@@ -2213,9 +2166,8 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets)
if(r == facets.end())
{
//
- // TODO: log warning for this orphan facet
+ // TODO: tracing for this orphna facet
//
- assert(0);
}
else
{
@@ -2223,8 +2175,6 @@ Freeze::EvictorI::buildFacetMap(const FacetMap& facets)
}
}
}
-
-
}
@@ -2245,214 +2195,6 @@ Freeze::EvictorI::EvictorElement::~EvictorElement()
{
}
-Freeze::EvictorIteratorI::EvictorIteratorI(EvictorI& evictor, Int batchSize, bool loadServants) :
- _evictor(evictor),
- _batchSize(static_cast<size_t>(batchSize)),
- _loadServants(loadServants),
- _key(1024),
- _more(true)
-{
- if(loadServants)
- {
- _value.resize(1024);
- }
- _batchIterator = _batch.end();
-}
-
-
-bool
-Freeze::EvictorIteratorI::hasNext()
-{
- if(_batchIterator != _batch.end())
- {
- return true;
- }
- else
- {
- _batchIterator = nextBatch();
- return (_batchIterator != _batch.end());
- }
-}
-
-Identity
-Freeze::EvictorIteratorI::next()
-{
- if(hasNext())
- {
- return *_batchIterator++;
- }
- else
- {
- throw Freeze::NoSuchElementException(__FILE__, __LINE__);
- }
-}
-
-
-vector<Identity>::const_iterator
-Freeze::EvictorIteratorI::nextBatch()
-{
- _batch.clear();
-
- if(!_more)
- {
- return _batch.end();
- }
-
- vector<EvictorI::EvictorElementPtr> evictorElements;
- evictorElements.reserve(_batchSize);
-
- Key firstKey;
- firstKey = _key;
-
- int loadedGeneration = 0;
-
- try
- {
- for(;;)
- {
- _batch.clear();
- evictorElements.clear();
-
- Dbt dbKey;
- initializeOutDbt(_key, dbKey);
-
- Dbt dbValue;
- if(_loadServants)
- {
- initializeOutDbt(_value, dbValue);
- }
- else
- {
- dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
- }
-
- Dbc* dbc = 0;
- try
- {
- //
- // Move to the first record
- //
- u_int32_t flags = DB_NEXT;
- if(_key.size() > 0)
- {
- //
- // _key represents the next element not yet returned
- // if it has been deleted, we want the one after
- //
- flags = DB_SET_RANGE;
-
- //
- // Will be used as input as well
- //
- dbKey.set_size(firstKey.size());
- }
-
- if(_loadServants)
- {
- loadedGeneration = _evictor.currentGeneration();
- }
-
- _evictor.db()->cursor(0, &dbc, 0);
-
- for(;;)
- {
- try
- {
- _more = (dbc->get(&dbKey, &dbValue, flags) == 0);
- if(_more)
- {
- _key.resize(dbKey.get_size());
- //
- // No need to resize data as we never use it as input
- //
- }
- break;
- }
- catch(const DbMemoryException& dx)
- {
- handleMemoryException(dx, _key, dbKey, _value, dbValue);
- }
- }
-
- while(_batch.size() < _batchSize && _more)
- {
- //
- // Even when count is 0, we read one more record (unless we reach the end)
- //
- if(_loadServants)
- {
- _more = _evictor.load(dbc, _key, _value, _batch, evictorElements);
- }
- else
- {
- _more = _evictor.load(dbc, _key, _batch);
- }
- }
-
- Dbc* toClose = dbc;
- dbc = 0;
- toClose->close();
- break; // for (;;)
- }
- catch(const DbDeadlockException&)
- {
- if(dbc != 0)
- {
- try
- {
- dbc->close();
- }
- catch(const DbDeadlockException&)
- {
- //
- // Ignored
- //
- }
- }
- _key = firstKey;
- //
- // Retry
- //
- }
- catch(...)
- {
- if(dbc != 0)
- {
- try
- {
- dbc->close();
- }
- catch(const DbDeadlockException&)
- {
- //
- // Ignored
- //
- }
- }
- throw;
- }
- }
- }
- catch(const DbException& dx)
- {
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
-
- if(_batch.size() == 0)
- {
- return _batch.end();
- }
- else
- {
- if(_loadServants)
- {
- _evictor.insert(_batch, evictorElements, loadedGeneration);
- }
- return _batch.begin();
- }
-}