diff options
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 104 |
1 files changed, 71 insertions, 33 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index ca1a1e5fa1f..cffc2df5477 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -89,6 +89,7 @@ void Freeze::EvictorI::init(const string& envName, bool createDb) { _trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor"); + _deadlockWarning = (_communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0); string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName; @@ -284,7 +285,7 @@ Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant) // for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } else @@ -485,7 +486,7 @@ Freeze::EvictorI::destroyObject(const Identity& ident) // for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } @@ -674,7 +675,7 @@ Freeze::EvictorI::removeAllFacets(const Identity& ident) { if(q->second != element->mainObject) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } } @@ -949,10 +950,7 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con if(enqueue) { - FacetMap::iterator q = facet->element->facets.find(current.facet); - assert(q != facet->element->facets.end()); - - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); } else { @@ -1015,11 +1013,11 @@ Freeze::EvictorI::deactivate(const string&) } void -Freeze::EvictorI::run() +Freeze::EvictorI::run() throw() { for(;;) { - deque<FacetMap::iterator> allObjects; + deque<FacetPtr> allObjects; size_t saveNowThreadsSize = 0; { @@ -1074,14 +1072,14 @@ Freeze::EvictorI::run() deque<StreamedObject> streamedObjectQueue; - Long saveStart = IceUtil::Time::now().toMilliSeconds(); + Long streamStart = IceUtil::Time::now().toMilliSeconds(); // // Stream each element // for(size_t i = 0; i < size; i++) { - FacetPtr& facet = allObjects[i]->second; + FacetPtr& facet = allObjects[i]; bool tryAgain; do @@ -1106,7 +1104,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); break; } default: @@ -1151,7 +1149,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); } else { @@ -1166,7 +1164,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); break; } default: @@ -1189,6 +1187,14 @@ Freeze::EvictorI::run() } while(tryAgain); } + if(_trace >= 1) + { + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "streamed " << streamedObjectQueue.size() << " objects in " + << static_cast<Int>(now - streamStart) << " ms"; + } + // // Now let's save all these streamed objects to disk using a transaction // @@ -1215,6 +1221,7 @@ Freeze::EvictorI::run() txSize = streamedObjectQueue.size(); } + Long saveStart = IceUtil::Time::now().toMilliSeconds(); try { DbTxn* tx = 0; @@ -1276,7 +1283,6 @@ Freeze::EvictorI::run() Trace out(_communicator->getLogger(), "Freeze.Evictor"); out << "saved " << txSize << " objects in " << static_cast<Int>(now - saveStart) << " ms"; - saveStart = now; } } catch(const DbDeadlockException&) @@ -1294,16 +1300,31 @@ Freeze::EvictorI::run() } while(tryAgain); - { Lock sync(*this); _generation++; - for(deque<FacetMap::iterator>::iterator q = allObjects.begin(); + for(deque<FacetPtr>::iterator q = allObjects.begin(); q != allObjects.end(); q++) { - (*q)->second->element->usageCount--; + FacetPtr& facet = *q; + facet->element->usageCount--; + + if(facet != facet->element->mainObject) + { + // + // Remove if dead + // + IceUtil::Mutex::Lock lockFacet(facet->mutex); + { + if(facet->status == dead) + { + facet->element->facets.erase(facet->position); + facet->position = facet->element->facets.end(); + } + } + } } allObjects.clear(); evict(); @@ -1387,6 +1408,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, pair<FacetMap::iterator, bool> pair; pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); assert(pair.second); + facet->position = pair.first; if(esk.facet.size() == 0) { @@ -1627,7 +1649,7 @@ Freeze::EvictorI::evict() break; } - if(_trace >= 2) + if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) { Trace out(_communicator->getLogger(), "Freeze.Evictor"); out << "evicting \"" << q->first << "\" from the queue\n" @@ -1683,6 +1705,13 @@ Freeze::EvictorI::dbHasObject(const Identity& ident) } catch(const DbDeadlockException&) { + if(_deadlockWarning) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::EvictorI::dbHasObject while searching \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -1697,11 +1726,10 @@ Freeze::EvictorI::dbHasObject(const Identity& ident) } void -Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& q, - const Freeze::EvictorI::FacetPtr& facet) +Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetPtr& facet) { facet->element->usageCount++; - _modifiedQueue.push_back(q); + _modifiedQueue.push_back(facet); if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger) { @@ -1711,7 +1739,7 @@ Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& void Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, Byte status, - Long saveStart, StreamedObject& obj) + Long streamStart, StreamedObject& obj) { EvictorStorageKey esk; esk.identity.name = facet->element->identity->name; @@ -1721,7 +1749,7 @@ Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, obj.status = status; if(status != destroyed) { - writeObjectRecordToValue(saveStart, facet->rec, obj.value); + writeObjectRecordToValue(streamStart, facet->rec, obj.value); } } @@ -1740,13 +1768,13 @@ Freeze::EvictorI::saveNowNoSync() } void -Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Value& value) +Freeze::EvictorI::writeObjectRecordToValue(Long streamStart, ObjectRecord& rec, Value& value) { // // Update stats first // Statistics& stats = rec.stats; - Long diff = saveStart - (stats.creationTime + stats.lastSaveTime); + Long diff = streamStart - (stats.creationTime + stats.lastSaveTime); if(stats.lastSaveTime == 0) { stats.lastSaveTime = diff; @@ -1754,7 +1782,7 @@ Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Va } else { - stats.lastSaveTime = saveStart - stats.creationTime; + stats.lastSaveTime = streamStart - stats.creationTime; stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05); } @@ -1857,6 +1885,7 @@ Freeze::EvictorI::load(const Identity& ident) pair<FacetMap::iterator, bool> pair; pair = result->facets.insert(FacetMap::value_type(esk.facet, facet)); assert(pair.second); + facet->position = pair.first; if(esk.facet.size() == 0) { @@ -1901,6 +1930,13 @@ Freeze::EvictorI::load(const Identity& ident) } } + if(_deadlockWarning) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::EvictorI::load while searching \"" + << _dbName << "\"; retrying ..."; + } + // // Try again // @@ -1992,7 +2028,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv case clean: { facet->status = modified; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } case created: @@ -2019,7 +2055,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv case dead: { facet->status = created; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } default: @@ -2047,11 +2083,13 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv pair<FacetMap::iterator, bool> insertResult = facets.insert(FacetMap::value_type(facetPath, facet)); assert(insertResult.second); + facet->position = insertResult.first; + if(facetPath.size() == 0) { element->mainObject = facet; } - addToModifiedQueue(insertResult.first, facet); + addToModifiedQueue(facet); } @@ -2079,7 +2117,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath) if(q != facets.end()) { - servant = destroyFacetImpl(q, q->second); + servant = destroyFacetImpl(q->second); } // // else should we raise an exception? @@ -2102,7 +2140,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath) ObjectPtr -Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, const Freeze::EvictorI::FacetPtr& facet) +Freeze::EvictorI::destroyFacetImpl(const Freeze::EvictorI::FacetPtr& facet) { IceUtil::Mutex::Lock lockFacet(facet->mutex); switch(facet->status) @@ -2110,7 +2148,7 @@ Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, cons case clean: { facet->status = destroyed; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } case created: |