summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/EvictorI.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2004-04-09 23:21:15 +0000
committerBernard Normier <bernard@zeroc.com>2004-04-09 23:21:15 +0000
commit0dcad3e212de5e8560e57c1a3d2f04909ebe7513 (patch)
tree412366d59303c0c4a90d281e50f78c39775db31d /cpp/src/Freeze/EvictorI.cpp
parentEach request now has its own set of object factories. (diff)
downloadice-0dcad3e212de5e8560e57c1a3d2f04909ebe7513.tar.bz2
ice-0dcad3e212de5e8560e57c1a3d2f04909ebe7513.tar.xz
ice-0dcad3e212de5e8560e57c1a3d2f04909ebe7513.zip
Updated Freeze Evictor with new facets
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r--cpp/src/Freeze/EvictorI.cpp2110
1 files changed, 602 insertions, 1508 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp
index c8b81d44add..a8cda63750b 100644
--- a/cpp/src/Freeze/EvictorI.cpp
+++ b/cpp/src/Freeze/EvictorI.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003
+// Copyright (c) 2003-2004
// ZeroC, Inc.
// Billerica, MA, USA
//
@@ -17,7 +17,6 @@
#include <IceUtil/AbstractMutex.h>
#include <Freeze/Util.h>
#include <Freeze/EvictorIteratorI.h>
-#include <Freeze/IndexI.h>
#include <typeinfo>
@@ -26,67 +25,81 @@ using namespace Freeze;
using namespace Ice;
Freeze::EvictorPtr
-Freeze::createEvictor(const CommunicatorPtr& communicator,
+Freeze::createEvictor(const ObjectAdapterPtr& adapter,
const string& envName,
const string& dbName,
+ const ServantInitializerPtr& initializer,
const vector<IndexPtr>& indices,
bool createDb)
{
- return new EvictorI(communicator, envName, dbName, indices, createDb);
+ return new EvictorI(adapter, envName, dbName, initializer, indices, createDb);
}
Freeze::EvictorPtr
-Freeze::createEvictor(const CommunicatorPtr& communicator,
+Freeze::createEvictor(const ObjectAdapterPtr& adapter,
const string& envName,
DbEnv& dbEnv,
const string& dbName,
+ const ServantInitializerPtr& initializer,
const vector<IndexPtr>& indices,
bool createDb)
{
- return new EvictorI(communicator, envName, dbEnv, dbName, indices, createDb);
+ return new EvictorI(adapter, envName, dbEnv, dbName, initializer, indices, createDb);
}
-
-Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator,
+Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
const string& envName,
const string& dbName,
- const std::vector<Freeze::IndexPtr>& indices,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
bool createDb) :
_evictorSize(10),
+ _currentEvictorSize(0),
_deactivated(false),
- _communicator(communicator),
+ _adapter(adapter),
+ _communicator(adapter->getCommunicator()),
+ _connection(createConnection(_communicator, envName)),
+ _facetRegistry(_connection, dbName + "--facetRegistry", createDb),
+ _initializer(initializer),
+
_dbEnv(0),
- _dbEnvHolder(SharedDbEnv::get(communicator, envName)),
+ _dbEnvHolder(SharedDbEnv::get(_communicator, envName)),
_dbName(dbName),
- _indices(indices),
- _trace(0),
- _generation(0)
+ _createDb(createDb),
+ _trace(0)
{
_dbEnv = _dbEnvHolder.get();
- init(envName, createDb);
+ init(envName, indices);
}
-Freeze::EvictorI::EvictorI(const CommunicatorPtr communicator,
+Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
const string& envName,
DbEnv& dbEnv,
const string& dbName,
- const std::vector<Freeze::IndexPtr>& indices,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
bool createDb) :
+
_evictorSize(10),
+ _currentEvictorSize(0),
_deactivated(false),
- _communicator(communicator),
+ _adapter(adapter),
+ _communicator(adapter->getCommunicator()),
+ _connection(createConnection(_communicator, envName)),
+ _facetRegistry(_connection, dbName + "--facetRegistry", createDb),
+ _initializer(initializer),
+
_dbEnv(&dbEnv),
_dbName(dbName),
- _indices(indices),
- _trace(0),
- _generation(0)
+ _createDb(createDb),
+ _trace(0)
{
- init(envName, createDb);
+ init(envName, indices);
}
void
-Freeze::EvictorI::init(const string& envName, bool createDb)
+Freeze::EvictorI::init(const string& envName, const vector<IndexPtr>& indices)
{
_trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor");
_deadlockWarning = (_communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0);
@@ -121,47 +134,73 @@ Freeze::EvictorI::init(const string& envName, bool createDb)
(_communicator->getProperties()->
getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0);
- DbTxn* txn = 0;
- try
+ //
+ // Instantiate all Dbs in 3 steps:
+ // (1) create _storeMap entries for all facets in the facet registry
+ // (2) iterate over the indices and create ObjectMap with indices
+ // (3) open object map without indices
+ //
+
+ //
+ // If there is no default ("") store, add it.
+ //
+ if(_facetRegistry.find("") == _facetRegistry.end())
{
- _db.reset(new Db(_dbEnv, 0));
+ _facetRegistry.put(FacetRegistry::value_type("", true));
+ }
+
+ for(FacetRegistry::iterator p = _facetRegistry.begin(); p != _facetRegistry.end(); ++p)
+ {
+ const string& facet = (*p).first;
+ _storeMap.insert(StoreMap::value_type(facet, 0));
+ }
- _dbEnv->txn_begin(0, &txn, 0);
+ for(vector<IndexPtr>::const_iterator p = indices.begin(); p != indices.end(); ++p)
+ {
+ string facet = (*p)->facet();
- u_int32_t flags = DB_THREAD;
- if(createDb)
+ StoreMap::iterator q = _storeMap.find(facet);
+ if(q == _storeMap.end())
{
- flags |= DB_CREATE;
+ //
+ // Not yet known to the facet registry
+ //
+ pair<StoreMap::iterator, bool> ir =
+ _storeMap.insert(StoreMap::value_type(facet, 0));
+ assert(ir.second == true);
+ q = ir.first;
+
+ _facetRegistry.put(FacetRegistry::value_type(facet, true));
}
- _db->open(txn, _dbName.c_str(), 0, DB_BTREE, flags, FREEZE_DB_MODE);
- for(size_t i = 0; i < _indices.size(); ++i)
+ if((*q).second == 0)
{
- _indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices);
+ vector<IndexPtr> storeIndices;
+
+ for(vector<IndexPtr>::const_iterator r = p; r != indices.end(); ++r)
+ {
+ if((*r)->facet() == facet)
+ {
+ storeIndices.push_back(*r);
+ }
+ }
+ (*q).second = new ObjectStore(_dbName, facet, _createDb, this, storeIndices, populateEmptyIndices);
}
- DbTxn* toCommit = txn;
- txn = 0;
- toCommit->commit(0);
}
- catch(const DbException& dx)
+
+
+ //
+ // Finally, open all the stores without index
+ //
+ for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
{
- if(txn != 0)
+ if((*p).second == 0)
{
- try
- {
- txn->abort();
- }
- catch(...)
- {
- }
+ const string& facet = (*p).first;
+ (*p).second = new ObjectStore(_dbName, facet, _createDb, this);
}
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
}
- _lastSave = IceUtil::Time::now();
-
//
// Start saving thread
//
@@ -180,6 +219,14 @@ Freeze::EvictorI::~EvictorI()
//
deactivate("");
}
+
+ //
+ // Delete all the ObjectStore*
+ //
+ for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
+ {
+ delete (*p).second;
+ }
}
void
@@ -203,7 +250,7 @@ Freeze::EvictorI::setSize(Int evictorSize)
//
// Update the evictor size.
//
- _evictorSize = static_cast<EvictorMap::size_type>(evictorSize);
+ _evictorSize = static_cast<size_t>(evictorSize);
//
// Evict as many elements as necessary.
@@ -224,26 +271,18 @@ Freeze::EvictorI::getSize()
return static_cast<Int>(_evictorSize);
}
-void
-Freeze::EvictorI::saveNow()
-{
- Lock sync(*this);
-
- if(_deactivated)
- {
- throw EvictorDeactivatedException(__FILE__, __LINE__);
- }
- saveNowNoSync();
+Ice::ObjectPrx
+Freeze::EvictorI::add(const ObjectPtr& servant, const Identity& ident)
+{
+ return addFacet(servant, ident, "");
}
-void
-Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant)
+Ice::ObjectPrx
+Freeze::EvictorI::addFacet(const ObjectPtr& servant, const Identity& ident, const string& facet)
{
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
- bool triedToLoadElement = false;
-
+ ObjectStore* store = 0;
+
for(;;)
{
{
@@ -254,661 +293,416 @@ Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant)
throw EvictorDeactivatedException(__FILE__, __LINE__);
}
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p == _evictorMap.end() && triedToLoadElement)
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p == _storeMap.end())
{
- if(loadedElementGeneration == _generation)
+ if(store != 0)
{
- if(loadedElement != 0)
- {
- p = insertElement(0, ident, loadedElement);
- }
- }
- else
- {
- loadedElement = 0;
- triedToLoadElement = false;
+ _storeMap.insert(StoreMap::value_type(facet, store));
}
}
-
- bool replacing = (p != _evictorMap.end());
-
- if(replacing || triedToLoadElement)
- {
- if(replacing)
- {
- EvictorElementPtr& element = p->second;
-
- //
- // Destroy all existing facets
- //
- for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
- {
- destroyFacetImpl(q->second);
- }
- }
- else
- {
- //
- // Let's insert an empty EvitorElement
- //
- EvictorElementPtr element = new EvictorElement;
-
- pair<EvictorMap::iterator, bool> pair = _evictorMap.insert(EvictorMap::value_type(ident, element));
- assert(pair.second);
-
- p = pair.first;
- element->identity = &p->first;
-
- _evictorList.push_front(p);
- element->position = _evictorList.begin();
- }
-
- //
- // Add all the new facets (recursively)
- //
- EvictorElementPtr& element = p->second;
-
- addFacetImpl(element, servant, FacetPath(), replacing);
-
- //
- // Evict as many elements as necessary.
- //
- evict();
- break; // for(;;)
- }
else
{
- loadedElementGeneration = _generation;
+ delete store;
+ store = (*p).second;
+ assert(store != 0);
}
}
-
- //
- // Try to load element and try again
- //
- assert(loadedElement == 0);
- assert(triedToLoadElement == false);
- loadedElement = load(ident);
- triedToLoadElement = true;
- }
-
- if(_trace >= 1)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "created \"" << ident << "\"";
- }
-}
-
-void
-Freeze::EvictorI::addFacet(const Identity& ident, const FacetPath& facetPath, const ObjectPtr& servant)
-{
- if(facetPath.size() == 0)
- {
- throw EmptyFacetPathException(__FILE__, __LINE__);
+
+ if(store == 0)
+ {
+ assert(facet != "");
+ store = new ObjectStore(_dbName, facet, _createDb, this);
+ _facetRegistry.put(FacetRegistry::value_type(facet, true));
+ // loop
+ }
+ else
+ {
+ break; // for(;;)
+ }
}
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
+ assert(store != 0);
+ bool alreadyThere = false;
for(;;)
{
- {
- Lock sync(*this);
-
- if(_deactivated)
- {
- throw EvictorDeactivatedException(__FILE__, __LINE__);
- }
-
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p == _evictorMap.end() && loadedElement != 0)
- {
- //
- // If generation matches, load element into map
- //
- if(loadedElementGeneration == _generation)
- {
- p = insertElement(0, ident, loadedElement);
- }
- else
- {
- //
- // Discard loaded element
- //
- loadedElement = 0;
- }
- }
-
- if(p != _evictorMap.end())
- {
- EvictorElementPtr& element = p->second;
- FacetPath parentPath(facetPath);
- parentPath.pop_back();
- FacetMap::iterator q = element->facets.find(parentPath);
- if(q == element->facets.end())
- {
- throw FacetNotExistException(__FILE__, __LINE__);
- }
-
- {
- FacetPtr& facet = q->second;
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
-
- if(facet->status == dead || facet->status == destroyed)
- {
- throw FacetNotExistException(__FILE__, __LINE__);
- }
-
- //
- // Throws AlreadyRegisteredException if the facet is already registered
- //
- facet->rec.servant->ice_addFacet(servant, facetPath[facetPath.size() - 1]);
- }
- //
- // We may need to replace (nested) dead or destroyed facets
- //
- addFacetImpl(element, servant, facetPath, true);
-
- evict();
-
- break; // for(;;)
- }
-
- loadedElementGeneration = _generation;
- }
-
- assert(loadedElement == 0);
-
//
- // Load object and loop
+ // Create a new entry
//
- loadedElement = load(ident);
- if(loadedElement == 0)
+
+ EvictorElementPtr element = new EvictorElement(*store);
+ element->status = EvictorElement::dead;
+ pair<EvictorElementPtr, bool> ir = store->insert(ident, element);
+
+ if(ir.second == false)
{
- throw ObjectNotExistException(__FILE__, __LINE__);
+ element = ir.first;
}
- }
-
- if(_trace >= 1)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "added facet to \"" << ident << "\"";
- }
-}
-
-void
-Freeze::EvictorI::destroyObject(const Identity& ident)
-{
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
- bool triedToLoadElement = false;
- for(;;)
- {
{
Lock sync(*this);
-
+
if(_deactivated)
{
throw EvictorDeactivatedException(__FILE__, __LINE__);
}
-
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p == _evictorMap.end() && triedToLoadElement)
- {
- if(loadedElementGeneration == _generation)
- {
- if(loadedElement != 0)
- {
- p = insertElement(0, ident, loadedElement);
- }
- }
- else
- {
- loadedElement = 0;
- triedToLoadElement = false;
- }
- }
-
- bool destroying = (p != _evictorMap.end());
- if(destroying || triedToLoadElement)
+ if(element->stale)
{
- if(destroying)
- {
- EvictorElementPtr& element = p->second;
-
- //
- // Destroy all existing facets
- //
- for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
- {
- destroyFacetImpl(q->second);
- }
- }
-
- //
- // Evict as many elements as necessary.
//
- evict();
- break; // for(;;)
- }
- else
- {
- loadedElementGeneration = _generation;
+ // Try again
+ //
+ continue;
}
- }
-
- //
- // Try to load element and try again
- //
- assert(loadedElement == 0);
- assert(triedToLoadElement == false);
- loadedElement = load(ident);
- triedToLoadElement = true;
- }
-
- if(_trace >= 1)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "destroyed \"" << ident << "\"";
- }
-}
+ fixEvictPosition(element);
-ObjectPtr
-Freeze::EvictorI::removeFacet(const Identity& ident, const FacetPath& facetPath)
-{
- if(facetPath.size() == 0)
- {
- throw EmptyFacetPathException(__FILE__, __LINE__);
- }
-
- ObjectPtr result = 0;
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
-
- for(;;)
- {
- {
- Lock sync(*this);
+ IceUtil::Mutex::Lock lock(element->mutex);
- if(_deactivated)
+ switch(element->status)
{
- throw EvictorDeactivatedException(__FILE__, __LINE__);
- }
-
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p == _evictorMap.end() && loadedElement != 0)
- {
- //
- // If generation matches, load element into map
- //
- if(loadedElementGeneration == _generation)
+ case EvictorElement::clean:
+ case EvictorElement::created:
+ case EvictorElement::modified:
{
- p = insertElement(0, ident, loadedElement);
- }
- else
+ alreadyThere = true;
+ break;
+ }
+ case EvictorElement::destroyed:
{
+ element->status = EvictorElement::modified;
+ element->rec.servant = servant;
+
//
- // Discard loaded element
+ // No need to push it on the modified queue, as a destroyed object
+ // is either already on the queue or about to be saved. When saved,
+ // it becomes dead.
//
- loadedElement = 0;
+ break;
}
- }
-
- if(p != _evictorMap.end())
- {
- EvictorElementPtr& element = p->second;
- FacetPath parentPath(facetPath);
- parentPath.pop_back();
- FacetMap::iterator q = element->facets.find(parentPath);
- if(q == element->facets.end())
+ case EvictorElement::dead:
{
- throw FacetNotExistException(__FILE__, __LINE__);
+ element->status = EvictorElement::created;
+ ObjectRecord& rec = element->rec;
+
+ rec.servant = servant;
+ rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds();
+ rec.stats.lastSaveTime = 0;
+ rec.stats.avgSaveTime = 0;
+
+ addToModifiedQueue(element);
+ break;
}
-
+ default:
{
- FacetPtr& facet = q->second;
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
-
- if(facet->status == dead || facet->status == destroyed)
- {
- throw FacetNotExistException(__FILE__, __LINE__);
- }
-
- //
- // Throws NotRegisteredException if the facet is not registered
- //
- result = facet->rec.servant->ice_removeFacet(facetPath[facetPath.size() - 1]);
+ assert(0);
+ break;
}
- removeFacetImpl(element->facets, facetPath);
-
- evict();
-
- break; // for(;;)
}
-
- loadedElementGeneration = _generation;
}
-
- assert(loadedElement == 0);
-
- //
- // Load object and loop
- //
- loadedElement = load(ident);
- if(loadedElement == 0)
+ break; // for(;;)
+ }
+
+ if(alreadyThere)
+ {
+ AlreadyRegisteredException ex(__FILE__, __LINE__);
+ ex.kindOfObject = "servant";
+ ex.id = identityToString(ident);
+ if(!facet.empty())
{
- throw ObjectNotExistException(__FILE__, __LINE__);
+ ex.id += " -f " + facet;
}
+ throw ex;
}
if(_trace >= 1)
{
Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "removed facet from \"" << ident << "\"";
+ out << "added object \"" << ident << "\"";
+ if(!facet.empty())
+ {
+ out << " with facet \"" << facet << "\"";
+ }
}
- return result;
+
+
+ //
+ // TODO: there is currently no way to create an ObjectPrx
+ // with a facet!
+ //
+ return _adapter->createProxy(ident);
}
+void
+Freeze::EvictorI::remove(const Identity& ident)
+{
+ removeFacet(ident, "");
+}
void
-Freeze::EvictorI::removeAllFacets(const Identity& ident)
+Freeze::EvictorI::removeFacet(const Identity& ident, const string& facet)
{
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
+ ObjectStore* store = findStore(facet);
+ bool notThere = (store == 0);
- for(;;)
+ if(store != 0)
{
+ for(;;)
{
- Lock sync(*this);
-
- if(_deactivated)
+ //
+ // Retrieve object
+ //
+
+ EvictorElementPtr element = store->pin(ident);
+ if(element == 0)
{
- throw EvictorDeactivatedException(__FILE__, __LINE__);
+ notThere = true;
}
-
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p == _evictorMap.end() && loadedElement != 0)
+ else
{
- //
- // If generation matches, load element into map
- //
- if(loadedElementGeneration == _generation)
- {
- p = insertElement(0, ident, loadedElement);
- }
- else
+ Lock sync(*this);
+ if(element->stale)
{
//
- // Discard loaded element
- //
- loadedElement = 0;
+ // Try again
+ //
+ continue;
}
- }
-
- if(p != _evictorMap.end())
- {
- EvictorElementPtr& element = p->second;
-
+
+ fixEvictPosition(element);
+ IceUtil::Mutex::Lock lock(element->mutex);
+
+ switch(element->status)
{
- FacetPtr& facet = element->mainObject;
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
-
- if(facet->status == dead || facet->status == destroyed)
+ case EvictorElement::clean:
{
- throw ObjectNotExistException(__FILE__, __LINE__);
+ element->status = EvictorElement::destroyed;
+ element->rec.servant = 0;
+ addToModifiedQueue(element);
+ break;
}
- facet->rec.servant->ice_removeAllFacets();
- }
-
- {
- //
- // Destroy all facets except main object
- //
- for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
+ case EvictorElement::created:
{
- if(q->second != element->mainObject)
- {
- destroyFacetImpl(q->second);
- }
+ element->status = EvictorElement::dead;
+ element->rec.servant = 0;
+ break;
+ }
+ case EvictorElement::modified:
+ {
+ element->status = EvictorElement::destroyed;
+ element->rec.servant = 0;
+ //
+ // Not necessary to push it on the modified queue, as a modified
+ // element is either on the queue already or about to be saved
+ // (at which point it becomes clean)
+ //
+ break;
+ }
+ case EvictorElement::destroyed:
+ case EvictorElement::dead:
+ {
+ notThere = true;
+ break;
+ }
+ default:
+ {
+ assert(0);
+ break;
}
}
-
- evict();
-
- break; // for(;;)
}
-
- loadedElementGeneration = _generation;
+ break; // for(;;)
}
-
- assert(loadedElement == 0);
-
- //
- // Load object and loop
- //
- loadedElement = load(ident);
- if(loadedElement == 0)
+ }
+
+ if(notThere)
+ {
+ NotRegisteredException ex(__FILE__, __LINE__);
+ ex.kindOfObject = "servant";
+ ex.id = identityToString(ident);
+ if(!facet.empty())
{
- throw ObjectNotExistException(__FILE__, __LINE__);
+ ex.id += " -f " + facet;
}
+ throw ex;
}
if(_trace >= 1)
{
- Trace out(_communicator->getLogger(), "Ereeze.Evictor");
- out << "removed all facets from \"" << ident << "\"";
- }
-}
-
-
-void
-Freeze::EvictorI::installServantInitializer(const ServantInitializerPtr& initializer)
-{
- Lock sync(*this);
-
- if(_deactivated)
- {
- throw EvictorDeactivatedException(__FILE__, __LINE__);
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "removed object \"" << ident << "\"";
+ if(!facet.empty())
+ {
+ out << " with facet \"" << facet << "\"";
+ }
}
-
- _initializer = initializer;
}
EvictorIteratorPtr
-Freeze::EvictorI::getIterator(Int batchSize, bool loadServants)
+Freeze::EvictorI::getIterator(const string& facet, Int batchSize)
{
+ ObjectStore* store = 0;
{
Lock sync(*this);
if(_deactivated)
{
throw EvictorDeactivatedException(__FILE__, __LINE__);
}
- saveNowNoSync();
+
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p != _storeMap.end())
+ {
+ store = (*p).second;
+ saveNowNoSync();
+ }
}
-
- return new EvictorIteratorI(*this, batchSize, loadServants);
+ return new EvictorIteratorI(store, batchSize);
}
bool
Freeze::EvictorI::hasObject(const Identity& ident)
{
- Lock sync(*this);
+ return hasFacet(ident, "");
+}
- if(_deactivated)
- {
- throw EvictorDeactivatedException(__FILE__, __LINE__);
- }
+bool
+Freeze::EvictorI::hasFacet(const Identity& ident, const string& facet)
+{
+ ObjectStore* store = 0;
- EvictorMap::iterator p = _evictorMap.find(ident);
-
- if(p != _evictorMap.end())
{
- EvictorElementPtr& element = p->second;
- IceUtil::Mutex::Lock lockFacet(element->mainObject->mutex);
- return (element->mainObject->status != destroyed && element->mainObject->status != dead);
- }
- else
- {
- //
- // Release sync to increase concurrency
- //
- sync.release();
-
- return dbHasObject(ident);
+ Lock sync(*this);
+
+ if(_deactivated)
+ {
+ throw EvictorDeactivatedException(__FILE__, __LINE__);
+ }
+
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p == _storeMap.end())
+ {
+ return false;
+ }
+
+ ObjectStore* store = (*p).second;
+
+ EvictorElementPtr element = store->getIfPinned(ident);
+ if(element != 0)
+ {
+ assert(!element->stale);
+
+ IceUtil::Mutex::Lock lock(element->mutex);
+ return element->status != EvictorElement::dead &&
+ element->status != EvictorElement::destroyed;
+ }
}
+ return store->dbHasObject(ident);
}
ObjectPtr
Freeze::EvictorI::locate(const Current& current, LocalObjectPtr& cookie)
{
- EvictorElementPtr loadedElement = 0;
- int loadedElementGeneration = 0;
- cookie = 0;
-
- for(;;)
+ ObjectPtr result = locateImpl(current, cookie);
+
+ if(result == 0)
{
- EvictorMap::iterator p;
- bool objectFound = false;
-
+ //
+ // If the object exists in another store, throw FacetNotExistException
+ // instead of returning 0 (== ObjectNotExistException)
+ //
+ StoreMap storeMapCopy;
{
Lock sync(*this);
-
- //
- // If this operation is called on a deactivated servant locator,
- // it's a bug in Ice.
- //
- assert(!_deactivated);
-
- p = _evictorMap.find(current.id);
-
- if(p == _evictorMap.end() && loadedElement != 0)
- {
- if(loadedElementGeneration == _generation)
- {
- p = insertElement(current.adapter, current.id, loadedElement);
- }
- else
- {
- loadedElement = 0;
- }
- }
-
- objectFound = (p != _evictorMap.end());
-
- if(objectFound)
- {
- //
- // Ice object found in evictor map. Push it to the front of
- // the evictor list, so that it will be evicted last.
- //
- EvictorElementPtr& element = p->second;
- if(element->position != _evictorList.begin())
- {
- _evictorList.erase(element->position);
- _evictorList.push_front(p);
- element->position = _evictorList.begin();
- }
-
- element->usageCount++;
-
- FacetMap::iterator q = element->facets.find(current.facet);
- if(q != element->facets.end())
- {
- cookie = q->second;
- }
-
- evict();
- //
- // Later (after releasing the mutex), check that this
- // object is not dead or destroyed
- //
- }
- else
- {
- loadedElementGeneration = _generation;
- }
- }
-
- if(objectFound)
+ storeMapCopy = _storeMap;
+ }
+ for(StoreMap::iterator p = storeMapCopy.begin(); p != storeMapCopy.end(); ++p)
{
- EvictorElementPtr& element = p->second;
-
- if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "found \"" << current.id << "\" in the queue";
- }
-
//
- // Return servant if object not dead or destroyed
+ // Do not check again the current facet
//
- if(cookie == 0)
- {
- ObjectPtr result = 0;
+ if((*p).first != current.facet)
+ {
+ ObjectStore* store = (*p).second;
+
+ bool inCache = false;
{
- IceUtil::Mutex::Lock lockFacet(element->mainObject->mutex);
- if(element->mainObject->status != destroyed && element->mainObject->status != dead)
+ Lock sync(*this);
+
+ EvictorElementPtr element = store->getIfPinned(current.id);
+ if(element != 0)
{
- result = element->mainObject->rec.servant;
+ inCache = true;
+ assert(!element->stale);
+
+ IceUtil::Mutex::Lock lock(element->mutex);
+ if(element->status != EvictorElement::dead &&
+ element->status != EvictorElement::destroyed)
+ {
+ throw FacetNotExistException(__FILE__, __LINE__);
+ }
}
}
- if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "\"" << current.id << "\" does not have the desired facet";
- }
- Lock sync(*this);
- element->usageCount--;
- return result;
- }
- else
- {
- IceUtil::Mutex::Lock lockFacet(element->mainObject->mutex);
- if(element->mainObject->status != destroyed && element->mainObject->status != dead)
+ if(!inCache)
{
- return element->mainObject->rec.servant;
+ if(store->dbHasObject(current.id))
+ {
+ throw FacetNotExistException(__FILE__, __LINE__);
+ }
}
}
+ }
+ }
+
+ return result;
+}
+
+
+ObjectPtr
+Freeze::EvictorI::locateImpl(const Current& current, LocalObjectPtr& cookie)
+{
+ cookie = 0;
+
+ ObjectStore* store = findStore(current.facet);
+ if(store == 0)
+ {
+ return 0;
+ }
+
+ for(;;)
+ {
+ EvictorElementPtr element = store->pin(current.id);
+ if(element == 0)
+ {
+ return 0;
+ }
+ Lock sync(*this);
+ assert(!_deactivated);
+
+ if(element->stale)
+ {
//
- // Object is destroyed or dead: clean-up
+ // try again
//
- if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "\"" << current.id << "\" was dead or destroyed";
- }
- Lock sync(*this);
- element->usageCount--;
- return 0;
+ continue;
}
- else
+
+
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead)
{
- //
- // Load object now and loop
- //
-
- if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "could not find \"" << current.id << "\" in the queue\n"
- << "loading \"" << current.id << "\" from the database";
- }
-
- loadedElement = load(current.id);
- if(loadedElement == 0)
- {
- return 0;
- }
+ return 0;
}
+
+ //
+ // It's a good one!
+ //
+ fixEvictPosition(element);
+ element->usageCount++;
+ cookie = element;
+ assert(element->rec.servant != 0);
+ return element->rec.servant;
}
}
@@ -919,38 +713,42 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con
if(cookie != 0)
{
- FacetPtr facet = FacetPtr::dynamicCast(cookie);
- assert(facet);
+ EvictorElementPtr element = EvictorElementPtr::dynamicCast(cookie);
+ assert(element);
bool enqueue = false;
if(current.mode != Nonmutating)
{
- IceUtil::Mutex::Lock lockRec(facet->mutex);
+ IceUtil::Mutex::Lock lock(element->mutex);
- if(facet->status == clean)
+ if(element->status == EvictorElement::clean)
{
//
// Assume this operation updated the object
//
- facet->status = modified;
+ element->status = EvictorElement::modified;
enqueue = true;
}
}
Lock sync(*this);
- assert(!_deactivated);
+ //
+ // Only elements with a usageCount == 0 can become stale and we own
+ // one count!
+ //
+ assert(!element->stale);
+ assert(element->usageCount >= 1);
//
// Decrease the usage count of the evictor queue element.
//
- assert(facet->element->usageCount >= 1);
- --facet->element->usageCount;
+ element->usageCount--;
if(enqueue)
{
- addToModifiedQueue(facet);
+ addToModifiedQueue(element);
}
else
{
@@ -966,7 +764,11 @@ void
Freeze::EvictorI::deactivate(const string&)
{
Lock sync(*this);
-
+
+ //
+ // TODO: wait until all outstanding requests have completed
+ //
+
if(!_deactivated)
{
if(_trace >= 1)
@@ -989,29 +791,28 @@ Freeze::EvictorI::deactivate(const string&)
sync.release();
getThreadControl().join();
- try
- {
- _db->close(0);
-
- for(size_t i = 0; i < _indices.size(); ++i)
- {
- _indices[i]->_impl->close();
- }
- _indices.clear();
- }
- catch(const DbException& dx)
+ for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
{
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
+ (*p).second->close();
}
- _db.reset();
+
_dbEnv = 0;
_dbEnvHolder = 0;
_initializer = 0;
}
}
+
+void
+Freeze::EvictorI::initialize(const Identity& ident, const string& facet, const ObjectPtr& servant)
+{
+ if(_initializer != 0)
+ {
+ _initializer->initialize(_adapter, ident, facet, servant);
+ }
+}
+
+
void
Freeze::EvictorI::run()
{
@@ -1019,7 +820,9 @@ Freeze::EvictorI::run()
{
for(;;)
{
- deque<FacetPtr> allObjects;
+ deque<EvictorElementPtr> allObjects;
+ deque<EvictorElementPtr> deadObjects;
+
size_t saveNowThreadsSize = 0;
{
@@ -1081,7 +884,7 @@ Freeze::EvictorI::run()
//
for(size_t i = 0; i < size; i++)
{
- FacetPtr& facet = allObjects[i];
+ EvictorElementPtr& element = allObjects[i];
bool tryAgain;
do
@@ -1089,26 +892,40 @@ Freeze::EvictorI::run()
tryAgain = false;
ObjectPtr servant = 0;
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
- Byte status = facet->status;
+ //
+ // These elements can't be stale as only elements with
+ // usageCount == 0 can become stale, and the modifiedQueue
+ // (us now) owns one count.
+ //
+
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ Byte status = element->status;
switch(status)
{
- case created:
- case modified:
+ case EvictorElement::created:
+ case EvictorElement::modified:
{
- servant = facet->rec.servant;
+ servant = element->rec.servant;
break;
}
- case destroyed:
+ case EvictorElement::destroyed:
{
- facet->status = dead;
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, facet->position->first, status, streamStart, obj);
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::dead;
+ deadObjects.push_back(element);
+
break;
}
+ case EvictorElement::dead:
+ {
+ deadObjects.push_back(element);
+ break;
+ }
default:
{
//
@@ -1119,7 +936,7 @@ Freeze::EvictorI::run()
}
if(servant == 0)
{
- lockFacet.release();
+ lockElement.release();
}
else
{
@@ -1127,31 +944,32 @@ Freeze::EvictorI::run()
if(mutex != 0)
{
//
- // Lock servant and then facet so that user can safely lock
+ // Lock servant and then element so that user can safely lock
// servant and call various Evictor operations
//
IceUtil::AbstractMutex::TryLock lockServant(*mutex);
if(!lockServant.acquired())
{
- lockFacet.release();
+ lockElement.release();
lockServant.acquire();
- lockFacet.acquire();
- status = facet->status;
+ lockElement.acquire();
+ status = element->status;
}
switch(status)
{
- case created:
- case modified:
+ case EvictorElement::created:
+ case EvictorElement::modified:
{
- if(servant == facet->rec.servant)
+ if(servant == element->rec.servant)
{
- facet->status = clean;
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, facet->position->first, status, streamStart, obj);
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::clean;
}
else
{
@@ -1159,16 +977,24 @@ Freeze::EvictorI::run()
}
break;
}
- case destroyed:
+ case EvictorElement::destroyed:
{
lockServant.release();
- facet->status = dead;
+
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, facet->position->first, status, streamStart, obj);
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::dead;
+ deadObjects.push_back(element);
break;
}
+ case EvictorElement::dead:
+ {
+ deadObjects.push_back(element);
+ break;
+ }
default:
{
//
@@ -1181,7 +1007,7 @@ Freeze::EvictorI::run()
else
{
DatabaseException ex(__FILE__, __LINE__);
- ex.message = string(typeid(*facet->rec.servant).name())
+ ex.message = string(typeid(*element->rec.servant).name())
+ " does not implement IceUtil::AbstractMutex";
throw ex;
}
@@ -1233,40 +1059,7 @@ Freeze::EvictorI::run()
for(size_t i = 0; i < txSize; i++)
{
StreamedObject& obj = streamedObjectQueue[i];
-
- switch(obj.status)
- {
- case created:
- case modified:
- {
- Dbt dbKey;
- Dbt dbValue;
- initializeInDbt(obj.key, dbKey);
- initializeInDbt(obj.value, dbValue);
- u_int32_t flags = (obj.status == created) ? DB_NOOVERWRITE : 0;
- int err = _db->put(tx, &dbKey, &dbValue, flags);
- if(err != 0)
- {
- throw DatabaseException(__FILE__, __LINE__);
- }
- break;
- }
- case destroyed:
- {
- Dbt dbKey;
- initializeInDbt(obj.key, dbKey);
- int err = _db->del(tx, &dbKey, 0);
- if(err != 0)
- {
- throw DatabaseException(__FILE__, __LINE__);
- }
- break;
- }
- default:
- {
- assert(0);
- }
- }
+ obj.store->save(obj.key, obj.value, obj.status, tx);
}
}
catch(...)
@@ -1304,31 +1097,38 @@ Freeze::EvictorI::run()
{
Lock sync(*this);
-
- _generation++;
-
- for(deque<FacetPtr>::iterator q = allObjects.begin();
+
+ //
+ // Release usage count
+ //
+ for(deque<EvictorElementPtr>::iterator q = allObjects.begin();
q != allObjects.end(); q++)
{
- FacetPtr& facet = *q;
- facet->element->usageCount--;
-
- if(facet != facet->element->mainObject)
+ EvictorElementPtr& element = *q;
+ element->usageCount--;
+ }
+ allObjects.clear();
+
+ for(deque<EvictorElementPtr>::iterator q = deadObjects.begin();
+ q != deadObjects.end(); q++)
+ {
+ EvictorElementPtr& element = *q;
+ if(!element->stale)
{
- //
- // Remove if dead
- //
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
+ if(element->usageCount == 0)
{
- if(facet->status == dead)
+ //
+ // Get rid of unused dead elements
+ //
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ if(element->status == EvictorElement::dead)
{
- facet->element->facets.erase(facet->position);
- facet->position = facet->element->facets.end();
- }
+ evict(element);
+ }
}
}
}
- allObjects.clear();
+ deadObjects.clear();
evict();
if(saveNowThreadsSize > 0)
@@ -1337,7 +1137,6 @@ Freeze::EvictorI::run()
notifyAll();
}
}
- _lastSave = IceUtil::Time::now();
}
}
catch(const IceUtil::Exception& ex)
@@ -1363,301 +1162,26 @@ Freeze::EvictorI::run()
}
}
-bool
-Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value,
- vector<Identity>& identities,
- vector<EvictorElementPtr>& evictorElements)
-{
- Dbt dbKey;
- Dbt dbValue;
- Key root;
-
- EvictorElementPtr elt = new EvictorElement;
- int rs = 0;
- do
- {
- //
- // Unmarshal key and data and insert it into elt's facet map
- //
- EvictorStorageKey esk;
- unmarshal(esk, key, _communicator);
-
- if(root.size() == 0)
- {
-
- if(esk.facet.size() == 0)
- {
- //
- // Good, we found the object
- //
- marshalRoot(esk.identity, root, _communicator);
- }
- else
- {
- //
- // Otherwise, skip this orphan facet (could be a temporary
- // inconsistency on disk)
- //
-
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "Iterator is skipping orphan facet \"" << esk.identity
- << "\" " << esk.facet;
- }
- }
- }
-
- if(root.size() != 0)
- {
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "reading facet identity = \"" << esk.identity << "\" ";
- if(esk.facet.size() == 0)
- {
- out << "(main object)";
- }
- else
- {
- out << "facet = " << esk.facet;
- }
- }
-
- FacetPtr facet = new Facet(elt.get());
- facet->status = clean;
- unmarshal(facet->rec, value, _communicator);
-
- pair<FacetMap::iterator, bool> pair;
- pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet));
- assert(pair.second);
- facet->position = pair.first;
-
- if(esk.facet.size() == 0)
- {
- identities.push_back(esk.identity);
- elt->mainObject = facet;
- }
- }
-
- initializeOutDbt(key, dbKey);
- initializeOutDbt(value, dbValue);
-
- for(;;)
- {
- try
- {
- rs = dbc->get(&dbKey, &dbValue, DB_NEXT);
- if(rs == 0)
- {
- //
- // Key may be used as input of a DB_SET_RANGE call, so we
- // need to write its exact size
- //
- key.resize(dbKey.get_size());
- }
- break; // for(;;)
- }
- catch(const DbMemoryException& dx)
- {
- handleMemoryException(dx, key, dbKey, value, dbValue);
- }
- }
- }
- while(rs == 0 && (root.size() == 0 || startWith(key, root)));
-
- if(root.size() != 0)
- {
- buildFacetMap(elt->facets);
- evictorElements.push_back(elt);
- }
- return (rs == 0);
-}
-
-bool
-Freeze::EvictorI::load(Dbc* dbc, Key& key, vector<Identity>& identities)
-{
- Key root;
- Dbt dbKey;
- Dbt dbValue;
- dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
- int rs = 0;
- do
- {
- if(root.size() == 0)
- {
- EvictorStorageKey esk;
- unmarshal(esk, key, _communicator);
-
- if(esk.facet.size() == 0)
- {
- //
- // Good, we found the object
- //
- marshalRoot(esk.identity, root, _communicator);
-
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "Iterator read \"" << esk.identity << "\"";
- }
- identities.push_back(esk.identity);
- }
- else
- {
- //
- // Otherwise, skip this orphan facet (could be a temporary
- // inconsistency on disk)
- //
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "Iterator is skipping orphan facet \"" << esk.identity
- << "\" " << esk.facet;
- }
- }
- }
-
- initializeOutDbt(key, dbKey);
-
- for(;;)
- {
- try
- {
- rs = dbc->get(&dbKey, &dbValue, DB_NEXT);
- if(rs == 0)
- {
- key.resize(dbKey.get_size());
- }
- break; // for (;;)
- }
- catch(const DbMemoryException& dx)
- {
- handleMemoryException(dx, key, dbKey);
- }
- }
- }
- while(rs == 0 && (root.size() == 0 || startWith(key, root)));
- return (rs == 0);
-}
-
-void
-Freeze::EvictorI::insert(const vector<Identity>& identities,
- const vector<EvictorElementPtr>& evictorElements,
- int loadedGeneration)
-{
- assert(identities.size() == evictorElements.size());
-
- size_t size = identities.size();
-
- if(size > 0)
- {
- Lock sync(*this);
-
- if(_deactivated)
- {
- throw EvictorDeactivatedException(__FILE__, __LINE__);
- }
-
- if(_generation == loadedGeneration)
- {
- for(size_t i = 0; i < size; ++i)
- {
- const Identity& ident = identities[i];
-
- EvictorMap::iterator p = _evictorMap.find(ident);
- if(p == _evictorMap.end())
- {
- p = insertElement(0, ident, evictorElements[i]);
- }
- }
- }
- //
- // Otherwise we don't insert anything
- //
- }
-}
-
-//
-// Marshaling/unmarshaling persistent (key, data) pairs. The marshalRoot function
-// is used to create a key prefix containing only the key's identity.
-//
-
-void
-Freeze::EvictorI::marshalRoot(const Identity& v, Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- v.__write(&stream);
- bytes.swap(stream.b);
-}
-
-void
-Freeze::EvictorI::marshal(const EvictorStorageKey& v, Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- v.__write(&stream);
- bytes.swap(stream.b);
-}
-
-void
-Freeze::EvictorI::unmarshal(EvictorStorageKey& v, const Key& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.b = bytes;
- stream.i = stream.b.begin();
- v.__read(&stream);
-}
-
-void
-Freeze::EvictorI::marshal(const ObjectRecord& v, Value& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.marshalFacets(false);
- stream.startWriteEncaps();
- v.__write(&stream);
- stream.writePendingObjects();
- stream.endWriteEncaps();
- bytes.swap(stream.b);
-}
-
-void
-Freeze::EvictorI::unmarshal(ObjectRecord& v, const Value& bytes, const CommunicatorPtr& communicator)
-{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get());
- stream.sliceObjects(false);
- stream.b = bytes;
- stream.i = stream.b.begin();
- stream.startReadEncaps();
- v.__read(&stream);
- stream.readPendingObjects();
- stream.endReadEncaps();
-}
void
Freeze::EvictorI::evict()
{
- list<EvictorMap::iterator>::reverse_iterator p = _evictorList.rbegin();
-
//
- // With most STL implementations, _evictorMap.size() is faster
- // than _evictorList.size().
+ // Must be called with *this locked
//
- while(_evictorMap.size() > _evictorSize)
+
+ assert(_currentEvictorSize == _evictorList.size());
+
+ list<EvictorElementPtr>::reverse_iterator p = _evictorList.rbegin();
+
+ while(_currentEvictorSize > _evictorSize)
{
//
// Get the last unused element from the evictor queue.
//
- EvictorMap::iterator q;
while(p != _evictorList.rend())
{
- q = *p;
- if(q->second->usageCount == 0)
+ if((*p)->usageCount == 0)
{
break; // Fine, servant is not in use (and not in the modifiedQueue)
}
@@ -1671,588 +1195,164 @@ Freeze::EvictorI::evict()
break;
}
+ EvictorElementPtr& element = *p;
+ assert(!element->stale);
+
+ // temporary
+ assert(element->status == EvictorElement::dead || element->status == EvictorElement::clean);
+
if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0))
{
+ string facet = element->store.facet();
+
Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "evicting \"" << q->first << "\" from the queue\n"
- << "number of elements in the queue: " << _evictorMap.size();
+ out << "evicting \"" << element->cachePosition->first << "\" ";
+ if(facet != "")
+ {
+ out << "-f \"" << facet << "\" ";
+ }
+ out << "from the queue\n"
+ << "number of elements in the queue: " << _currentEvictorSize;
}
-
- EvictorElementPtr& element = q->second;
+
//
// Remove last unused element from the evictor queue.
//
- assert(--(p.base()) == element->position);
- p = list<EvictorMap::iterator>::reverse_iterator(_evictorList.erase(element->position));
- _evictorMap.erase(q);
+ element->stale = true;
+ element->store.unpin(element->cachePosition);
+ p = list<EvictorElementPtr>::reverse_iterator(_evictorList.erase(element->evictPosition));
+ _currentEvictorSize--;
}
}
-bool
-Freeze::EvictorI::dbHasObject(const Identity& ident)
+void
+Freeze::EvictorI::fixEvictPosition(const EvictorElementPtr& element)
{
- EvictorStorageKey esk;
- esk.identity = ident;
-
- Key key;
- marshal(esk, key, _communicator);
- Dbt dbKey;
- initializeInDbt(key, dbKey);
-
- //
- // Keep 0 length since we're not interested in the data
- //
- Dbt dbValue;
- dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
- for(;;)
+ assert(!element->stale);
+ if(element->usageCount < 0)
{
- try
- {
- int err = _db->get(0, &dbKey, &dbValue, 0);
-
- if(err == 0)
- {
- return true;
- }
- else if(err == DB_NOTFOUND)
- {
- return false;
- }
- else
- {
- assert(0);
- throw DatabaseException(__FILE__, __LINE__);
- }
- }
- catch(const DbDeadlockException&)
- {
- if(_deadlockWarning)
- {
- Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::EvictorI::dbHasObject while searching \""
- << _dbName << "\"; retrying ...";
- }
-
- //
- // Ignored, try again
- //
- }
- catch(const DbException& dx)
- {
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
+ //
+ // New object
+ //
+ element->usageCount = 0;
+ _currentEvictorSize++;
}
-}
-
-void
-Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetPtr& facet)
-{
- facet->element->usageCount++;
- _modifiedQueue.push_back(facet);
-
- if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger)
+ else
{
- notifyAll();
+ _evictorList.erase(element->evictPosition);
}
+ _evictorList.push_front(element);
+ element->evictPosition = _evictorList.begin();
}
-void
-Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, Byte status,
- Long streamStart, StreamedObject& obj)
+void
+Freeze::EvictorI::evict(const EvictorElementPtr& element)
{
- EvictorStorageKey esk;
- esk.identity.name = facet->element->identity->name;
- esk.identity.category = facet->element->identity->category;
- esk.facet = facetPath;
- marshal(esk, obj.key, _communicator);
- obj.status = status;
- if(status != destroyed)
- {
- writeObjectRecordToValue(streamStart, facet->rec, obj.value);
- }
+ assert(!element->stale);
+
+ // temporary
+ assert(element->status == EvictorElement::dead || element->status == EvictorElement::clean);
+
+ _evictorList.erase(element->evictPosition);
+ _currentEvictorSize--;
+ element->stale = true;
+ element->store.unpin(element->cachePosition);
}
+
void
-Freeze::EvictorI::saveNowNoSync()
+Freeze::EvictorI::addToModifiedQueue(const EvictorElementPtr& element)
{
- IceUtil::ThreadControl myself;
-
- _saveNowThreads.push_back(myself);
- notifyAll();
- do
+ element->usageCount++;
+ _modifiedQueue.push_back(element);
+
+ if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger)
{
- wait();
+ notifyAll();
}
- while(find(_saveNowThreads.begin(), _saveNowThreads.end(), myself) != _saveNowThreads.end());
}
+
void
-Freeze::EvictorI::writeObjectRecordToValue(Long streamStart, ObjectRecord& rec, Value& value)
+Freeze::EvictorI::stream(const EvictorElementPtr& element, Long streamStart, StreamedObject& obj)
{
- //
- // Update stats first
- //
- Statistics& stats = rec.stats;
- Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
- if(stats.lastSaveTime == 0)
- {
- stats.lastSaveTime = diff;
- stats.avgSaveTime = diff;
- }
- else
- {
- stats.lastSaveTime = streamStart - stats.creationTime;
- stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
- }
+ assert(element->status != EvictorElement::dead);
- marshal(rec, value, _communicator);
-}
-
-Freeze::EvictorI::EvictorElementPtr
-Freeze::EvictorI::load(const Identity& ident)
-{
- //
- // This method attempts to restore an object and all of its facets from the database. It works by
- // iterating over the database keys that match the "root" key. The root key is the encoded portion
- // of the EvictorStorageKey struct that the object and its facets all have in common, namely the
- // identity.
- //
- Key root;
- marshalRoot(ident, root, _communicator);
-
- Key key(root);
- const size_t defaultKeySize = 1024;
- key.resize(defaultKeySize);
-
- const size_t defaultValueSize = 1024;
- Value value(defaultValueSize);
-
- EvictorElementPtr result;
+ obj.status = element->status;
+ obj.store = &element->store;
+
+ const Identity& ident = element->cachePosition->first;
+ ObjectStore::marshal(ident, obj.key, _communicator);
- for(;;)
+ if(element->status != EvictorElement::destroyed)
{
- result = new EvictorElement;
-
- Dbc* dbc = 0;
- int rs = 0;
-
- try
- {
- //
- // Open cursor
- //
- _db->cursor(0, &dbc, 0);
-
- //
- // We position the cursor at the key for the main object.
- //
- Dbt dbKey;
- initializeOutDbt(key, dbKey);
- dbKey.set_size(static_cast<u_int32_t>(root.size()));
-
- Dbt dbValue;
- initializeOutDbt(value, dbValue);
-
- //
- // Get first pair
- //
- for(;;)
- {
- try
- {
- rs = dbc->get(&dbKey, &dbValue, DB_SET_RANGE);
- break;
- }
- catch(const DbMemoryException& dx)
- {
- handleMemoryException(dx, key, dbKey, value, dbValue);
- }
- }
-
- while(rs == 0 && startWith(key, root))
- {
- //
- // Unmarshal key and data and insert it into result's facet map
- //
- EvictorStorageKey esk;
- unmarshal(esk, key, _communicator);
-
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "reading facet identity = \"" << esk.identity << "\" ";
- if(esk.facet.size() == 0)
- {
- out << "(main object)";
- }
- else
- {
- out << "facet = " << esk.facet;
- }
- }
-
- //
- // The Ice encoding of Identity is such that startWith(key, root)
- // implies esk.identity == ident
- //
- assert(esk.identity == ident);
-
- FacetPtr facet = new Facet(result.get());
- facet->status = clean;
- unmarshal(facet->rec, value, _communicator);
-
- pair<FacetMap::iterator, bool> pair;
- pair = result->facets.insert(FacetMap::value_type(esk.facet, facet));
- assert(pair.second);
- facet->position = pair.first;
-
- if(esk.facet.size() == 0)
- {
- result->mainObject = facet;
- }
-
- //
- // Next facet
- //
- initializeOutDbt(key, dbKey);
- initializeOutDbt(value, dbValue);
-
- for(;;)
- {
- try
- {
- rs = dbc->get(&dbKey, &dbValue, DB_NEXT);
- break; // for(;;)
- }
- catch(const DbMemoryException& dx)
- {
- handleMemoryException(dx, key, dbKey, value, dbValue);
- }
- }
- }
-
- Dbc* toClose = dbc;
- dbc = 0;
- toClose->close();
- break; // for (;;)
- }
- catch(const DbDeadlockException&)
- {
- if(dbc != 0)
- {
- try
- {
- dbc->close();
- }
- catch(...)
- {
- }
- }
-
- if(_deadlockWarning)
- {
- Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::EvictorI::load while searching \""
- << _dbName << "\"; retrying ...";
- }
-
- //
- // Try again
- //
- }
- catch(const DbException& dx)
+ //
+ // Update stats first
+ //
+ Statistics& stats = element->rec.stats;
+ Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
+ if(stats.lastSaveTime == 0)
{
- if(dbc != 0)
- {
- try
- {
- dbc->close();
- }
- catch(...)
- {
- }
- }
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
+ stats.lastSaveTime = diff;
+ stats.avgSaveTime = diff;
}
- catch(...)
- {
- try
- {
- dbc->close();
- }
- catch(...)
- {
- }
- throw;
- }
- }
-
- if(result->facets.size() == 0)
- {
- if(_trace >= 2)
+ else
{
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "could not find \"" << ident << "\" in the database";
+ stats.lastSaveTime = streamStart - stats.creationTime;
+ stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
}
- return 0;
+ ObjectStore::marshal(element->rec, obj.value, _communicator);
}
-
- buildFacetMap(result->facets);
- return result;
}
-Freeze::EvictorI::EvictorMap::iterator
-Freeze::EvictorI::insertElement(const ObjectAdapterPtr& adapter, const Identity& ident, const EvictorElementPtr& element)
-{
- if(_initializer)
- {
- _initializer->initialize(adapter, ident, element->mainObject->rec.servant);
- }
-
- pair<EvictorMap::iterator, bool> pair = _evictorMap.insert(EvictorMap::value_type(ident, element));
- assert(pair.second);
- EvictorMap::iterator p = pair.first;
- element->identity = &p->first;
-
- _evictorList.push_front(p);
- element->position = _evictorList.begin();
-
- return p;
-}
-
-
void
-Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& servant,
- const FacetPath& facetPath, bool replacing)
+Freeze::EvictorI::saveNow()
{
- FacetMap& facets = element->facets;
-
- bool insertIt = true;
-
- if(replacing)
- {
- FacetMap::iterator q = facets.find(facetPath);
-
- if(q != facets.end())
- {
- FacetPtr& facet = q->second;
-
- {
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
-
- switch(facet->status)
- {
- case clean:
- {
- facet->status = modified;
- addToModifiedQueue(facet);
- break;
- }
- case created:
- case modified:
- {
- //
- // Nothing to do.
- // No need to push it on the modified queue as a created resp
- // modified facet is either already on the queue or about
- // to be saved. When saved, it becomes clean.
- //
- break;
- }
- case destroyed:
- {
- facet->status = modified;
- //
- // No need to push it on the modified queue, as a destroyed facet
- // is either already on the queue or about to be saved. When saved,
- // it becomes dead.
- //
- break;
- }
- case dead:
- {
- facet->status = created;
- addToModifiedQueue(facet);
- break;
- }
- default:
- {
- assert(0);
- break;
- }
- }
- facet->rec.servant = servant;
- insertIt = false;
- }
- }
- }
+ Lock sync(*this);
- if(insertIt)
+ if(_deactivated)
{
- FacetPtr facet = new Facet(element.get());
- facet->status = created;
-
- ObjectRecord& rec = facet->rec;
- rec.servant = servant;
- rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds();
- rec.stats.lastSaveTime = 0;
- rec.stats.avgSaveTime = 0;
-
- pair<FacetMap::iterator, bool> insertResult = facets.insert(FacetMap::value_type(facetPath, facet));
- assert(insertResult.second);
- facet->position = insertResult.first;
-
- if(facetPath.size() == 0)
- {
- element->mainObject = facet;
- }
- addToModifiedQueue(facet);
+ throw EvictorDeactivatedException(__FILE__, __LINE__);
}
-
- if(servant != 0)
- {
- //
- // Add servant's facets
- //
- vector<string> facetList = servant->ice_facets();
- for(vector<string>::iterator r = facetList.begin(); r != facetList.end(); r++)
- {
- FacetPath newFacetPath(facetPath);
- newFacetPath.push_back(*r);
- addFacetImpl(element, servant->ice_findFacet(*r), newFacetPath, replacing);
- }
- }
+ saveNowNoSync();
}
-
void
-Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath)
+Freeze::EvictorI::saveNowNoSync()
{
- FacetMap::iterator q = facets.find(facetPath);
- ObjectPtr servant = 0;
-
- if(q != facets.end())
- {
- servant = destroyFacetImpl(q->second);
- }
- //
- // else should we raise an exception?
- //
+ IceUtil::ThreadControl myself;
- if(servant != 0)
+ _saveNowThreads.push_back(myself);
+ notifyAll();
+ do
{
- //
- // Remove servant's facets
- //
- vector<string> facetList = servant->ice_facets();
- for(vector<string>::iterator r = facetList.begin(); r != facetList.end(); r++)
- {
- FacetPath newFacetPath(facetPath);
- newFacetPath.push_back(*r);
- removeFacetImpl(facets, newFacetPath);
- }
+ wait();
}
+ while(find(_saveNowThreads.begin(), _saveNowThreads.end(), myself) != _saveNowThreads.end());
}
-
-ObjectPtr
-Freeze::EvictorI::destroyFacetImpl(const Freeze::EvictorI::FacetPtr& facet)
+Freeze::ObjectStore*
+Freeze::EvictorI::findStore(const string& facet) const
{
- IceUtil::Mutex::Lock lockFacet(facet->mutex);
- switch(facet->status)
+ Lock sync(*this);
+ if(_deactivated)
{
- case clean:
- {
- facet->status = destroyed;
- addToModifiedQueue(facet);
- break;
- }
- case created:
- {
- facet->status = dead;
- break;
- }
- case modified:
- {
- facet->status = destroyed;
- //
- // Not necessary to push it on the modified queue, as a modified
- // element is either on the queue already or about to be saved
- // (at which point it becomes clean)
- //
- break;
- }
- case destroyed:
- case dead:
- {
- //
- // Nothing to do!
- //
- break;
- }
- default:
- {
- assert(0);
- break;
- }
+ throw EvictorDeactivatedException(__FILE__, __LINE__);
}
- return facet->rec.servant;
-}
-void
-Freeze::EvictorI::buildFacetMap(const FacetMap& facets)
-{
- for(FacetMap::const_iterator q = facets.begin(); q != facets.end(); q++)
+ StoreMap::const_iterator p = _storeMap.find(facet);
+ if(p == _storeMap.end())
{
- const FacetPath& facetPath = q->first;
-
- if(facetPath.size() > 0)
- {
- FacetPath parent(facetPath);
- parent.pop_back();
- FacetMap::const_iterator r = facets.find(parent);
- if(r == facets.end())
- {
- //
- // TODO: tracing for this orphna facet
- //
- }
- else
- {
- r->second->rec.servant->ice_addFacet(q->second->rec.servant, facetPath[facetPath.size() - 1]);
- }
- }
+ return 0;
+ }
+ else
+ {
+ return (*p).second;
}
}
-Freeze::EvictorI::Facet::Facet(EvictorElement* elt) :
- status(dead),
- element(elt)
-{
-}
-
-Freeze::EvictorI::EvictorElement::EvictorElement() :
- usageCount(0),
- identity(0),
- mainObject(0)
-{
-}
-
-Freeze::EvictorI::EvictorElement::~EvictorElement()
-{
-}
-
-
-
-
-
//
// Print for the various exception types.
//
@@ -2271,9 +1371,3 @@ Freeze::NoSuchElementException::ice_print(ostream& out) const
out << ":\nno such element";
}
-void
-Freeze::EmptyFacetPathException::ice_print(ostream& out) const
-{
- Exception::ice_print(out);
- out << ":\nempty facet path";
-}