diff options
-rw-r--r-- | cpp/src/Freeze/ConnectionI.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Freeze/ConnectionI.h | 10 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 104 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorI.h | 32 | ||||
-rw-r--r-- | cpp/src/Freeze/IndexI.cpp | 16 | ||||
-rw-r--r-- | cpp/src/Freeze/MapI.cpp | 35 |
6 files changed, 159 insertions, 44 deletions
diff --git a/cpp/src/Freeze/ConnectionI.cpp b/cpp/src/Freeze/ConnectionI.cpp index 7d6c9e1fc73..4934925bcc4 100644 --- a/cpp/src/Freeze/ConnectionI.cpp +++ b/cpp/src/Freeze/ConnectionI.cpp @@ -91,7 +91,8 @@ Freeze::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _communicator(communicator), _dbEnvHolder(SharedDbEnv::get(communicator, envName)), _envName(envName), - _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")) + _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")), + _deadlockWarning(communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0) { _dbEnv = _dbEnvHolder.get(); } @@ -102,7 +103,8 @@ Freeze::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _communicator(communicator), _dbEnv(&dbEnv), _envName(envName), - _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")) + _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")), + _deadlockWarning(communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0) { } diff --git a/cpp/src/Freeze/ConnectionI.h b/cpp/src/Freeze/ConnectionI.h index edd3edd3501..ba0b9f5b556 100644 --- a/cpp/src/Freeze/ConnectionI.h +++ b/cpp/src/Freeze/ConnectionI.h @@ -81,6 +81,9 @@ public: Ice::Int trace() const; + bool + deadlockWarning() const; + private: Ice::CommunicatorPtr _communicator; @@ -90,6 +93,7 @@ private: TransactionIPtr _transaction; std::list<MapHelperI*> _mapList; Ice::Int _trace; + bool _deadlockWarning; }; inline void @@ -135,6 +139,12 @@ ConnectionI::trace() const return _trace; } +inline bool +ConnectionI::deadlockWarning() const +{ + return _deadlockWarning; +} + } #endif diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index ca1a1e5fa1f..cffc2df5477 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -89,6 +89,7 @@ void Freeze::EvictorI::init(const string& envName, bool createDb) { _trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor"); + _deadlockWarning = (_communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0); string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName; @@ -284,7 +285,7 @@ Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant) // for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } else @@ -485,7 +486,7 @@ Freeze::EvictorI::destroyObject(const Identity& ident) // for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } @@ -674,7 +675,7 @@ Freeze::EvictorI::removeAllFacets(const Identity& ident) { if(q->second != element->mainObject) { - destroyFacetImpl(q, q->second); + destroyFacetImpl(q->second); } } } @@ -949,10 +950,7 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con if(enqueue) { - FacetMap::iterator q = facet->element->facets.find(current.facet); - assert(q != facet->element->facets.end()); - - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); } else { @@ -1015,11 +1013,11 @@ Freeze::EvictorI::deactivate(const string&) } void -Freeze::EvictorI::run() +Freeze::EvictorI::run() throw() { for(;;) { - deque<FacetMap::iterator> allObjects; + deque<FacetPtr> allObjects; size_t saveNowThreadsSize = 0; { @@ -1074,14 +1072,14 @@ Freeze::EvictorI::run() deque<StreamedObject> streamedObjectQueue; - Long saveStart = IceUtil::Time::now().toMilliSeconds(); + Long streamStart = IceUtil::Time::now().toMilliSeconds(); // // Stream each element // for(size_t i = 0; i < size; i++) { - FacetPtr& facet = allObjects[i]->second; + FacetPtr& facet = allObjects[i]; bool tryAgain; do @@ -1106,7 +1104,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); break; } default: @@ -1151,7 +1149,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); } else { @@ -1166,7 +1164,7 @@ Freeze::EvictorI::run() size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, allObjects[i]->first, status, saveStart, obj); + streamFacet(facet, facet->position->first, status, streamStart, obj); break; } default: @@ -1189,6 +1187,14 @@ Freeze::EvictorI::run() } while(tryAgain); } + if(_trace >= 1) + { + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "streamed " << streamedObjectQueue.size() << " objects in " + << static_cast<Int>(now - streamStart) << " ms"; + } + // // Now let's save all these streamed objects to disk using a transaction // @@ -1215,6 +1221,7 @@ Freeze::EvictorI::run() txSize = streamedObjectQueue.size(); } + Long saveStart = IceUtil::Time::now().toMilliSeconds(); try { DbTxn* tx = 0; @@ -1276,7 +1283,6 @@ Freeze::EvictorI::run() Trace out(_communicator->getLogger(), "Freeze.Evictor"); out << "saved " << txSize << " objects in " << static_cast<Int>(now - saveStart) << " ms"; - saveStart = now; } } catch(const DbDeadlockException&) @@ -1294,16 +1300,31 @@ Freeze::EvictorI::run() } while(tryAgain); - { Lock sync(*this); _generation++; - for(deque<FacetMap::iterator>::iterator q = allObjects.begin(); + for(deque<FacetPtr>::iterator q = allObjects.begin(); q != allObjects.end(); q++) { - (*q)->second->element->usageCount--; + FacetPtr& facet = *q; + facet->element->usageCount--; + + if(facet != facet->element->mainObject) + { + // + // Remove if dead + // + IceUtil::Mutex::Lock lockFacet(facet->mutex); + { + if(facet->status == dead) + { + facet->element->facets.erase(facet->position); + facet->position = facet->element->facets.end(); + } + } + } } allObjects.clear(); evict(); @@ -1387,6 +1408,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, pair<FacetMap::iterator, bool> pair; pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet)); assert(pair.second); + facet->position = pair.first; if(esk.facet.size() == 0) { @@ -1627,7 +1649,7 @@ Freeze::EvictorI::evict() break; } - if(_trace >= 2) + if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) { Trace out(_communicator->getLogger(), "Freeze.Evictor"); out << "evicting \"" << q->first << "\" from the queue\n" @@ -1683,6 +1705,13 @@ Freeze::EvictorI::dbHasObject(const Identity& ident) } catch(const DbDeadlockException&) { + if(_deadlockWarning) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::EvictorI::dbHasObject while searching \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -1697,11 +1726,10 @@ Freeze::EvictorI::dbHasObject(const Identity& ident) } void -Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& q, - const Freeze::EvictorI::FacetPtr& facet) +Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetPtr& facet) { facet->element->usageCount++; - _modifiedQueue.push_back(q); + _modifiedQueue.push_back(facet); if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger) { @@ -1711,7 +1739,7 @@ Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& void Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, Byte status, - Long saveStart, StreamedObject& obj) + Long streamStart, StreamedObject& obj) { EvictorStorageKey esk; esk.identity.name = facet->element->identity->name; @@ -1721,7 +1749,7 @@ Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, obj.status = status; if(status != destroyed) { - writeObjectRecordToValue(saveStart, facet->rec, obj.value); + writeObjectRecordToValue(streamStart, facet->rec, obj.value); } } @@ -1740,13 +1768,13 @@ Freeze::EvictorI::saveNowNoSync() } void -Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Value& value) +Freeze::EvictorI::writeObjectRecordToValue(Long streamStart, ObjectRecord& rec, Value& value) { // // Update stats first // Statistics& stats = rec.stats; - Long diff = saveStart - (stats.creationTime + stats.lastSaveTime); + Long diff = streamStart - (stats.creationTime + stats.lastSaveTime); if(stats.lastSaveTime == 0) { stats.lastSaveTime = diff; @@ -1754,7 +1782,7 @@ Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Va } else { - stats.lastSaveTime = saveStart - stats.creationTime; + stats.lastSaveTime = streamStart - stats.creationTime; stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05); } @@ -1857,6 +1885,7 @@ Freeze::EvictorI::load(const Identity& ident) pair<FacetMap::iterator, bool> pair; pair = result->facets.insert(FacetMap::value_type(esk.facet, facet)); assert(pair.second); + facet->position = pair.first; if(esk.facet.size() == 0) { @@ -1901,6 +1930,13 @@ Freeze::EvictorI::load(const Identity& ident) } } + if(_deadlockWarning) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::EvictorI::load while searching \"" + << _dbName << "\"; retrying ..."; + } + // // Try again // @@ -1992,7 +2028,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv case clean: { facet->status = modified; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } case created: @@ -2019,7 +2055,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv case dead: { facet->status = created; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } default: @@ -2047,11 +2083,13 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv pair<FacetMap::iterator, bool> insertResult = facets.insert(FacetMap::value_type(facetPath, facet)); assert(insertResult.second); + facet->position = insertResult.first; + if(facetPath.size() == 0) { element->mainObject = facet; } - addToModifiedQueue(insertResult.first, facet); + addToModifiedQueue(facet); } @@ -2079,7 +2117,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath) if(q != facets.end()) { - servant = destroyFacetImpl(q, q->second); + servant = destroyFacetImpl(q->second); } // // else should we raise an exception? @@ -2102,7 +2140,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath) ObjectPtr -Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, const Freeze::EvictorI::FacetPtr& facet) +Freeze::EvictorI::destroyFacetImpl(const Freeze::EvictorI::FacetPtr& facet) { IceUtil::Mutex::Lock lockFacet(facet->mutex); switch(facet->status) @@ -2110,7 +2148,7 @@ Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, cons case clean: { facet->status = destroyed; - addToModifiedQueue(q, facet); + addToModifiedQueue(facet); break; } case created: diff --git a/cpp/src/Freeze/EvictorI.h b/cpp/src/Freeze/EvictorI.h index d244405d106..4deb9156fde 100644 --- a/cpp/src/Freeze/EvictorI.h +++ b/cpp/src/Freeze/EvictorI.h @@ -65,7 +65,7 @@ public: // // Thread // - virtual void run(); + virtual void run() throw (); // @@ -94,6 +94,10 @@ public: typedef IceUtil::Handle<EvictorElement> EvictorElementPtr; typedef std::map<Ice::Identity, EvictorElementPtr> EvictorMap; + struct Facet; + typedef IceUtil::Handle<Facet> FacetPtr; + typedef std::map<Ice::FacetPath, FacetPtr> FacetMap; + struct Facet : public Ice::LocalObject { Facet(EvictorElement*); @@ -102,9 +106,12 @@ public: Ice::Byte status; ObjectRecord rec; // 64 bit alignment EvictorElement* const element; + + // + // Position in element->facets + // + FacetMap::iterator position; }; - typedef IceUtil::Handle<Facet> FacetPtr; - typedef std::map<Ice::FacetPath, FacetPtr> FacetMap; struct EvictorElement : public IceUtil::Shared { @@ -200,7 +207,8 @@ public: #endif - + bool + deadlockWarning() const; private: @@ -209,7 +217,7 @@ private: void evict(); bool dbHasObject(const Ice::Identity&); bool getObject(const Ice::Identity&, ObjectRecord&); - void addToModifiedQueue(const FacetMap::iterator&, const FacetPtr&); + void addToModifiedQueue(const FacetPtr&); void streamFacet(const FacetPtr&, const Ice::FacetPath&, Ice::Byte, Ice::Long, StreamedObject&); void saveNowNoSync(); @@ -219,7 +227,7 @@ private: void addFacetImpl(EvictorElementPtr&, const Ice::ObjectPtr&, const Ice::FacetPath&, bool); void removeFacetImpl(FacetMap&, const Ice::FacetPath&); - Ice::ObjectPtr destroyFacetImpl(FacetMap::iterator&, const FacetPtr& facet); + Ice::ObjectPtr destroyFacetImpl(const FacetPtr& facet); void buildFacetMap(const FacetMap&); @@ -242,9 +250,7 @@ private: // element containing the pointed element remains in the evictor // map. // - // Note: relies on the stability of iterators in a std::map - // - std::deque<FacetMap::iterator> _modifiedQueue; + std::deque<FacetPtr> _modifiedQueue; bool _deactivated; @@ -286,6 +292,8 @@ private: // this element, then the loaded value is current. // int _generation; + + bool _deadlockWarning; }; inline const Ice::CommunicatorPtr& @@ -319,6 +327,12 @@ EvictorI::currentGeneration() const return _generation; } +inline bool +EvictorI::deadlockWarning() const +{ + return _deadlockWarning; +} + inline bool startWith(const Key& key, const Key& root) { diff --git a/cpp/src/Freeze/IndexI.cpp b/cpp/src/Freeze/IndexI.cpp index 1e0dd451272..227f876b798 100644 --- a/cpp/src/Freeze/IndexI.cpp +++ b/cpp/src/Freeze/IndexI.cpp @@ -143,6 +143,14 @@ Freeze::IndexI::untypedFindFirst(const Key& bytes, Int firstN) const // } } + + if(_evictor->deadlockWarning()) + { + Warning out(_evictor->communicator()->getLogger()); + out << "Deadlock in Freeze::IndexI::untypedFindFirst while searching \"" + << _evictor->dbName() << "\"; retrying ..."; + } + // // Retry // @@ -236,6 +244,14 @@ Freeze::IndexI::untypedCount(const Key& bytes) const // } } + + if(_evictor->deadlockWarning()) + { + Warning out(_evictor->communicator()->getLogger()); + out << "Deadlock in Freeze::IndexI::untypedCount while searching \"" + << _evictor->dbName() << "\"; retrying ..."; + } + // // Retry // diff --git a/cpp/src/Freeze/MapI.cpp b/cpp/src/Freeze/MapI.cpp index b108b6c1b7c..0ce27824a86 100644 --- a/cpp/src/Freeze/MapI.cpp +++ b/cpp/src/Freeze/MapI.cpp @@ -698,6 +698,13 @@ Freeze::MapHelperI::find(const Key& k, bool readOnly) const } else { + if(_connection->deadlockWarning()) + { + Warning out(_connection->communicator()->getLogger()); + out << "Deadlock in Freeze::MapHelperI::find on Map \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -749,6 +756,13 @@ Freeze::MapHelperI::put(const Key& key, const Value& value) } else { + if(_connection->deadlockWarning()) + { + Warning out(_connection->communicator()->getLogger()); + out << "Deadlock in Freeze::MapHelperI::put on Map \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -805,6 +819,13 @@ Freeze::MapHelperI::erase(const Key& key) } else { + if(_connection->deadlockWarning()) + { + Warning out(_connection->communicator()->getLogger()); + out << "Deadlock in Freeze::MapHelperI::erase on Map \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -861,6 +882,13 @@ Freeze::MapHelperI::count(const Key& key) const } else { + if(_connection->deadlockWarning()) + { + Warning out(_connection->communicator()->getLogger()); + out << "Deadlock in Freeze::MapHelperI::count on Map \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // @@ -904,6 +932,13 @@ Freeze::MapHelperI::clear() } else { + if(_connection->deadlockWarning()) + { + Warning out(_connection->communicator()->getLogger()); + out << "Deadlock in Freeze::MapHelperI::clear on Map \"" + << _dbName << "\"; retrying ..."; + } + // // Ignored, try again // |