summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/EvictorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r--cpp/src/Freeze/EvictorI.cpp104
1 files changed, 71 insertions, 33 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp
index ca1a1e5fa1f..cffc2df5477 100644
--- a/cpp/src/Freeze/EvictorI.cpp
+++ b/cpp/src/Freeze/EvictorI.cpp
@@ -89,6 +89,7 @@ void
Freeze::EvictorI::init(const string& envName, bool createDb)
{
_trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor");
+ _deadlockWarning = (_communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0);
string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName;
@@ -284,7 +285,7 @@ Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant)
//
for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
else
@@ -485,7 +486,7 @@ Freeze::EvictorI::destroyObject(const Identity& ident)
//
for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
@@ -674,7 +675,7 @@ Freeze::EvictorI::removeAllFacets(const Identity& ident)
{
if(q->second != element->mainObject)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
}
@@ -949,10 +950,7 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con
if(enqueue)
{
- FacetMap::iterator q = facet->element->facets.find(current.facet);
- assert(q != facet->element->facets.end());
-
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
}
else
{
@@ -1015,11 +1013,11 @@ Freeze::EvictorI::deactivate(const string&)
}
void
-Freeze::EvictorI::run()
+Freeze::EvictorI::run() throw()
{
for(;;)
{
- deque<FacetMap::iterator> allObjects;
+ deque<FacetPtr> allObjects;
size_t saveNowThreadsSize = 0;
{
@@ -1074,14 +1072,14 @@ Freeze::EvictorI::run()
deque<StreamedObject> streamedObjectQueue;
- Long saveStart = IceUtil::Time::now().toMilliSeconds();
+ Long streamStart = IceUtil::Time::now().toMilliSeconds();
//
// Stream each element
//
for(size_t i = 0; i < size; i++)
{
- FacetPtr& facet = allObjects[i]->second;
+ FacetPtr& facet = allObjects[i];
bool tryAgain;
do
@@ -1106,7 +1104,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
break;
}
default:
@@ -1151,7 +1149,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
}
else
{
@@ -1166,7 +1164,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
break;
}
default:
@@ -1189,6 +1187,14 @@ Freeze::EvictorI::run()
} while(tryAgain);
}
+ if(_trace >= 1)
+ {
+ Long now = IceUtil::Time::now().toMilliSeconds();
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "streamed " << streamedObjectQueue.size() << " objects in "
+ << static_cast<Int>(now - streamStart) << " ms";
+ }
+
//
// Now let's save all these streamed objects to disk using a transaction
//
@@ -1215,6 +1221,7 @@ Freeze::EvictorI::run()
txSize = streamedObjectQueue.size();
}
+ Long saveStart = IceUtil::Time::now().toMilliSeconds();
try
{
DbTxn* tx = 0;
@@ -1276,7 +1283,6 @@ Freeze::EvictorI::run()
Trace out(_communicator->getLogger(), "Freeze.Evictor");
out << "saved " << txSize << " objects in "
<< static_cast<Int>(now - saveStart) << " ms";
- saveStart = now;
}
}
catch(const DbDeadlockException&)
@@ -1294,16 +1300,31 @@ Freeze::EvictorI::run()
}
while(tryAgain);
-
{
Lock sync(*this);
_generation++;
- for(deque<FacetMap::iterator>::iterator q = allObjects.begin();
+ for(deque<FacetPtr>::iterator q = allObjects.begin();
q != allObjects.end(); q++)
{
- (*q)->second->element->usageCount--;
+ FacetPtr& facet = *q;
+ facet->element->usageCount--;
+
+ if(facet != facet->element->mainObject)
+ {
+ //
+ // Remove if dead
+ //
+ IceUtil::Mutex::Lock lockFacet(facet->mutex);
+ {
+ if(facet->status == dead)
+ {
+ facet->element->facets.erase(facet->position);
+ facet->position = facet->element->facets.end();
+ }
+ }
+ }
}
allObjects.clear();
evict();
@@ -1387,6 +1408,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value,
pair<FacetMap::iterator, bool> pair;
pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet));
assert(pair.second);
+ facet->position = pair.first;
if(esk.facet.size() == 0)
{
@@ -1627,7 +1649,7 @@ Freeze::EvictorI::evict()
break;
}
- if(_trace >= 2)
+ if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0))
{
Trace out(_communicator->getLogger(), "Freeze.Evictor");
out << "evicting \"" << q->first << "\" from the queue\n"
@@ -1683,6 +1705,13 @@ Freeze::EvictorI::dbHasObject(const Identity& ident)
}
catch(const DbDeadlockException&)
{
+ if(_deadlockWarning)
+ {
+ Warning out(_communicator->getLogger());
+ out << "Deadlock in Freeze::EvictorI::dbHasObject while searching \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -1697,11 +1726,10 @@ Freeze::EvictorI::dbHasObject(const Identity& ident)
}
void
-Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& q,
- const Freeze::EvictorI::FacetPtr& facet)
+Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetPtr& facet)
{
facet->element->usageCount++;
- _modifiedQueue.push_back(q);
+ _modifiedQueue.push_back(facet);
if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger)
{
@@ -1711,7 +1739,7 @@ Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator&
void
Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, Byte status,
- Long saveStart, StreamedObject& obj)
+ Long streamStart, StreamedObject& obj)
{
EvictorStorageKey esk;
esk.identity.name = facet->element->identity->name;
@@ -1721,7 +1749,7 @@ Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath,
obj.status = status;
if(status != destroyed)
{
- writeObjectRecordToValue(saveStart, facet->rec, obj.value);
+ writeObjectRecordToValue(streamStart, facet->rec, obj.value);
}
}
@@ -1740,13 +1768,13 @@ Freeze::EvictorI::saveNowNoSync()
}
void
-Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Value& value)
+Freeze::EvictorI::writeObjectRecordToValue(Long streamStart, ObjectRecord& rec, Value& value)
{
//
// Update stats first
//
Statistics& stats = rec.stats;
- Long diff = saveStart - (stats.creationTime + stats.lastSaveTime);
+ Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
if(stats.lastSaveTime == 0)
{
stats.lastSaveTime = diff;
@@ -1754,7 +1782,7 @@ Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Va
}
else
{
- stats.lastSaveTime = saveStart - stats.creationTime;
+ stats.lastSaveTime = streamStart - stats.creationTime;
stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
}
@@ -1857,6 +1885,7 @@ Freeze::EvictorI::load(const Identity& ident)
pair<FacetMap::iterator, bool> pair;
pair = result->facets.insert(FacetMap::value_type(esk.facet, facet));
assert(pair.second);
+ facet->position = pair.first;
if(esk.facet.size() == 0)
{
@@ -1901,6 +1930,13 @@ Freeze::EvictorI::load(const Identity& ident)
}
}
+ if(_deadlockWarning)
+ {
+ Warning out(_communicator->getLogger());
+ out << "Deadlock in Freeze::EvictorI::load while searching \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Try again
//
@@ -1992,7 +2028,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
case clean:
{
facet->status = modified;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
case created:
@@ -2019,7 +2055,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
case dead:
{
facet->status = created;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
default:
@@ -2047,11 +2083,13 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
pair<FacetMap::iterator, bool> insertResult = facets.insert(FacetMap::value_type(facetPath, facet));
assert(insertResult.second);
+ facet->position = insertResult.first;
+
if(facetPath.size() == 0)
{
element->mainObject = facet;
}
- addToModifiedQueue(insertResult.first, facet);
+ addToModifiedQueue(facet);
}
@@ -2079,7 +2117,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath)
if(q != facets.end())
{
- servant = destroyFacetImpl(q, q->second);
+ servant = destroyFacetImpl(q->second);
}
//
// else should we raise an exception?
@@ -2102,7 +2140,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath)
ObjectPtr
-Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, const Freeze::EvictorI::FacetPtr& facet)
+Freeze::EvictorI::destroyFacetImpl(const Freeze::EvictorI::FacetPtr& facet)
{
IceUtil::Mutex::Lock lockFacet(facet->mutex);
switch(facet->status)
@@ -2110,7 +2148,7 @@ Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, cons
case clean:
{
facet->status = destroyed;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
case created: