summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Freeze/ConnectionI.cpp6
-rw-r--r--cpp/src/Freeze/ConnectionI.h10
-rw-r--r--cpp/src/Freeze/EvictorI.cpp104
-rw-r--r--cpp/src/Freeze/EvictorI.h32
-rw-r--r--cpp/src/Freeze/IndexI.cpp16
-rw-r--r--cpp/src/Freeze/MapI.cpp35
6 files changed, 159 insertions, 44 deletions
diff --git a/cpp/src/Freeze/ConnectionI.cpp b/cpp/src/Freeze/ConnectionI.cpp
index 7d6c9e1fc73..4934925bcc4 100644
--- a/cpp/src/Freeze/ConnectionI.cpp
+++ b/cpp/src/Freeze/ConnectionI.cpp
@@ -91,7 +91,8 @@ Freeze::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_communicator(communicator),
_dbEnvHolder(SharedDbEnv::get(communicator, envName)),
_envName(envName),
- _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map"))
+ _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")),
+ _deadlockWarning(communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0)
{
_dbEnv = _dbEnvHolder.get();
}
@@ -102,7 +103,8 @@ Freeze::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_communicator(communicator),
_dbEnv(&dbEnv),
_envName(envName),
- _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map"))
+ _trace(communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Map")),
+ _deadlockWarning(communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0)
{
}
diff --git a/cpp/src/Freeze/ConnectionI.h b/cpp/src/Freeze/ConnectionI.h
index edd3edd3501..ba0b9f5b556 100644
--- a/cpp/src/Freeze/ConnectionI.h
+++ b/cpp/src/Freeze/ConnectionI.h
@@ -81,6 +81,9 @@ public:
Ice::Int
trace() const;
+ bool
+ deadlockWarning() const;
+
private:
Ice::CommunicatorPtr _communicator;
@@ -90,6 +93,7 @@ private:
TransactionIPtr _transaction;
std::list<MapHelperI*> _mapList;
Ice::Int _trace;
+ bool _deadlockWarning;
};
inline void
@@ -135,6 +139,12 @@ ConnectionI::trace() const
return _trace;
}
+inline bool
+ConnectionI::deadlockWarning() const
+{
+ return _deadlockWarning;
+}
+
}
#endif
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp
index ca1a1e5fa1f..cffc2df5477 100644
--- a/cpp/src/Freeze/EvictorI.cpp
+++ b/cpp/src/Freeze/EvictorI.cpp
@@ -89,6 +89,7 @@ void
Freeze::EvictorI::init(const string& envName, bool createDb)
{
_trace = _communicator->getProperties()->getPropertyAsInt("Freeze.Trace.Evictor");
+ _deadlockWarning = (_communicator->getProperties()->getPropertyAsInt("Freeze.Warn.Deadlocks") != 0);
string propertyPrefix = string("Freeze.Evictor.") + envName + '.' + _dbName;
@@ -284,7 +285,7 @@ Freeze::EvictorI::createObject(const Identity& ident, const ObjectPtr& servant)
//
for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
else
@@ -485,7 +486,7 @@ Freeze::EvictorI::destroyObject(const Identity& ident)
//
for(FacetMap::iterator q = element->facets.begin(); q != element->facets.end(); q++)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
@@ -674,7 +675,7 @@ Freeze::EvictorI::removeAllFacets(const Identity& ident)
{
if(q->second != element->mainObject)
{
- destroyFacetImpl(q, q->second);
+ destroyFacetImpl(q->second);
}
}
}
@@ -949,10 +950,7 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con
if(enqueue)
{
- FacetMap::iterator q = facet->element->facets.find(current.facet);
- assert(q != facet->element->facets.end());
-
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
}
else
{
@@ -1015,11 +1013,11 @@ Freeze::EvictorI::deactivate(const string&)
}
void
-Freeze::EvictorI::run()
+Freeze::EvictorI::run() throw()
{
for(;;)
{
- deque<FacetMap::iterator> allObjects;
+ deque<FacetPtr> allObjects;
size_t saveNowThreadsSize = 0;
{
@@ -1074,14 +1072,14 @@ Freeze::EvictorI::run()
deque<StreamedObject> streamedObjectQueue;
- Long saveStart = IceUtil::Time::now().toMilliSeconds();
+ Long streamStart = IceUtil::Time::now().toMilliSeconds();
//
// Stream each element
//
for(size_t i = 0; i < size; i++)
{
- FacetPtr& facet = allObjects[i]->second;
+ FacetPtr& facet = allObjects[i];
bool tryAgain;
do
@@ -1106,7 +1104,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
break;
}
default:
@@ -1151,7 +1149,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
}
else
{
@@ -1166,7 +1164,7 @@ Freeze::EvictorI::run()
size_t index = streamedObjectQueue.size();
streamedObjectQueue.resize(index + 1);
StreamedObject& obj = streamedObjectQueue[index];
- streamFacet(facet, allObjects[i]->first, status, saveStart, obj);
+ streamFacet(facet, facet->position->first, status, streamStart, obj);
break;
}
default:
@@ -1189,6 +1187,14 @@ Freeze::EvictorI::run()
} while(tryAgain);
}
+ if(_trace >= 1)
+ {
+ Long now = IceUtil::Time::now().toMilliSeconds();
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "streamed " << streamedObjectQueue.size() << " objects in "
+ << static_cast<Int>(now - streamStart) << " ms";
+ }
+
//
// Now let's save all these streamed objects to disk using a transaction
//
@@ -1215,6 +1221,7 @@ Freeze::EvictorI::run()
txSize = streamedObjectQueue.size();
}
+ Long saveStart = IceUtil::Time::now().toMilliSeconds();
try
{
DbTxn* tx = 0;
@@ -1276,7 +1283,6 @@ Freeze::EvictorI::run()
Trace out(_communicator->getLogger(), "Freeze.Evictor");
out << "saved " << txSize << " objects in "
<< static_cast<Int>(now - saveStart) << " ms";
- saveStart = now;
}
}
catch(const DbDeadlockException&)
@@ -1294,16 +1300,31 @@ Freeze::EvictorI::run()
}
while(tryAgain);
-
{
Lock sync(*this);
_generation++;
- for(deque<FacetMap::iterator>::iterator q = allObjects.begin();
+ for(deque<FacetPtr>::iterator q = allObjects.begin();
q != allObjects.end(); q++)
{
- (*q)->second->element->usageCount--;
+ FacetPtr& facet = *q;
+ facet->element->usageCount--;
+
+ if(facet != facet->element->mainObject)
+ {
+ //
+ // Remove if dead
+ //
+ IceUtil::Mutex::Lock lockFacet(facet->mutex);
+ {
+ if(facet->status == dead)
+ {
+ facet->element->facets.erase(facet->position);
+ facet->position = facet->element->facets.end();
+ }
+ }
+ }
}
allObjects.clear();
evict();
@@ -1387,6 +1408,7 @@ Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value,
pair<FacetMap::iterator, bool> pair;
pair = elt->facets.insert(FacetMap::value_type(esk.facet, facet));
assert(pair.second);
+ facet->position = pair.first;
if(esk.facet.size() == 0)
{
@@ -1627,7 +1649,7 @@ Freeze::EvictorI::evict()
break;
}
- if(_trace >= 2)
+ if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0))
{
Trace out(_communicator->getLogger(), "Freeze.Evictor");
out << "evicting \"" << q->first << "\" from the queue\n"
@@ -1683,6 +1705,13 @@ Freeze::EvictorI::dbHasObject(const Identity& ident)
}
catch(const DbDeadlockException&)
{
+ if(_deadlockWarning)
+ {
+ Warning out(_communicator->getLogger());
+ out << "Deadlock in Freeze::EvictorI::dbHasObject while searching \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -1697,11 +1726,10 @@ Freeze::EvictorI::dbHasObject(const Identity& ident)
}
void
-Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator& q,
- const Freeze::EvictorI::FacetPtr& facet)
+Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetPtr& facet)
{
facet->element->usageCount++;
- _modifiedQueue.push_back(q);
+ _modifiedQueue.push_back(facet);
if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger)
{
@@ -1711,7 +1739,7 @@ Freeze::EvictorI::addToModifiedQueue(const Freeze::EvictorI::FacetMap::iterator&
void
Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath, Byte status,
- Long saveStart, StreamedObject& obj)
+ Long streamStart, StreamedObject& obj)
{
EvictorStorageKey esk;
esk.identity.name = facet->element->identity->name;
@@ -1721,7 +1749,7 @@ Freeze::EvictorI::streamFacet(const FacetPtr& facet, const FacetPath& facetPath,
obj.status = status;
if(status != destroyed)
{
- writeObjectRecordToValue(saveStart, facet->rec, obj.value);
+ writeObjectRecordToValue(streamStart, facet->rec, obj.value);
}
}
@@ -1740,13 +1768,13 @@ Freeze::EvictorI::saveNowNoSync()
}
void
-Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Value& value)
+Freeze::EvictorI::writeObjectRecordToValue(Long streamStart, ObjectRecord& rec, Value& value)
{
//
// Update stats first
//
Statistics& stats = rec.stats;
- Long diff = saveStart - (stats.creationTime + stats.lastSaveTime);
+ Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
if(stats.lastSaveTime == 0)
{
stats.lastSaveTime = diff;
@@ -1754,7 +1782,7 @@ Freeze::EvictorI::writeObjectRecordToValue(Long saveStart, ObjectRecord& rec, Va
}
else
{
- stats.lastSaveTime = saveStart - stats.creationTime;
+ stats.lastSaveTime = streamStart - stats.creationTime;
stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
}
@@ -1857,6 +1885,7 @@ Freeze::EvictorI::load(const Identity& ident)
pair<FacetMap::iterator, bool> pair;
pair = result->facets.insert(FacetMap::value_type(esk.facet, facet));
assert(pair.second);
+ facet->position = pair.first;
if(esk.facet.size() == 0)
{
@@ -1901,6 +1930,13 @@ Freeze::EvictorI::load(const Identity& ident)
}
}
+ if(_deadlockWarning)
+ {
+ Warning out(_communicator->getLogger());
+ out << "Deadlock in Freeze::EvictorI::load while searching \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Try again
//
@@ -1992,7 +2028,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
case clean:
{
facet->status = modified;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
case created:
@@ -2019,7 +2055,7 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
case dead:
{
facet->status = created;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
default:
@@ -2047,11 +2083,13 @@ Freeze::EvictorI::addFacetImpl(EvictorElementPtr& element, const ObjectPtr& serv
pair<FacetMap::iterator, bool> insertResult = facets.insert(FacetMap::value_type(facetPath, facet));
assert(insertResult.second);
+ facet->position = insertResult.first;
+
if(facetPath.size() == 0)
{
element->mainObject = facet;
}
- addToModifiedQueue(insertResult.first, facet);
+ addToModifiedQueue(facet);
}
@@ -2079,7 +2117,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath)
if(q != facets.end())
{
- servant = destroyFacetImpl(q, q->second);
+ servant = destroyFacetImpl(q->second);
}
//
// else should we raise an exception?
@@ -2102,7 +2140,7 @@ Freeze::EvictorI::removeFacetImpl(FacetMap& facets, const FacetPath& facetPath)
ObjectPtr
-Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, const Freeze::EvictorI::FacetPtr& facet)
+Freeze::EvictorI::destroyFacetImpl(const Freeze::EvictorI::FacetPtr& facet)
{
IceUtil::Mutex::Lock lockFacet(facet->mutex);
switch(facet->status)
@@ -2110,7 +2148,7 @@ Freeze::EvictorI::destroyFacetImpl(Freeze::EvictorI::FacetMap::iterator& q, cons
case clean:
{
facet->status = destroyed;
- addToModifiedQueue(q, facet);
+ addToModifiedQueue(facet);
break;
}
case created:
diff --git a/cpp/src/Freeze/EvictorI.h b/cpp/src/Freeze/EvictorI.h
index d244405d106..4deb9156fde 100644
--- a/cpp/src/Freeze/EvictorI.h
+++ b/cpp/src/Freeze/EvictorI.h
@@ -65,7 +65,7 @@ public:
//
// Thread
//
- virtual void run();
+ virtual void run() throw ();
//
@@ -94,6 +94,10 @@ public:
typedef IceUtil::Handle<EvictorElement> EvictorElementPtr;
typedef std::map<Ice::Identity, EvictorElementPtr> EvictorMap;
+ struct Facet;
+ typedef IceUtil::Handle<Facet> FacetPtr;
+ typedef std::map<Ice::FacetPath, FacetPtr> FacetMap;
+
struct Facet : public Ice::LocalObject
{
Facet(EvictorElement*);
@@ -102,9 +106,12 @@ public:
Ice::Byte status;
ObjectRecord rec; // 64 bit alignment
EvictorElement* const element;
+
+ //
+ // Position in element->facets
+ //
+ FacetMap::iterator position;
};
- typedef IceUtil::Handle<Facet> FacetPtr;
- typedef std::map<Ice::FacetPath, FacetPtr> FacetMap;
struct EvictorElement : public IceUtil::Shared
{
@@ -200,7 +207,8 @@ public:
#endif
-
+ bool
+ deadlockWarning() const;
private:
@@ -209,7 +217,7 @@ private:
void evict();
bool dbHasObject(const Ice::Identity&);
bool getObject(const Ice::Identity&, ObjectRecord&);
- void addToModifiedQueue(const FacetMap::iterator&, const FacetPtr&);
+ void addToModifiedQueue(const FacetPtr&);
void streamFacet(const FacetPtr&, const Ice::FacetPath&, Ice::Byte, Ice::Long, StreamedObject&);
void saveNowNoSync();
@@ -219,7 +227,7 @@ private:
void addFacetImpl(EvictorElementPtr&, const Ice::ObjectPtr&, const Ice::FacetPath&, bool);
void removeFacetImpl(FacetMap&, const Ice::FacetPath&);
- Ice::ObjectPtr destroyFacetImpl(FacetMap::iterator&, const FacetPtr& facet);
+ Ice::ObjectPtr destroyFacetImpl(const FacetPtr& facet);
void buildFacetMap(const FacetMap&);
@@ -242,9 +250,7 @@ private:
// element containing the pointed element remains in the evictor
// map.
//
- // Note: relies on the stability of iterators in a std::map
- //
- std::deque<FacetMap::iterator> _modifiedQueue;
+ std::deque<FacetPtr> _modifiedQueue;
bool _deactivated;
@@ -286,6 +292,8 @@ private:
// this element, then the loaded value is current.
//
int _generation;
+
+ bool _deadlockWarning;
};
inline const Ice::CommunicatorPtr&
@@ -319,6 +327,12 @@ EvictorI::currentGeneration() const
return _generation;
}
+inline bool
+EvictorI::deadlockWarning() const
+{
+ return _deadlockWarning;
+}
+
inline bool
startWith(const Key& key, const Key& root)
{
diff --git a/cpp/src/Freeze/IndexI.cpp b/cpp/src/Freeze/IndexI.cpp
index 1e0dd451272..227f876b798 100644
--- a/cpp/src/Freeze/IndexI.cpp
+++ b/cpp/src/Freeze/IndexI.cpp
@@ -143,6 +143,14 @@ Freeze::IndexI::untypedFindFirst(const Key& bytes, Int firstN) const
//
}
}
+
+ if(_evictor->deadlockWarning())
+ {
+ Warning out(_evictor->communicator()->getLogger());
+ out << "Deadlock in Freeze::IndexI::untypedFindFirst while searching \""
+ << _evictor->dbName() << "\"; retrying ...";
+ }
+
//
// Retry
//
@@ -236,6 +244,14 @@ Freeze::IndexI::untypedCount(const Key& bytes) const
//
}
}
+
+ if(_evictor->deadlockWarning())
+ {
+ Warning out(_evictor->communicator()->getLogger());
+ out << "Deadlock in Freeze::IndexI::untypedCount while searching \""
+ << _evictor->dbName() << "\"; retrying ...";
+ }
+
//
// Retry
//
diff --git a/cpp/src/Freeze/MapI.cpp b/cpp/src/Freeze/MapI.cpp
index b108b6c1b7c..0ce27824a86 100644
--- a/cpp/src/Freeze/MapI.cpp
+++ b/cpp/src/Freeze/MapI.cpp
@@ -698,6 +698,13 @@ Freeze::MapHelperI::find(const Key& k, bool readOnly) const
}
else
{
+ if(_connection->deadlockWarning())
+ {
+ Warning out(_connection->communicator()->getLogger());
+ out << "Deadlock in Freeze::MapHelperI::find on Map \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -749,6 +756,13 @@ Freeze::MapHelperI::put(const Key& key, const Value& value)
}
else
{
+ if(_connection->deadlockWarning())
+ {
+ Warning out(_connection->communicator()->getLogger());
+ out << "Deadlock in Freeze::MapHelperI::put on Map \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -805,6 +819,13 @@ Freeze::MapHelperI::erase(const Key& key)
}
else
{
+ if(_connection->deadlockWarning())
+ {
+ Warning out(_connection->communicator()->getLogger());
+ out << "Deadlock in Freeze::MapHelperI::erase on Map \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -861,6 +882,13 @@ Freeze::MapHelperI::count(const Key& key) const
}
else
{
+ if(_connection->deadlockWarning())
+ {
+ Warning out(_connection->communicator()->getLogger());
+ out << "Deadlock in Freeze::MapHelperI::count on Map \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//
@@ -904,6 +932,13 @@ Freeze::MapHelperI::clear()
}
else
{
+ if(_connection->deadlockWarning())
+ {
+ Warning out(_connection->communicator()->getLogger());
+ out << "Deadlock in Freeze::MapHelperI::clear on Map \""
+ << _dbName << "\"; retrying ...";
+ }
+
//
// Ignored, try again
//