summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/EvictorI.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
committerBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
commitabada90e3f84dc703b8ddc9efcbed8a946fadead (patch)
tree2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/Freeze/EvictorI.cpp
parentremoving trace message (diff)
downloadice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r--cpp/src/Freeze/EvictorI.cpp2280
1 files changed, 1140 insertions, 1140 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp
index 75e1722c004..a3bffc1a2bc 100644
--- a/cpp/src/Freeze/EvictorI.cpp
+++ b/cpp/src/Freeze/EvictorI.cpp
@@ -36,23 +36,23 @@ string Freeze::EvictorI::indexPrefix = "$index:";
Freeze::EvictorPtr
Freeze::createEvictor(const ObjectAdapterPtr& adapter,
- const string& envName,
- const string& filename,
- const ServantInitializerPtr& initializer,
- const vector<IndexPtr>& indices,
- bool createDb)
+ const string& envName,
+ const string& filename,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
+ bool createDb)
{
return new EvictorI(adapter, envName, 0, filename, initializer, indices, createDb);
}
Freeze::EvictorPtr
Freeze::createEvictor(const ObjectAdapterPtr& adapter,
- const string& envName,
- DbEnv& dbEnv,
- const string& filename,
- const ServantInitializerPtr& initializer,
- const vector<IndexPtr>& indices,
- bool createDb)
+ const string& envName,
+ DbEnv& dbEnv,
+ const string& filename,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
+ bool createDb)
{
return new EvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb);
}
@@ -78,11 +78,11 @@ handleFatalError(const Freeze::EvictorPtr& evictor, const Ice::CommunicatorPtr&
IceUtil::StaticMutex::Lock lock(fatalErrorCallbackMutex);
if(fatalErrorCallback != 0)
{
- fatalErrorCallback(evictor, communicator);
+ fatalErrorCallback(evictor, communicator);
}
else
{
- ::abort();
+ ::abort();
}
}
@@ -112,7 +112,7 @@ Freeze::DeactivateController::Guard::Guard(DeactivateController& controller) :
Lock sync(controller);
if(controller._deactivated || _controller._deactivating)
{
- throw EvictorDeactivatedException(__FILE__, __LINE__);
+ throw EvictorDeactivatedException(__FILE__, __LINE__);
}
controller._guardCount++;
}
@@ -123,11 +123,11 @@ Freeze::DeactivateController::Guard::~Guard()
_controller._guardCount--;
if(_controller._deactivating && _controller._guardCount == 0)
{
- //
- // Notify all the threads -- although we only want to
- // reach the thread doing the deactivation.
- //
- _controller.notifyAll();
+ //
+ // Notify all the threads -- although we only want to
+ // reach the thread doing the deactivation.
+ //
+ _controller.notifyAll();
}
}
@@ -153,40 +153,40 @@ Freeze::DeactivateController::deactivate()
if(_deactivated)
{
- return false;
+ return false;
}
if(_deactivating)
{
- //
- // Wait for deactivated
- //
- while(!_deactivated)
- {
- wait();
- }
- return false;
+ //
+ // Wait for deactivated
+ //
+ while(!_deactivated)
+ {
+ wait();
+ }
+ return false;
}
else
{
- _deactivating = true;
- while(_guardCount > 0)
- {
- if(_evictor->trace() >= 1)
- {
- Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
- out << "*** Waiting for " << _guardCount << " threads to complete before starting deactivation.";
- }
-
- wait();
- }
-
- if(_evictor->trace() >= 1)
- {
- Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
- out << "Starting deactivation.";
- }
- return true;
+ _deactivating = true;
+ while(_guardCount > 0)
+ {
+ if(_evictor->trace() >= 1)
+ {
+ Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
+ out << "*** Waiting for " << _guardCount << " threads to complete before starting deactivation.";
+ }
+
+ wait();
+ }
+
+ if(_evictor->trace() >= 1)
+ {
+ Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
+ out << "Starting deactivation.";
+ }
+ return true;
}
}
@@ -195,8 +195,8 @@ Freeze::DeactivateController::deactivationComplete()
{
if(_evictor->trace() >= 1)
{
- Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
- out << "Deactivation complete.";
+ Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor");
+ out << "Deactivation complete.";
}
Lock sync(*this);
@@ -225,20 +225,20 @@ Freeze::WatchDogThread::run()
while(!_done)
{
- if(_active)
- {
- if(timedWait(_timeout) == false && _active && !_done)
- {
- Error out(_evictor.communicator()->getLogger());
- out << "Fatal error: streaming watch dog thread timed out.";
- out.flush();
- handleFatalError(&_evictor, _evictor.communicator());
- }
- }
- else
- {
- wait();
- }
+ if(_active)
+ {
+ if(timedWait(_timeout) == false && _active && !_done)
+ {
+ Error out(_evictor.communicator()->getLogger());
+ out << "Fatal error: streaming watch dog thread timed out.";
+ out.flush();
+ handleFatalError(&_evictor, _evictor.communicator());
+ }
+ }
+ else
+ {
+ wait();
+ }
}
}
@@ -270,12 +270,12 @@ Freeze::WatchDogThread::terminate()
//
Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
- const string& envName,
- DbEnv* dbEnv,
- const string& filename,
- const ServantInitializerPtr& initializer,
- const vector<IndexPtr>& indices,
- bool createDb) :
+ const string& envName,
+ DbEnv* dbEnv,
+ const string& filename,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
+ bool createDb) :
_evictorSize(10),
_currentEvictorSize(0),
@@ -304,10 +304,10 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
//
_saveSizeTrigger = _communicator->getProperties()->
- getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10);
+ getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10);
Int savePeriod = _communicator->getProperties()->
- getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000);
+ getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000);
_savePeriod = IceUtil::Time::milliSeconds(savePeriod);
@@ -315,17 +315,17 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
// By default, we save at most 10 * SaveSizeTrigger objects per transaction
//
_maxTxSize = _communicator->getProperties()->
- getPropertyAsIntWithDefault(propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger);
+ getPropertyAsIntWithDefault(propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger);
if(_maxTxSize <= 0)
{
- _maxTxSize = 100;
- }
+ _maxTxSize = 100;
+ }
bool populateEmptyIndices =
- (_communicator->getProperties()->
- getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0);
-
+ (_communicator->getProperties()->
+ getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0);
+
//
// Instantiate all Dbs in 2 steps:
// (1) iterate over the indices and create ObjectStore with indices
@@ -342,57 +342,57 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter,
for(vector<IndexPtr>::const_iterator i = indices.begin(); i != indices.end(); ++i)
{
- string facet = (*i)->facet();
+ string facet = (*i)->facet();
- StoreMap::iterator q = _storeMap.find(facet);
- if(q == _storeMap.end())
- {
- //
- // New db
- //
+ StoreMap::iterator q = _storeMap.find(facet);
+ if(q == _storeMap.end())
+ {
+ //
+ // New db
+ //
- vector<IndexPtr> storeIndices;
+ vector<IndexPtr> storeIndices;
- for(vector<IndexPtr>::const_iterator r = i; r != indices.end(); ++r)
- {
- if((*r)->facet() == facet)
- {
- storeIndices.push_back(*r);
- }
- }
- ObjectStore* store = new ObjectStore(facet, _createDb, this, storeIndices, populateEmptyIndices);
- _storeMap.insert(StoreMap::value_type(facet, store));
- }
+ for(vector<IndexPtr>::const_iterator r = i; r != indices.end(); ++r)
+ {
+ if((*r)->facet() == facet)
+ {
+ storeIndices.push_back(*r);
+ }
+ }
+ ObjectStore* store = new ObjectStore(facet, _createDb, this, storeIndices, populateEmptyIndices);
+ _storeMap.insert(StoreMap::value_type(facet, store));
+ }
}
for(vector<string>::iterator p = dbs.begin(); p != dbs.end(); ++p)
{
- string facet = *p;
- if(facet == defaultDb)
- {
- facet = "";
- }
-
- pair<StoreMap::iterator, bool> ir =
- _storeMap.insert(StoreMap::value_type(facet, 0));
-
- if(ir.second)
- {
- ir.first->second = new ObjectStore(facet, _createDb, this);
- }
+ string facet = *p;
+ if(facet == defaultDb)
+ {
+ facet = "";
+ }
+
+ pair<StoreMap::iterator, bool> ir =
+ _storeMap.insert(StoreMap::value_type(facet, 0));
+
+ if(ir.second)
+ {
+ ir.first->second = new ObjectStore(facet, _createDb, this);
+ }
}
//
// By default, no stream timeout
//
long streamTimeout = _communicator->getProperties()->
- getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000;
+ getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000;
if(streamTimeout > 0)
{
- _watchDogThread = new WatchDogThread(streamTimeout, *this);
- _watchDogThread->start();
+ _watchDogThread = new WatchDogThread(streamTimeout, *this);
+ _watchDogThread->start();
}
//
@@ -405,13 +405,13 @@ Freeze::EvictorI::~EvictorI()
{
if(!_deactivateController.deactivated())
{
- Warning out(_communicator->getLogger());
- out << "evictor has not been deactivated";
+ Warning out(_communicator->getLogger());
+ out << "evictor has not been deactivated";
- //
- // Need to deactivate to save objects and join saving thread
- //
- deactivate("");
+ //
+ // Need to deactivate to save objects and join saving thread
+ //
+ deactivate("");
}
//
@@ -419,7 +419,7 @@ Freeze::EvictorI::~EvictorI()
//
for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
{
- delete (*p).second;
+ delete (*p).second;
}
}
@@ -435,7 +435,7 @@ Freeze::EvictorI::setSize(Int evictorSize)
//
if(evictorSize < 0)
{
- return;
+ return;
}
//
@@ -473,35 +473,35 @@ Freeze::EvictorI::addFacet(const ObjectPtr& servant, const Identity& ident, cons
for(;;)
{
- {
- Lock sync(*this);
-
- StoreMap::iterator p = _storeMap.find(facet);
- if(p == _storeMap.end())
- {
- if(store != 0)
- {
- _storeMap.insert(StoreMap::value_type(facet, store));
- }
- }
- else
- {
- delete store;
- store = (*p).second;
- assert(store != 0);
- }
- }
-
- if(store == 0)
- {
- assert(facet != "");
- store = new ObjectStore(facet, _createDb, this);
- // loop
- }
- else
- {
- break; // for(;;)
- }
+ {
+ Lock sync(*this);
+
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p == _storeMap.end())
+ {
+ if(store != 0)
+ {
+ _storeMap.insert(StoreMap::value_type(facet, store));
+ }
+ }
+ else
+ {
+ delete store;
+ store = (*p).second;
+ assert(store != 0);
+ }
+ }
+
+ if(store == 0)
+ {
+ assert(facet != "");
+ store = new ObjectStore(facet, _createDb, this);
+ // loop
+ }
+ else
+ {
+ break; // for(;;)
+ }
}
assert(store != 0);
@@ -509,103 +509,103 @@ Freeze::EvictorI::addFacet(const ObjectPtr& servant, const Identity& ident, cons
for(;;)
{
- //
- // Create a new entry
- //
-
- EvictorElementPtr element = new EvictorElement(*store);
- element->status = EvictorElement::dead;
- EvictorElementPtr oldElt = store->putIfAbsent(ident, element);
+ //
+ // Create a new entry
+ //
+
+ EvictorElementPtr element = new EvictorElement(*store);
+ element->status = EvictorElement::dead;
+ EvictorElementPtr oldElt = store->putIfAbsent(ident, element);
- if(oldElt != 0)
- {
- element = oldElt;
- }
-
- {
- Lock sync(*this);
-
- if(element->stale)
- {
- //
- // Try again
- //
- continue;
- }
- fixEvictPosition(element);
-
- IceUtil::Mutex::Lock lock(element->mutex);
-
- switch(element->status)
- {
- case EvictorElement::clean:
- case EvictorElement::created:
- case EvictorElement::modified:
- {
- alreadyThere = true;
- break;
- }
- case EvictorElement::destroyed:
- {
- element->status = EvictorElement::modified;
- element->rec.servant = servant;
-
- //
- // No need to push it on the modified queue, as a destroyed object
- // is either already on the queue or about to be saved. When saved,
- // it becomes dead.
- //
- break;
- }
- case EvictorElement::dead:
- {
- element->status = EvictorElement::created;
- ObjectRecord& rec = element->rec;
-
- rec.servant = servant;
- rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds();
- rec.stats.lastSaveTime = 0;
- rec.stats.avgSaveTime = 0;
-
- addToModifiedQueue(element);
- break;
- }
- default:
- {
- assert(0);
- break;
- }
- }
- }
- break; // for(;;)
+ if(oldElt != 0)
+ {
+ element = oldElt;
+ }
+
+ {
+ Lock sync(*this);
+
+ if(element->stale)
+ {
+ //
+ // Try again
+ //
+ continue;
+ }
+ fixEvictPosition(element);
+
+ IceUtil::Mutex::Lock lock(element->mutex);
+
+ switch(element->status)
+ {
+ case EvictorElement::clean:
+ case EvictorElement::created:
+ case EvictorElement::modified:
+ {
+ alreadyThere = true;
+ break;
+ }
+ case EvictorElement::destroyed:
+ {
+ element->status = EvictorElement::modified;
+ element->rec.servant = servant;
+
+ //
+ // No need to push it on the modified queue, as a destroyed object
+ // is either already on the queue or about to be saved. When saved,
+ // it becomes dead.
+ //
+ break;
+ }
+ case EvictorElement::dead:
+ {
+ element->status = EvictorElement::created;
+ ObjectRecord& rec = element->rec;
+
+ rec.servant = servant;
+ rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds();
+ rec.stats.lastSaveTime = 0;
+ rec.stats.avgSaveTime = 0;
+
+ addToModifiedQueue(element);
+ break;
+ }
+ default:
+ {
+ assert(0);
+ break;
+ }
+ }
+ }
+ break; // for(;;)
}
if(alreadyThere)
{
- AlreadyRegisteredException ex(__FILE__, __LINE__);
- ex.kindOfObject = "servant";
- ex.id = _communicator->identityToString(ident);
- if(!facet.empty())
- {
- ex.id += " -f " + IceUtil::escapeString(facet, "");
- }
- throw ex;
+ AlreadyRegisteredException ex(__FILE__, __LINE__);
+ ex.kindOfObject = "servant";
+ ex.id = _communicator->identityToString(ident);
+ if(!facet.empty())
+ {
+ ex.id += " -f " + IceUtil::escapeString(facet, "");
+ }
+ throw ex;
}
if(_trace >= 1)
{
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "added object \"" << _communicator->identityToString(ident) << "\"";
- if(!facet.empty())
- {
- out << " with facet \"" << facet << "\"";
- }
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "added object \"" << _communicator->identityToString(ident) << "\"";
+ if(!facet.empty())
+ {
+ out << " with facet \"" << facet << "\"";
+ }
}
ObjectPrx obj = _adapter->createProxy(ident);
if(!facet.empty())
{
- obj = obj->ice_facet(facet);
+ obj = obj->ice_facet(facet);
}
return obj;
}
@@ -627,108 +627,108 @@ Freeze::EvictorI::removeFacet(const Identity& ident, const string& facet)
if(store != 0)
{
- for(;;)
- {
- //
- // Retrieve object
- //
-
- EvictorElementPtr element = store->pin(ident);
- if(element != 0)
- {
- Lock sync(*this);
- if(element->stale)
- {
- //
- // Try again
- //
- continue;
- }
-
- fixEvictPosition(element);
- {
- IceUtil::Mutex::Lock lock(element->mutex);
-
- switch(element->status)
- {
- case EvictorElement::clean:
- {
- servant = element->rec.servant;
- element->status = EvictorElement::destroyed;
- element->rec.servant = 0;
- addToModifiedQueue(element);
- break;
- }
- case EvictorElement::created:
- {
- servant = element->rec.servant;
- element->status = EvictorElement::dead;
- element->rec.servant = 0;
- break;
- }
- case EvictorElement::modified:
- {
- servant = element->rec.servant;
- element->status = EvictorElement::destroyed;
- element->rec.servant = 0;
- //
- // Not necessary to push it on the modified queue, as a modified
- // element is either on the queue already or about to be saved
- // (at which point it becomes clean)
- //
- break;
- }
- case EvictorElement::destroyed:
- case EvictorElement::dead:
- {
- break;
- }
- default:
- {
- assert(0);
- break;
- }
- }
- }
- if(element->keepCount > 0)
- {
- assert(servant != 0);
-
- element->keepCount = 0;
- //
- // Add to front of evictor queue
- //
- // Note that save evicts dead objects
- //
- _evictorList.push_front(element);
- _currentEvictorSize++;
- element->evictPosition = _evictorList.begin();
- }
- }
- break; // for(;;)
- }
+ for(;;)
+ {
+ //
+ // Retrieve object
+ //
+
+ EvictorElementPtr element = store->pin(ident);
+ if(element != 0)
+ {
+ Lock sync(*this);
+ if(element->stale)
+ {
+ //
+ // Try again
+ //
+ continue;
+ }
+
+ fixEvictPosition(element);
+ {
+ IceUtil::Mutex::Lock lock(element->mutex);
+
+ switch(element->status)
+ {
+ case EvictorElement::clean:
+ {
+ servant = element->rec.servant;
+ element->status = EvictorElement::destroyed;
+ element->rec.servant = 0;
+ addToModifiedQueue(element);
+ break;
+ }
+ case EvictorElement::created:
+ {
+ servant = element->rec.servant;
+ element->status = EvictorElement::dead;
+ element->rec.servant = 0;
+ break;
+ }
+ case EvictorElement::modified:
+ {
+ servant = element->rec.servant;
+ element->status = EvictorElement::destroyed;
+ element->rec.servant = 0;
+ //
+ // Not necessary to push it on the modified queue, as a modified
+ // element is either on the queue already or about to be saved
+ // (at which point it becomes clean)
+ //
+ break;
+ }
+ case EvictorElement::destroyed:
+ case EvictorElement::dead:
+ {
+ break;
+ }
+ default:
+ {
+ assert(0);
+ break;
+ }
+ }
+ }
+ if(element->keepCount > 0)
+ {
+ assert(servant != 0);
+
+ element->keepCount = 0;
+ //
+ // Add to front of evictor queue
+ //
+ // Note that save evicts dead objects
+ //
+ _evictorList.push_front(element);
+ _currentEvictorSize++;
+ element->evictPosition = _evictorList.begin();
+ }
+ }
+ break; // for(;;)
+ }
}
if(servant == 0)
{
- NotRegisteredException ex(__FILE__, __LINE__);
- ex.kindOfObject = "servant";
- ex.id = _communicator->identityToString(ident);
- if(!facet.empty())
- {
- ex.id += " -f " + IceUtil::escapeString(facet, "");
- }
- throw ex;
+ NotRegisteredException ex(__FILE__, __LINE__);
+ ex.kindOfObject = "servant";
+ ex.id = _communicator->identityToString(ident);
+ if(!facet.empty())
+ {
+ ex.id += " -f " + IceUtil::escapeString(facet, "");
+ }
+ throw ex;
}
if(_trace >= 1)
{
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "removed object \"" << _communicator->identityToString(ident) << "\"";
- if(!facet.empty())
- {
- out << " with facet \"" << facet << "\"";
- }
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "removed object \"" << _communicator->identityToString(ident) << "\"";
+ if(!facet.empty())
+ {
+ out << " with facet \"" << facet << "\"";
+ }
}
return servant;
}
@@ -750,77 +750,77 @@ Freeze::EvictorI::keepFacet(const Identity& ident, const string& facet)
ObjectStore* store = findStore(facet);
if(store == 0)
{
- notThere = true;
+ notThere = true;
}
else
{
- for(;;)
- {
- EvictorElementPtr element = store->pin(ident);
- if(element == 0)
- {
- notThere = true;
- break;
- }
-
- Lock sync(*this);
-
- if(element->stale)
- {
- //
- // try again
- //
- continue;
- }
-
-
- {
- IceUtil::Mutex::Lock lockElement(element->mutex);
- if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead)
- {
- notThere = true;
- break;
- }
- }
-
- //
- // Found!
- //
-
- if(element->keepCount == 0)
- {
- if(element->usageCount < 0)
- {
- //
- // New object
- //
- element->usageCount = 0;
- }
- else
- {
- _evictorList.erase(element->evictPosition);
- _currentEvictorSize--;
- }
- element->keepCount = 1;
- }
- else
- {
- element->keepCount++;
- }
- break;
- }
+ for(;;)
+ {
+ EvictorElementPtr element = store->pin(ident);
+ if(element == 0)
+ {
+ notThere = true;
+ break;
+ }
+
+ Lock sync(*this);
+
+ if(element->stale)
+ {
+ //
+ // try again
+ //
+ continue;
+ }
+
+
+ {
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead)
+ {
+ notThere = true;
+ break;
+ }
+ }
+
+ //
+ // Found!
+ //
+
+ if(element->keepCount == 0)
+ {
+ if(element->usageCount < 0)
+ {
+ //
+ // New object
+ //
+ element->usageCount = 0;
+ }
+ else
+ {
+ _evictorList.erase(element->evictPosition);
+ _currentEvictorSize--;
+ }
+ element->keepCount = 1;
+ }
+ else
+ {
+ element->keepCount++;
+ }
+ break;
+ }
}
if(notThere)
{
- NotRegisteredException ex(__FILE__, __LINE__);
- ex.kindOfObject = "servant";
- ex.id = _communicator->identityToString(ident);
- if(!facet.empty())
- {
- ex.id += " -f " + IceUtil::escapeString(facet, "");
- }
- throw ex;
+ NotRegisteredException ex(__FILE__, __LINE__);
+ ex.kindOfObject = "servant";
+ ex.id = _communicator->identityToString(ident);
+ if(!facet.empty())
+ {
+ ex.id += " -f " + IceUtil::escapeString(facet, "");
+ }
+ throw ex;
}
}
@@ -837,38 +837,38 @@ Freeze::EvictorI::releaseFacet(const Identity& ident, const string& facet)
DeactivateController::Guard deactivateGuard(_deactivateController);
{
- Lock sync(*this);
-
- StoreMap::iterator p = _storeMap.find(facet);
- if(p != _storeMap.end())
- {
- ObjectStore* store = (*p).second;
-
- EvictorElementPtr element = store->getIfPinned(ident);
- if(element != 0)
- {
- assert(!element->stale);
- if(element->keepCount > 0)
- {
- if(--element->keepCount == 0)
- {
- //
- // Add to front of evictor queue
- //
- // Note that the element cannot be destroyed or dead since
- // its keepCount was > 0.
- //
- _evictorList.push_front(element);
- _currentEvictorSize++;
- element->evictPosition = _evictorList.begin();
- }
- //
- // Success
- //
- return;
- }
- }
- }
+ Lock sync(*this);
+
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p != _storeMap.end())
+ {
+ ObjectStore* store = (*p).second;
+
+ EvictorElementPtr element = store->getIfPinned(ident);
+ if(element != 0)
+ {
+ assert(!element->stale);
+ if(element->keepCount > 0)
+ {
+ if(--element->keepCount == 0)
+ {
+ //
+ // Add to front of evictor queue
+ //
+ // Note that the element cannot be destroyed or dead since
+ // its keepCount was > 0.
+ //
+ _evictorList.push_front(element);
+ _currentEvictorSize++;
+ element->evictPosition = _evictorList.begin();
+ }
+ //
+ // Success
+ //
+ return;
+ }
+ }
+ }
}
NotRegisteredException ex(__FILE__, __LINE__);
@@ -876,7 +876,7 @@ Freeze::EvictorI::releaseFacet(const Identity& ident, const string& facet)
ex.id = _communicator->identityToString(ident);
if(!facet.empty())
{
- ex.id += " -f " + IceUtil::escapeString(facet, "");
+ ex.id += " -f " + IceUtil::escapeString(facet, "");
}
throw ex;
}
@@ -888,14 +888,14 @@ Freeze::EvictorI::getIterator(const string& facet, Int batchSize)
ObjectStore* store = 0;
{
- Lock sync(*this);
-
- StoreMap::iterator p = _storeMap.find(facet);
- if(p != _storeMap.end())
- {
- store = (*p).second;
- saveNowNoSync();
- }
+ Lock sync(*this);
+
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p != _storeMap.end())
+ {
+ store = (*p).second;
+ saveNowNoSync();
+ }
}
return new EvictorIteratorI(store, batchSize);
}
@@ -924,25 +924,25 @@ Freeze::EvictorI::hasFacetImpl(const Identity& ident, const string& facet)
ObjectStore* store = 0;
{
- Lock sync(*this);
+ Lock sync(*this);
- StoreMap::iterator p = _storeMap.find(facet);
- if(p == _storeMap.end())
- {
- return false;
- }
-
- store = (*p).second;
-
- EvictorElementPtr element = store->getIfPinned(ident);
- if(element != 0)
- {
- assert(!element->stale);
-
- IceUtil::Mutex::Lock lock(element->mutex);
- return element->status != EvictorElement::dead &&
- element->status != EvictorElement::destroyed;
- }
+ StoreMap::iterator p = _storeMap.find(facet);
+ if(p == _storeMap.end())
+ {
+ return false;
+ }
+
+ store = (*p).second;
+
+ EvictorElementPtr element = store->getIfPinned(ident);
+ if(element != 0)
+ {
+ assert(!element->stale);
+
+ IceUtil::Mutex::Lock lock(element->mutex);
+ return element->status != EvictorElement::dead &&
+ element->status != EvictorElement::destroyed;
+ }
}
return store->dbHasObject(ident);
}
@@ -960,45 +960,45 @@ Freeze::EvictorI::hasAnotherFacet(const Identity& ident, const string& facet)
//
StoreMap storeMapCopy;
{
- Lock sync(*this);
- storeMapCopy = _storeMap;
- }
-
+ Lock sync(*this);
+ storeMapCopy = _storeMap;
+ }
+
for(StoreMap::iterator p = storeMapCopy.begin(); p != storeMapCopy.end(); ++p)
{
- //
- // Do not check again the given facet
- //
- if((*p).first != facet)
- {
- ObjectStore* store = (*p).second;
-
- bool inCache = false;
- {
- Lock sync(*this);
-
- EvictorElementPtr element = store->getIfPinned(ident);
- if(element != 0)
- {
- inCache = true;
- assert(!element->stale);
-
- IceUtil::Mutex::Lock lock(element->mutex);
- if(element->status != EvictorElement::dead &&
- element->status != EvictorElement::destroyed)
- {
- return true;
- }
- }
- }
- if(!inCache)
- {
- if(store->dbHasObject(ident))
- {
- return true;
- }
- }
- }
+ //
+ // Do not check again the given facet
+ //
+ if((*p).first != facet)
+ {
+ ObjectStore* store = (*p).second;
+
+ bool inCache = false;
+ {
+ Lock sync(*this);
+
+ EvictorElementPtr element = store->getIfPinned(ident);
+ if(element != 0)
+ {
+ inCache = true;
+ assert(!element->stale);
+
+ IceUtil::Mutex::Lock lock(element->mutex);
+ if(element->status != EvictorElement::dead &&
+ element->status != EvictorElement::destroyed)
+ {
+ return true;
+ }
+ }
+ }
+ if(!inCache)
+ {
+ if(store->dbHasObject(ident))
+ {
+ return true;
+ }
+ }
+ }
}
return false;
}
@@ -1018,48 +1018,48 @@ Freeze::EvictorI::locate(const Current& current, LocalObjectPtr& cookie)
//
if(current.operation == "ice_ping")
{
- if(hasFacetImpl(current.id, current.facet))
- {
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "ice_ping found \"" << _communicator->identityToString(current.id)
- << "\" with facet \"" << current.facet + "\"";
- }
-
- cookie = 0;
- return _pingObject;
- }
- else if(hasAnotherFacet(current.id, current.facet))
- {
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "ice_ping raises FacetNotExistException for \"" << _communicator->identityToString(current.id)
- << "\" with facet \"" << current.facet + "\"";
- }
- throw FacetNotExistException(__FILE__, __LINE__);
- }
- else
- {
- if(_trace >= 3)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "ice_ping will raise ObjectNotExistException for \""
- << _communicator->identityToString(current.id) << "\" with facet \"" << current.facet + "\"";
- }
- return 0;
- }
+ if(hasFacetImpl(current.id, current.facet))
+ {
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "ice_ping found \"" << _communicator->identityToString(current.id)
+ << "\" with facet \"" << current.facet + "\"";
+ }
+
+ cookie = 0;
+ return _pingObject;
+ }
+ else if(hasAnotherFacet(current.id, current.facet))
+ {
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "ice_ping raises FacetNotExistException for \"" << _communicator->identityToString(current.id)
+ << "\" with facet \"" << current.facet + "\"";
+ }
+ throw FacetNotExistException(__FILE__, __LINE__);
+ }
+ else
+ {
+ if(_trace >= 3)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "ice_ping will raise ObjectNotExistException for \""
+ << _communicator->identityToString(current.id) << "\" with facet \"" << current.facet + "\"";
+ }
+ return 0;
+ }
}
ObjectPtr result = locateImpl(current, cookie);
if(result == 0)
{
- if(hasAnotherFacet(current.id, current.facet))
- {
- throw FacetNotExistException(__FILE__, __LINE__);
- }
+ if(hasAnotherFacet(current.id, current.facet))
+ {
+ throw FacetNotExistException(__FILE__, __LINE__);
+ }
}
return result;
}
@@ -1074,65 +1074,65 @@ Freeze::EvictorI::locateImpl(const Current& current, LocalObjectPtr& cookie)
if(store == 0)
{
if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "locate could not find a database for facet \"" << current.facet << "\"";
- }
- return 0;
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "locate could not find a database for facet \"" << current.facet << "\"";
+ }
+ return 0;
}
for(;;)
{
- EvictorElementPtr element = store->pin(current.id);
- if(element == 0)
- {
+ EvictorElementPtr element = store->pin(current.id);
+ if(element == 0)
+ {
if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "locate could not find \"" << _communicator->identityToString(current.id) << "\" in database \""
- << current.facet << "\"";
- }
- return 0;
- }
-
- Lock sync(*this);
-
- if(element->stale)
- {
- //
- // try again
- //
- continue;
- }
-
-
- IceUtil::Mutex::Lock lockElement(element->mutex);
- if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead)
- {
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "locate could not find \"" << _communicator->identityToString(current.id) << "\" in database \""
+ << current.facet << "\"";
+ }
+ return 0;
+ }
+
+ Lock sync(*this);
+
+ if(element->stale)
+ {
+ //
+ // try again
+ //
+ continue;
+ }
+
+
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead)
+ {
if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "locate found \"" << _communicator->identityToString(current.id)
- << "\" in the cache for database \"" << current.facet << "\" but it was dead or destroyed";
- }
- return 0;
- }
-
- //
- // It's a good one!
- //
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "locate found \"" << _communicator->identityToString(current.id)
+ << "\" in the cache for database \"" << current.facet << "\" but it was dead or destroyed";
+ }
+ return 0;
+ }
+
+ //
+ // It's a good one!
+ //
if(_trace >= 2)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "locate found \"" << _communicator->identityToString(current.id) << "\" in database \""
- << current.facet << "\"";
- }
-
- fixEvictPosition(element);
- element->usageCount++;
- cookie = element;
- assert(element->rec.servant != 0);
- return element->rec.servant;
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "locate found \"" << _communicator->identityToString(current.id) << "\" in database \""
+ << current.facet << "\"";
+ }
+
+ fixEvictPosition(element);
+ element->usageCount++;
+ cookie = element;
+ assert(element->rec.servant != 0);
+ return element->rec.servant;
}
}
@@ -1148,51 +1148,51 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con
if(cookie != 0)
{
- EvictorElementPtr element = EvictorElementPtr::dynamicCast(cookie);
- assert(element);
+ EvictorElementPtr element = EvictorElementPtr::dynamicCast(cookie);
+ assert(element);
- bool enqueue = false;
-
- if((_useNonmutating && current.mode != Nonmutating) ||
- (servant->ice_operationAttributes(current.operation) & 0x1) != 0)
- {
- IceUtil::Mutex::Lock lock(element->mutex);
-
- if(element->status == EvictorElement::clean)
- {
- //
- // Assume this operation updated the object
- //
- element->status = EvictorElement::modified;
- enqueue = true;
- }
- }
-
- Lock sync(*this);
-
- //
- // Only elements with a usageCount == 0 can become stale and we own
- // one count!
- //
- assert(!element->stale);
- assert(element->usageCount >= 1);
-
- //
- // Decrease the usage count of the evictor queue element.
- //
- element->usageCount--;
-
- if(enqueue)
- {
- addToModifiedQueue(element);
- }
- else if(element->usageCount == 0 && element->keepCount == 0)
- {
- //
- // Evict as many elements as necessary.
- //
- evict();
- }
+ bool enqueue = false;
+
+ if((_useNonmutating && current.mode != Nonmutating) ||
+ (servant->ice_operationAttributes(current.operation) & 0x1) != 0)
+ {
+ IceUtil::Mutex::Lock lock(element->mutex);
+
+ if(element->status == EvictorElement::clean)
+ {
+ //
+ // Assume this operation updated the object
+ //
+ element->status = EvictorElement::modified;
+ enqueue = true;
+ }
+ }
+
+ Lock sync(*this);
+
+ //
+ // Only elements with a usageCount == 0 can become stale and we own
+ // one count!
+ //
+ assert(!element->stale);
+ assert(element->usageCount >= 1);
+
+ //
+ // Decrease the usage count of the evictor queue element.
+ //
+ element->usageCount--;
+
+ if(enqueue)
+ {
+ addToModifiedQueue(element);
+ }
+ else if(element->usageCount == 0 && element->keepCount == 0)
+ {
+ //
+ // Evict as many elements as necessary.
+ //
+ evict();
+ }
}
}
@@ -1201,44 +1201,44 @@ Freeze::EvictorI::deactivate(const string&)
{
if(_deactivateController.deactivate())
{
- try
- {
- Lock sync(*this);
-
- saveNowNoSync();
-
- //
- // Set the evictor size to zero, meaning that we will evict
- // everything possible.
- //
- _evictorSize = 0;
- evict();
-
- _savingThreadDone = true;
- notifyAll();
- sync.release();
- getThreadControl().join();
-
- if(_watchDogThread != 0)
- {
- _watchDogThread->terminate();
- _watchDogThread->getThreadControl().join();
- }
-
- for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
- {
- (*p).second->close();
- }
-
- _dbEnv = 0;
- _initializer = 0;
- }
- catch(...)
- {
- _deactivateController.deactivationComplete();
- throw;
- }
- _deactivateController.deactivationComplete();
+ try
+ {
+ Lock sync(*this);
+
+ saveNowNoSync();
+
+ //
+ // Set the evictor size to zero, meaning that we will evict
+ // everything possible.
+ //
+ _evictorSize = 0;
+ evict();
+
+ _savingThreadDone = true;
+ notifyAll();
+ sync.release();
+ getThreadControl().join();
+
+ if(_watchDogThread != 0)
+ {
+ _watchDogThread->terminate();
+ _watchDogThread->getThreadControl().join();
+ }
+
+ for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p)
+ {
+ (*p).second->close();
+ }
+
+ _dbEnv = 0;
+ _initializer = 0;
+ }
+ catch(...)
+ {
+ _deactivateController.deactivationComplete();
+ throw;
+ }
+ _deactivateController.deactivationComplete();
}
}
@@ -1248,7 +1248,7 @@ Freeze::EvictorI::initialize(const Identity& ident, const string& facet, const O
{
if(_initializer != 0)
{
- _initializer->initialize(_adapter, ident, facet, servant);
+ _initializer->initialize(_adapter, ident, facet, servant);
}
}
@@ -1258,387 +1258,387 @@ Freeze::EvictorI::run()
{
try
{
- for(;;)
- {
- deque<EvictorElementPtr> allObjects;
- deque<EvictorElementPtr> deadObjects;
-
- size_t saveNowThreadsSize = 0;
-
- {
- Lock sync(*this);
-
- while(!_savingThreadDone &&
- (_saveNowThreads.size() == 0) &&
- (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger))
- {
- if(_savePeriod == IceUtil::Time::milliSeconds(0))
- {
- wait();
- }
- else if(timedWait(_savePeriod) == false)
- {
- //
- // Timeout, so let's save
- //
- break; // while
- }
- }
-
- saveNowThreadsSize = _saveNowThreads.size();
-
- if(_savingThreadDone)
- {
- assert(_modifiedQueue.size() == 0);
- assert(saveNowThreadsSize == 0);
- break; // for(;;)
- }
-
- //
- // Check first if there is something to do!
- //
- if(_modifiedQueue.size() == 0)
- {
- if(saveNowThreadsSize > 0)
- {
- _saveNowThreads.clear();
- notifyAll();
- }
- continue; // for(;;)
- }
-
- _modifiedQueue.swap(allObjects);
- }
-
- const size_t size = allObjects.size();
-
- deque<StreamedObject> streamedObjectQueue;
-
- Long streamStart = IceUtil::Time::now().toMilliSeconds();
-
- //
- // Stream each element
- //
- for(size_t i = 0; i < size; i++)
- {
- EvictorElementPtr& element = allObjects[i];
-
- bool tryAgain;
- do
- {
- tryAgain = false;
- ObjectPtr servant = 0;
-
- //
- // These elements can't be stale as only elements with
- // usageCount == 0 can become stale, and the modifiedQueue
- // (us now) owns one count.
- //
-
- IceUtil::Mutex::Lock lockElement(element->mutex);
- Byte status = element->status;
-
- switch(status)
- {
- case EvictorElement::created:
- case EvictorElement::modified:
- {
- servant = element->rec.servant;
- break;
- }
- case EvictorElement::destroyed:
- {
- size_t index = streamedObjectQueue.size();
- streamedObjectQueue.resize(index + 1);
- StreamedObject& obj = streamedObjectQueue[index];
- stream(element, streamStart, obj);
-
- element->status = EvictorElement::dead;
- deadObjects.push_back(element);
-
- break;
- }
- case EvictorElement::dead:
- {
- deadObjects.push_back(element);
- break;
- }
- default:
- {
- //
- // Nothing to do (could be a duplicate)
- //
- break;
- }
- }
- if(servant == 0)
- {
- lockElement.release();
- }
- else
- {
- IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get());
- if(mutex != 0)
- {
- //
- // Lock servant and then element so that user can safely lock
- // servant and call various Evictor operations
- //
-
- IceUtil::AbstractMutex::TryLock lockServant(*mutex);
- if(!lockServant.acquired())
- {
- lockElement.release();
-
- if(_watchDogThread != 0)
- {
- _watchDogThread->activate();
- }
- lockServant.acquire();
- if(_watchDogThread != 0)
- {
- _watchDogThread->deactivate();
- }
-
- lockElement.acquire();
- status = element->status;
- }
+ for(;;)
+ {
+ deque<EvictorElementPtr> allObjects;
+ deque<EvictorElementPtr> deadObjects;
+
+ size_t saveNowThreadsSize = 0;
+
+ {
+ Lock sync(*this);
+
+ while(!_savingThreadDone &&
+ (_saveNowThreads.size() == 0) &&
+ (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger))
+ {
+ if(_savePeriod == IceUtil::Time::milliSeconds(0))
+ {
+ wait();
+ }
+ else if(timedWait(_savePeriod) == false)
+ {
+ //
+ // Timeout, so let's save
+ //
+ break; // while
+ }
+ }
+
+ saveNowThreadsSize = _saveNowThreads.size();
+
+ if(_savingThreadDone)
+ {
+ assert(_modifiedQueue.size() == 0);
+ assert(saveNowThreadsSize == 0);
+ break; // for(;;)
+ }
+
+ //
+ // Check first if there is something to do!
+ //
+ if(_modifiedQueue.size() == 0)
+ {
+ if(saveNowThreadsSize > 0)
+ {
+ _saveNowThreads.clear();
+ notifyAll();
+ }
+ continue; // for(;;)
+ }
+
+ _modifiedQueue.swap(allObjects);
+ }
+
+ const size_t size = allObjects.size();
+
+ deque<StreamedObject> streamedObjectQueue;
+
+ Long streamStart = IceUtil::Time::now().toMilliSeconds();
+
+ //
+ // Stream each element
+ //
+ for(size_t i = 0; i < size; i++)
+ {
+ EvictorElementPtr& element = allObjects[i];
+
+ bool tryAgain;
+ do
+ {
+ tryAgain = false;
+ ObjectPtr servant = 0;
+
+ //
+ // These elements can't be stale as only elements with
+ // usageCount == 0 can become stale, and the modifiedQueue
+ // (us now) owns one count.
+ //
+
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ Byte status = element->status;
+
+ switch(status)
+ {
+ case EvictorElement::created:
+ case EvictorElement::modified:
+ {
+ servant = element->rec.servant;
+ break;
+ }
+ case EvictorElement::destroyed:
+ {
+ size_t index = streamedObjectQueue.size();
+ streamedObjectQueue.resize(index + 1);
+ StreamedObject& obj = streamedObjectQueue[index];
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::dead;
+ deadObjects.push_back(element);
+
+ break;
+ }
+ case EvictorElement::dead:
+ {
+ deadObjects.push_back(element);
+ break;
+ }
+ default:
+ {
+ //
+ // Nothing to do (could be a duplicate)
+ //
+ break;
+ }
+ }
+ if(servant == 0)
+ {
+ lockElement.release();
+ }
+ else
+ {
+ IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get());
+ if(mutex != 0)
+ {
+ //
+ // Lock servant and then element so that user can safely lock
+ // servant and call various Evictor operations
+ //
+
+ IceUtil::AbstractMutex::TryLock lockServant(*mutex);
+ if(!lockServant.acquired())
+ {
+ lockElement.release();
+
+ if(_watchDogThread != 0)
+ {
+ _watchDogThread->activate();
+ }
+ lockServant.acquire();
+ if(_watchDogThread != 0)
+ {
+ _watchDogThread->deactivate();
+ }
+
+ lockElement.acquire();
+ status = element->status;
+ }
- switch(status)
- {
- case EvictorElement::created:
- case EvictorElement::modified:
- {
- if(servant == element->rec.servant)
- {
- size_t index = streamedObjectQueue.size();
- streamedObjectQueue.resize(index + 1);
- StreamedObject& obj = streamedObjectQueue[index];
- stream(element, streamStart, obj);
-
- element->status = EvictorElement::clean;
- }
- else
- {
- tryAgain = true;
- }
- break;
- }
- case EvictorElement::destroyed:
- {
- lockServant.release();
-
- size_t index = streamedObjectQueue.size();
- streamedObjectQueue.resize(index + 1);
- StreamedObject& obj = streamedObjectQueue[index];
- stream(element, streamStart, obj);
-
- element->status = EvictorElement::dead;
- deadObjects.push_back(element);
- break;
- }
- case EvictorElement::dead:
- {
- deadObjects.push_back(element);
- break;
- }
- default:
- {
- //
- // Nothing to do (could be a duplicate)
- //
- break;
- }
- }
- }
- else
- {
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = string(typeid(*element->rec.servant).name())
- + " does not implement IceUtil::AbstractMutex";
- throw ex;
- }
- }
- } while(tryAgain);
- }
-
- if(_trace >= 1)
- {
- Long now = IceUtil::Time::now().toMilliSeconds();
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "streamed " << streamedObjectQueue.size() << " objects in "
- << static_cast<Int>(now - streamStart) << " ms";
- }
-
- //
- // Now let's save all these streamed objects to disk using a transaction
- //
-
- //
- // Each time we get a deadlock, we reduce the number of objects to save
- // per transaction
- //
- size_t txSize = streamedObjectQueue.size();
- if(txSize > static_cast<size_t>(_maxTxSize))
- {
- txSize = static_cast<size_t>(_maxTxSize);
- }
- bool tryAgain;
-
- do
- {
- tryAgain = false;
-
- while(streamedObjectQueue.size() > 0)
- {
- if(txSize > streamedObjectQueue.size())
- {
- txSize = streamedObjectQueue.size();
- }
-
- Long saveStart = IceUtil::Time::now().toMilliSeconds();
- try
- {
- DbTxn* tx = 0;
- _dbEnv->getEnv()->txn_begin(0, &tx, 0);
-
- long txnId = 0;
- if(_txTrace >= 1)
- {
- txnId = (tx->id() & 0x7FFFFFFF) + 0x80000000L;
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "started transaction " << hex << txnId << dec << " in saving thread";
- }
-
- try
- {
- for(size_t i = 0; i < txSize; i++)
- {
- StreamedObject& obj = streamedObjectQueue[i];
- obj.store->save(obj.key, obj.value, obj.status, tx);
- }
- }
- catch(...)
- {
- tx->abort();
- if(_txTrace >= 1)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "rolled back transaction " << hex << txnId << dec;
- }
- throw;
- }
- tx->commit(0);
-
- if(_txTrace >= 1)
- {
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "committed transaction " << hex << txnId << dec;
- }
-
- streamedObjectQueue.erase
- (streamedObjectQueue.begin(),
- streamedObjectQueue.begin() + txSize);
-
- if(_trace >= 1)
- {
- Long now = IceUtil::Time::now().toMilliSeconds();
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "saved " << txSize << " objects in "
- << static_cast<Int>(now - saveStart) << " ms";
- }
- }
- catch(const DbDeadlockException&)
- {
- if(_deadlockWarning)
- {
- Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::EvictorI::run while writing into Db \"" + _filename
- + "\"; retrying ...";
- }
-
- tryAgain = true;
- txSize = (txSize + 1)/2;
- }
- catch(const DbException& dx)
- {
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
- }
- }
- while(tryAgain);
-
- {
- Lock sync(*this);
-
- //
- // Release usage count
- //
- for(deque<EvictorElementPtr>::iterator p = allObjects.begin();
- p != allObjects.end(); p++)
- {
- EvictorElementPtr& element = *p;
- element->usageCount--;
- }
- allObjects.clear();
-
- for(deque<EvictorElementPtr>::iterator q = deadObjects.begin();
- q != deadObjects.end(); q++)
- {
- EvictorElementPtr& element = *q;
- if(!element->stale)
- {
- //
- // Can be stale when there are duplicate elements on the
- // deadObjecst queue
- //
-
- if(!element->stale && element->usageCount == 0 && element->keepCount == 0)
- {
- //
- // Get rid of unused dead elements
- //
- IceUtil::Mutex::Lock lockElement(element->mutex);
- if(element->status == EvictorElement::dead)
- {
- evict(element);
- }
- }
- }
- }
- deadObjects.clear();
- evict();
-
- if(saveNowThreadsSize > 0)
- {
- _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize);
- notifyAll();
- }
- }
- }
+ switch(status)
+ {
+ case EvictorElement::created:
+ case EvictorElement::modified:
+ {
+ if(servant == element->rec.servant)
+ {
+ size_t index = streamedObjectQueue.size();
+ streamedObjectQueue.resize(index + 1);
+ StreamedObject& obj = streamedObjectQueue[index];
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::clean;
+ }
+ else
+ {
+ tryAgain = true;
+ }
+ break;
+ }
+ case EvictorElement::destroyed:
+ {
+ lockServant.release();
+
+ size_t index = streamedObjectQueue.size();
+ streamedObjectQueue.resize(index + 1);
+ StreamedObject& obj = streamedObjectQueue[index];
+ stream(element, streamStart, obj);
+
+ element->status = EvictorElement::dead;
+ deadObjects.push_back(element);
+ break;
+ }
+ case EvictorElement::dead:
+ {
+ deadObjects.push_back(element);
+ break;
+ }
+ default:
+ {
+ //
+ // Nothing to do (could be a duplicate)
+ //
+ break;
+ }
+ }
+ }
+ else
+ {
+ DatabaseException ex(__FILE__, __LINE__);
+ ex.message = string(typeid(*element->rec.servant).name())
+ + " does not implement IceUtil::AbstractMutex";
+ throw ex;
+ }
+ }
+ } while(tryAgain);
+ }
+
+ if(_trace >= 1)
+ {
+ Long now = IceUtil::Time::now().toMilliSeconds();
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "streamed " << streamedObjectQueue.size() << " objects in "
+ << static_cast<Int>(now - streamStart) << " ms";
+ }
+
+ //
+ // Now let's save all these streamed objects to disk using a transaction
+ //
+
+ //
+ // Each time we get a deadlock, we reduce the number of objects to save
+ // per transaction
+ //
+ size_t txSize = streamedObjectQueue.size();
+ if(txSize > static_cast<size_t>(_maxTxSize))
+ {
+ txSize = static_cast<size_t>(_maxTxSize);
+ }
+ bool tryAgain;
+
+ do
+ {
+ tryAgain = false;
+
+ while(streamedObjectQueue.size() > 0)
+ {
+ if(txSize > streamedObjectQueue.size())
+ {
+ txSize = streamedObjectQueue.size();
+ }
+
+ Long saveStart = IceUtil::Time::now().toMilliSeconds();
+ try
+ {
+ DbTxn* tx = 0;
+ _dbEnv->getEnv()->txn_begin(0, &tx, 0);
+
+ long txnId = 0;
+ if(_txTrace >= 1)
+ {
+ txnId = (tx->id() & 0x7FFFFFFF) + 0x80000000L;
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "started transaction " << hex << txnId << dec << " in saving thread";
+ }
+
+ try
+ {
+ for(size_t i = 0; i < txSize; i++)
+ {
+ StreamedObject& obj = streamedObjectQueue[i];
+ obj.store->save(obj.key, obj.value, obj.status, tx);
+ }
+ }
+ catch(...)
+ {
+ tx->abort();
+ if(_txTrace >= 1)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "rolled back transaction " << hex << txnId << dec;
+ }
+ throw;
+ }
+ tx->commit(0);
+
+ if(_txTrace >= 1)
+ {
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "committed transaction " << hex << txnId << dec;
+ }
+
+ streamedObjectQueue.erase
+ (streamedObjectQueue.begin(),
+ streamedObjectQueue.begin() + txSize);
+
+ if(_trace >= 1)
+ {
+ Long now = IceUtil::Time::now().toMilliSeconds();
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "saved " << txSize << " objects in "
+ << static_cast<Int>(now - saveStart) << " ms";
+ }
+ }
+ catch(const DbDeadlockException&)
+ {
+ if(_deadlockWarning)
+ {
+ Warning out(_communicator->getLogger());
+ out << "Deadlock in Freeze::EvictorI::run while writing into Db \"" + _filename
+ + "\"; retrying ...";
+ }
+
+ tryAgain = true;
+ txSize = (txSize + 1)/2;
+ }
+ catch(const DbException& dx)
+ {
+ DatabaseException ex(__FILE__, __LINE__);
+ ex.message = dx.what();
+ throw ex;
+ }
+ }
+ }
+ while(tryAgain);
+
+ {
+ Lock sync(*this);
+
+ //
+ // Release usage count
+ //
+ for(deque<EvictorElementPtr>::iterator p = allObjects.begin();
+ p != allObjects.end(); p++)
+ {
+ EvictorElementPtr& element = *p;
+ element->usageCount--;
+ }
+ allObjects.clear();
+
+ for(deque<EvictorElementPtr>::iterator q = deadObjects.begin();
+ q != deadObjects.end(); q++)
+ {
+ EvictorElementPtr& element = *q;
+ if(!element->stale)
+ {
+ //
+ // Can be stale when there are duplicate elements on the
+ // deadObjecst queue
+ //
+
+ if(!element->stale && element->usageCount == 0 && element->keepCount == 0)
+ {
+ //
+ // Get rid of unused dead elements
+ //
+ IceUtil::Mutex::Lock lockElement(element->mutex);
+ if(element->status == EvictorElement::dead)
+ {
+ evict(element);
+ }
+ }
+ }
+ }
+ deadObjects.clear();
+ evict();
+
+ if(saveNowThreadsSize > 0)
+ {
+ _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize);
+ notifyAll();
+ }
+ }
+ }
}
catch(const IceUtil::Exception& ex)
{
- Error out(_communicator->getLogger());
- out << "Saving thread killed by exception: " << ex;
- out.flush();
- handleFatalError(this, _communicator);
+ Error out(_communicator->getLogger());
+ out << "Saving thread killed by exception: " << ex;
+ out.flush();
+ handleFatalError(this, _communicator);
}
catch(const std::exception& ex)
{
- Error out(_communicator->getLogger());
- out << "Saving thread killed by std::exception: " << ex.what();
- out.flush();
- handleFatalError(this, _communicator);
+ Error out(_communicator->getLogger());
+ out << "Saving thread killed by std::exception: " << ex.what();
+ out.flush();
+ handleFatalError(this, _communicator);
}
catch(...)
{
- Error out(_communicator->getLogger());
- out << "Saving thread killed by unknown exception";
- out.flush();
- handleFatalError(this, _communicator);
+ Error out(_communicator->getLogger());
+ out << "Saving thread killed by unknown exception";
+ out.flush();
+ handleFatalError(this, _communicator);
}
}
@@ -1665,7 +1665,7 @@ Freeze::EvictorI::saveNowNoSync()
notifyAll();
do
{
- wait();
+ wait();
}
while(find(_saveNowThreads.begin(), _saveNowThreads.end(), myself) != _saveNowThreads.end());
}
@@ -1683,50 +1683,50 @@ Freeze::EvictorI::evict()
while(_currentEvictorSize > _evictorSize)
{
- //
- // Get the last unused element from the evictor queue.
- //
- while(p != _evictorList.rend())
- {
- if((*p)->usageCount == 0)
- {
- break; // Fine, servant is not in use (and not in the modifiedQueue)
- }
- ++p;
- }
- if(p == _evictorList.rend())
- {
- //
- // All servants are active, can't evict any further.
- //
- break;
- }
-
- EvictorElementPtr& element = *p;
- assert(!element->stale);
- assert(element->keepCount == 0);
-
- if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0))
- {
- string facet = element->store.facet();
-
- Trace out(_communicator->getLogger(), "Freeze.Evictor");
- out << "evicting \"" << _communicator->identityToString(element->cachePosition->first) << "\" ";
- if(facet != "")
- {
- out << "-f \"" << facet << "\" ";
- }
- out << "from the queue\n"
- << "number of elements in the queue: " << _currentEvictorSize;
- }
-
- //
- // Remove last unused element from the evictor queue.
- //
- element->stale = true;
- element->store.unpin(element->cachePosition);
- p = list<EvictorElementPtr>::reverse_iterator(_evictorList.erase(element->evictPosition));
- _currentEvictorSize--;
+ //
+ // Get the last unused element from the evictor queue.
+ //
+ while(p != _evictorList.rend())
+ {
+ if((*p)->usageCount == 0)
+ {
+ break; // Fine, servant is not in use (and not in the modifiedQueue)
+ }
+ ++p;
+ }
+ if(p == _evictorList.rend())
+ {
+ //
+ // All servants are active, can't evict any further.
+ //
+ break;
+ }
+
+ EvictorElementPtr& element = *p;
+ assert(!element->stale);
+ assert(element->keepCount == 0);
+
+ if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0))
+ {
+ string facet = element->store.facet();
+
+ Trace out(_communicator->getLogger(), "Freeze.Evictor");
+ out << "evicting \"" << _communicator->identityToString(element->cachePosition->first) << "\" ";
+ if(facet != "")
+ {
+ out << "-f \"" << facet << "\" ";
+ }
+ out << "from the queue\n"
+ << "number of elements in the queue: " << _currentEvictorSize;
+ }
+
+ //
+ // Remove last unused element from the evictor queue.
+ //
+ element->stale = true;
+ element->store.unpin(element->cachePosition);
+ p = list<EvictorElementPtr>::reverse_iterator(_evictorList.erase(element->evictPosition));
+ _currentEvictorSize--;
}
}
@@ -1737,20 +1737,20 @@ Freeze::EvictorI::fixEvictPosition(const EvictorElementPtr& element)
if(element->keepCount == 0)
{
- if(element->usageCount < 0)
- {
- //
- // New object
- //
- element->usageCount = 0;
- _currentEvictorSize++;
- }
- else
- {
- _evictorList.erase(element->evictPosition);
- }
- _evictorList.push_front(element);
- element->evictPosition = _evictorList.begin();
+ if(element->usageCount < 0)
+ {
+ //
+ // New object
+ //
+ element->usageCount = 0;
+ _currentEvictorSize++;
+ }
+ else
+ {
+ _evictorList.erase(element->evictPosition);
+ }
+ _evictorList.push_front(element);
+ element->evictPosition = _evictorList.begin();
}
}
@@ -1775,7 +1775,7 @@ Freeze::EvictorI::addToModifiedQueue(const EvictorElementPtr& element)
if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger)
{
- notifyAll();
+ notifyAll();
}
}
@@ -1793,22 +1793,22 @@ Freeze::EvictorI::stream(const EvictorElementPtr& element, Long streamStart, Str
if(element->status != EvictorElement::destroyed)
{
- //
- // Update stats first
- //
- Statistics& stats = element->rec.stats;
- Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
- if(stats.lastSaveTime == 0)
- {
- stats.lastSaveTime = diff;
- stats.avgSaveTime = diff;
- }
- else
- {
- stats.lastSaveTime = streamStart - stats.creationTime;
- stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
- }
- ObjectStore::marshal(element->rec, obj.value, _communicator);
+ //
+ // Update stats first
+ //
+ Statistics& stats = element->rec.stats;
+ Long diff = streamStart - (stats.creationTime + stats.lastSaveTime);
+ if(stats.lastSaveTime == 0)
+ {
+ stats.lastSaveTime = diff;
+ stats.avgSaveTime = diff;
+ }
+ else
+ {
+ stats.lastSaveTime = streamStart - stats.creationTime;
+ stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05);
+ }
+ ObjectStore::marshal(element->rec, obj.value, _communicator);
}
}
@@ -1820,11 +1820,11 @@ Freeze::EvictorI::findStore(const string& facet) const
StoreMap::const_iterator p = _storeMap.find(facet);
if(p == _storeMap.end())
{
- return 0;
+ return 0;
}
else
{
- return (*p).second;
+ return (*p).second;
}
}
@@ -1836,45 +1836,45 @@ Freeze::EvictorI::allDbs() const
try
{
- Db db(_dbEnv->getEnv(), 0);
- db.open(0, _filename.c_str(), 0, DB_UNKNOWN, DB_RDONLY, 0);
-
- Dbc* dbc = 0;
- db.cursor(0, &dbc, 0);
-
- Dbt dbKey;
- dbKey.set_flags(DB_DBT_MALLOC);
-
- Dbt dbValue;
- dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-
- bool more = true;
- while(more)
- {
- more = (dbc->get(&dbKey, &dbValue, DB_NEXT) == 0);
- if(more)
- {
- string dbName(static_cast<char*>(dbKey.get_data()), dbKey.get_size());
-
- if(dbName.find(indexPrefix) != 0)
- {
- result.push_back(dbName);
- }
- free(dbKey.get_data());
- }
- }
-
- dbc->close();
- db.close(0);
+ Db db(_dbEnv->getEnv(), 0);
+ db.open(0, _filename.c_str(), 0, DB_UNKNOWN, DB_RDONLY, 0);
+
+ Dbc* dbc = 0;
+ db.cursor(0, &dbc, 0);
+
+ Dbt dbKey;
+ dbKey.set_flags(DB_DBT_MALLOC);
+
+ Dbt dbValue;
+ dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+
+ bool more = true;
+ while(more)
+ {
+ more = (dbc->get(&dbKey, &dbValue, DB_NEXT) == 0);
+ if(more)
+ {
+ string dbName(static_cast<char*>(dbKey.get_data()), dbKey.get_size());
+
+ if(dbName.find(indexPrefix) != 0)
+ {
+ result.push_back(dbName);
+ }
+ free(dbKey.get_data());
+ }
+ }
+
+ dbc->close();
+ db.close(0);
}
catch(const DbException& dx)
{
- if(dx.get_errno() != ENOENT)
- {
- DatabaseException ex(__FILE__, __LINE__);
- ex.message = dx.what();
- throw ex;
- }
+ if(dx.get_errno() != ENOENT)
+ {
+ DatabaseException ex(__FILE__, __LINE__);
+ ex.message = dx.what();
+ throw ex;
+ }
}
return result;