diff options
author | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
commit | abada90e3f84dc703b8ddc9efcbed8a946fadead (patch) | |
tree | 2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/Freeze/EvictorI.cpp | |
parent | removing trace message (diff) | |
download | ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2 ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip |
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 2280 |
1 files changed, 1140 insertions, 1140 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index 75e1722c004..a3bffc1a2bc 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -36,23 +36,23 @@ string Freeze::EvictorI::indexPrefix = "$index:"; Freeze::EvictorPtr Freeze::createEvictor(const ObjectAdapterPtr& adapter, - const string& envName, - const string& filename, - const ServantInitializerPtr& initializer, - const vector<IndexPtr>& indices, - bool createDb) + const string& envName, + const string& filename, + const ServantInitializerPtr& initializer, + const vector<IndexPtr>& indices, + bool createDb) { return new EvictorI(adapter, envName, 0, filename, initializer, indices, createDb); } Freeze::EvictorPtr Freeze::createEvictor(const ObjectAdapterPtr& adapter, - const string& envName, - DbEnv& dbEnv, - const string& filename, - const ServantInitializerPtr& initializer, - const vector<IndexPtr>& indices, - bool createDb) + const string& envName, + DbEnv& dbEnv, + const string& filename, + const ServantInitializerPtr& initializer, + const vector<IndexPtr>& indices, + bool createDb) { return new EvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb); } @@ -78,11 +78,11 @@ handleFatalError(const Freeze::EvictorPtr& evictor, const Ice::CommunicatorPtr& IceUtil::StaticMutex::Lock lock(fatalErrorCallbackMutex); if(fatalErrorCallback != 0) { - fatalErrorCallback(evictor, communicator); + fatalErrorCallback(evictor, communicator); } else { - ::abort(); + ::abort(); } } @@ -112,7 +112,7 @@ Freeze::DeactivateController::Guard::Guard(DeactivateController& controller) : Lock sync(controller); if(controller._deactivated || _controller._deactivating) { - throw EvictorDeactivatedException(__FILE__, __LINE__); + throw EvictorDeactivatedException(__FILE__, __LINE__); } controller._guardCount++; } @@ -123,11 +123,11 @@ Freeze::DeactivateController::Guard::~Guard() _controller._guardCount--; if(_controller._deactivating && _controller._guardCount == 0) { - // - // Notify all the threads -- although we only want to - // reach the thread doing the deactivation. - // - _controller.notifyAll(); + // + // Notify all the threads -- although we only want to + // reach the thread doing the deactivation. + // + _controller.notifyAll(); } } @@ -153,40 +153,40 @@ Freeze::DeactivateController::deactivate() if(_deactivated) { - return false; + return false; } if(_deactivating) { - // - // Wait for deactivated - // - while(!_deactivated) - { - wait(); - } - return false; + // + // Wait for deactivated + // + while(!_deactivated) + { + wait(); + } + return false; } else { - _deactivating = true; - while(_guardCount > 0) - { - if(_evictor->trace() >= 1) - { - Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); - out << "*** Waiting for " << _guardCount << " threads to complete before starting deactivation."; - } - - wait(); - } - - if(_evictor->trace() >= 1) - { - Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); - out << "Starting deactivation."; - } - return true; + _deactivating = true; + while(_guardCount > 0) + { + if(_evictor->trace() >= 1) + { + Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "*** Waiting for " << _guardCount << " threads to complete before starting deactivation."; + } + + wait(); + } + + if(_evictor->trace() >= 1) + { + Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "Starting deactivation."; + } + return true; } } @@ -195,8 +195,8 @@ Freeze::DeactivateController::deactivationComplete() { if(_evictor->trace() >= 1) { - Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); - out << "Deactivation complete."; + Trace out(_evictor->communicator()->getLogger(), "Freeze.Evictor"); + out << "Deactivation complete."; } Lock sync(*this); @@ -225,20 +225,20 @@ Freeze::WatchDogThread::run() while(!_done) { - if(_active) - { - if(timedWait(_timeout) == false && _active && !_done) - { - Error out(_evictor.communicator()->getLogger()); - out << "Fatal error: streaming watch dog thread timed out."; - out.flush(); - handleFatalError(&_evictor, _evictor.communicator()); - } - } - else - { - wait(); - } + if(_active) + { + if(timedWait(_timeout) == false && _active && !_done) + { + Error out(_evictor.communicator()->getLogger()); + out << "Fatal error: streaming watch dog thread timed out."; + out.flush(); + handleFatalError(&_evictor, _evictor.communicator()); + } + } + else + { + wait(); + } } } @@ -270,12 +270,12 @@ Freeze::WatchDogThread::terminate() // Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter, - const string& envName, - DbEnv* dbEnv, - const string& filename, - const ServantInitializerPtr& initializer, - const vector<IndexPtr>& indices, - bool createDb) : + const string& envName, + DbEnv* dbEnv, + const string& filename, + const ServantInitializerPtr& initializer, + const vector<IndexPtr>& indices, + bool createDb) : _evictorSize(10), _currentEvictorSize(0), @@ -304,10 +304,10 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter, // _saveSizeTrigger = _communicator->getProperties()-> - getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10); + getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10); Int savePeriod = _communicator->getProperties()-> - getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000); + getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000); _savePeriod = IceUtil::Time::milliSeconds(savePeriod); @@ -315,17 +315,17 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter, // By default, we save at most 10 * SaveSizeTrigger objects per transaction // _maxTxSize = _communicator->getProperties()-> - getPropertyAsIntWithDefault(propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger); + getPropertyAsIntWithDefault(propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger); if(_maxTxSize <= 0) { - _maxTxSize = 100; - } + _maxTxSize = 100; + } bool populateEmptyIndices = - (_communicator->getProperties()-> - getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0); - + (_communicator->getProperties()-> + getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0); + // // Instantiate all Dbs in 2 steps: // (1) iterate over the indices and create ObjectStore with indices @@ -342,57 +342,57 @@ Freeze::EvictorI::EvictorI(const ObjectAdapterPtr& adapter, for(vector<IndexPtr>::const_iterator i = indices.begin(); i != indices.end(); ++i) { - string facet = (*i)->facet(); + string facet = (*i)->facet(); - StoreMap::iterator q = _storeMap.find(facet); - if(q == _storeMap.end()) - { - // - // New db - // + StoreMap::iterator q = _storeMap.find(facet); + if(q == _storeMap.end()) + { + // + // New db + // - vector<IndexPtr> storeIndices; + vector<IndexPtr> storeIndices; - for(vector<IndexPtr>::const_iterator r = i; r != indices.end(); ++r) - { - if((*r)->facet() == facet) - { - storeIndices.push_back(*r); - } - } - ObjectStore* store = new ObjectStore(facet, _createDb, this, storeIndices, populateEmptyIndices); - _storeMap.insert(StoreMap::value_type(facet, store)); - } + for(vector<IndexPtr>::const_iterator r = i; r != indices.end(); ++r) + { + if((*r)->facet() == facet) + { + storeIndices.push_back(*r); + } + } + ObjectStore* store = new ObjectStore(facet, _createDb, this, storeIndices, populateEmptyIndices); + _storeMap.insert(StoreMap::value_type(facet, store)); + } } for(vector<string>::iterator p = dbs.begin(); p != dbs.end(); ++p) { - string facet = *p; - if(facet == defaultDb) - { - facet = ""; - } - - pair<StoreMap::iterator, bool> ir = - _storeMap.insert(StoreMap::value_type(facet, 0)); - - if(ir.second) - { - ir.first->second = new ObjectStore(facet, _createDb, this); - } + string facet = *p; + if(facet == defaultDb) + { + facet = ""; + } + + pair<StoreMap::iterator, bool> ir = + _storeMap.insert(StoreMap::value_type(facet, 0)); + + if(ir.second) + { + ir.first->second = new ObjectStore(facet, _createDb, this); + } } // // By default, no stream timeout // long streamTimeout = _communicator->getProperties()-> - getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000; + getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000; if(streamTimeout > 0) { - _watchDogThread = new WatchDogThread(streamTimeout, *this); - _watchDogThread->start(); + _watchDogThread = new WatchDogThread(streamTimeout, *this); + _watchDogThread->start(); } // @@ -405,13 +405,13 @@ Freeze::EvictorI::~EvictorI() { if(!_deactivateController.deactivated()) { - Warning out(_communicator->getLogger()); - out << "evictor has not been deactivated"; + Warning out(_communicator->getLogger()); + out << "evictor has not been deactivated"; - // - // Need to deactivate to save objects and join saving thread - // - deactivate(""); + // + // Need to deactivate to save objects and join saving thread + // + deactivate(""); } // @@ -419,7 +419,7 @@ Freeze::EvictorI::~EvictorI() // for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p) { - delete (*p).second; + delete (*p).second; } } @@ -435,7 +435,7 @@ Freeze::EvictorI::setSize(Int evictorSize) // if(evictorSize < 0) { - return; + return; } // @@ -473,35 +473,35 @@ Freeze::EvictorI::addFacet(const ObjectPtr& servant, const Identity& ident, cons for(;;) { - { - Lock sync(*this); - - StoreMap::iterator p = _storeMap.find(facet); - if(p == _storeMap.end()) - { - if(store != 0) - { - _storeMap.insert(StoreMap::value_type(facet, store)); - } - } - else - { - delete store; - store = (*p).second; - assert(store != 0); - } - } - - if(store == 0) - { - assert(facet != ""); - store = new ObjectStore(facet, _createDb, this); - // loop - } - else - { - break; // for(;;) - } + { + Lock sync(*this); + + StoreMap::iterator p = _storeMap.find(facet); + if(p == _storeMap.end()) + { + if(store != 0) + { + _storeMap.insert(StoreMap::value_type(facet, store)); + } + } + else + { + delete store; + store = (*p).second; + assert(store != 0); + } + } + + if(store == 0) + { + assert(facet != ""); + store = new ObjectStore(facet, _createDb, this); + // loop + } + else + { + break; // for(;;) + } } assert(store != 0); @@ -509,103 +509,103 @@ Freeze::EvictorI::addFacet(const ObjectPtr& servant, const Identity& ident, cons for(;;) { - // - // Create a new entry - // - - EvictorElementPtr element = new EvictorElement(*store); - element->status = EvictorElement::dead; - EvictorElementPtr oldElt = store->putIfAbsent(ident, element); + // + // Create a new entry + // + + EvictorElementPtr element = new EvictorElement(*store); + element->status = EvictorElement::dead; + EvictorElementPtr oldElt = store->putIfAbsent(ident, element); - if(oldElt != 0) - { - element = oldElt; - } - - { - Lock sync(*this); - - if(element->stale) - { - // - // Try again - // - continue; - } - fixEvictPosition(element); - - IceUtil::Mutex::Lock lock(element->mutex); - - switch(element->status) - { - case EvictorElement::clean: - case EvictorElement::created: - case EvictorElement::modified: - { - alreadyThere = true; - break; - } - case EvictorElement::destroyed: - { - element->status = EvictorElement::modified; - element->rec.servant = servant; - - // - // No need to push it on the modified queue, as a destroyed object - // is either already on the queue or about to be saved. When saved, - // it becomes dead. - // - break; - } - case EvictorElement::dead: - { - element->status = EvictorElement::created; - ObjectRecord& rec = element->rec; - - rec.servant = servant; - rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds(); - rec.stats.lastSaveTime = 0; - rec.stats.avgSaveTime = 0; - - addToModifiedQueue(element); - break; - } - default: - { - assert(0); - break; - } - } - } - break; // for(;;) + if(oldElt != 0) + { + element = oldElt; + } + + { + Lock sync(*this); + + if(element->stale) + { + // + // Try again + // + continue; + } + fixEvictPosition(element); + + IceUtil::Mutex::Lock lock(element->mutex); + + switch(element->status) + { + case EvictorElement::clean: + case EvictorElement::created: + case EvictorElement::modified: + { + alreadyThere = true; + break; + } + case EvictorElement::destroyed: + { + element->status = EvictorElement::modified; + element->rec.servant = servant; + + // + // No need to push it on the modified queue, as a destroyed object + // is either already on the queue or about to be saved. When saved, + // it becomes dead. + // + break; + } + case EvictorElement::dead: + { + element->status = EvictorElement::created; + ObjectRecord& rec = element->rec; + + rec.servant = servant; + rec.stats.creationTime = IceUtil::Time::now().toMilliSeconds(); + rec.stats.lastSaveTime = 0; + rec.stats.avgSaveTime = 0; + + addToModifiedQueue(element); + break; + } + default: + { + assert(0); + break; + } + } + } + break; // for(;;) } if(alreadyThere) { - AlreadyRegisteredException ex(__FILE__, __LINE__); - ex.kindOfObject = "servant"; - ex.id = _communicator->identityToString(ident); - if(!facet.empty()) - { - ex.id += " -f " + IceUtil::escapeString(facet, ""); - } - throw ex; + AlreadyRegisteredException ex(__FILE__, __LINE__); + ex.kindOfObject = "servant"; + ex.id = _communicator->identityToString(ident); + if(!facet.empty()) + { + ex.id += " -f " + IceUtil::escapeString(facet, ""); + } + throw ex; } if(_trace >= 1) { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "added object \"" << _communicator->identityToString(ident) << "\""; - if(!facet.empty()) - { - out << " with facet \"" << facet << "\""; - } + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "added object \"" << _communicator->identityToString(ident) << "\""; + if(!facet.empty()) + { + out << " with facet \"" << facet << "\""; + } } ObjectPrx obj = _adapter->createProxy(ident); if(!facet.empty()) { - obj = obj->ice_facet(facet); + obj = obj->ice_facet(facet); } return obj; } @@ -627,108 +627,108 @@ Freeze::EvictorI::removeFacet(const Identity& ident, const string& facet) if(store != 0) { - for(;;) - { - // - // Retrieve object - // - - EvictorElementPtr element = store->pin(ident); - if(element != 0) - { - Lock sync(*this); - if(element->stale) - { - // - // Try again - // - continue; - } - - fixEvictPosition(element); - { - IceUtil::Mutex::Lock lock(element->mutex); - - switch(element->status) - { - case EvictorElement::clean: - { - servant = element->rec.servant; - element->status = EvictorElement::destroyed; - element->rec.servant = 0; - addToModifiedQueue(element); - break; - } - case EvictorElement::created: - { - servant = element->rec.servant; - element->status = EvictorElement::dead; - element->rec.servant = 0; - break; - } - case EvictorElement::modified: - { - servant = element->rec.servant; - element->status = EvictorElement::destroyed; - element->rec.servant = 0; - // - // Not necessary to push it on the modified queue, as a modified - // element is either on the queue already or about to be saved - // (at which point it becomes clean) - // - break; - } - case EvictorElement::destroyed: - case EvictorElement::dead: - { - break; - } - default: - { - assert(0); - break; - } - } - } - if(element->keepCount > 0) - { - assert(servant != 0); - - element->keepCount = 0; - // - // Add to front of evictor queue - // - // Note that save evicts dead objects - // - _evictorList.push_front(element); - _currentEvictorSize++; - element->evictPosition = _evictorList.begin(); - } - } - break; // for(;;) - } + for(;;) + { + // + // Retrieve object + // + + EvictorElementPtr element = store->pin(ident); + if(element != 0) + { + Lock sync(*this); + if(element->stale) + { + // + // Try again + // + continue; + } + + fixEvictPosition(element); + { + IceUtil::Mutex::Lock lock(element->mutex); + + switch(element->status) + { + case EvictorElement::clean: + { + servant = element->rec.servant; + element->status = EvictorElement::destroyed; + element->rec.servant = 0; + addToModifiedQueue(element); + break; + } + case EvictorElement::created: + { + servant = element->rec.servant; + element->status = EvictorElement::dead; + element->rec.servant = 0; + break; + } + case EvictorElement::modified: + { + servant = element->rec.servant; + element->status = EvictorElement::destroyed; + element->rec.servant = 0; + // + // Not necessary to push it on the modified queue, as a modified + // element is either on the queue already or about to be saved + // (at which point it becomes clean) + // + break; + } + case EvictorElement::destroyed: + case EvictorElement::dead: + { + break; + } + default: + { + assert(0); + break; + } + } + } + if(element->keepCount > 0) + { + assert(servant != 0); + + element->keepCount = 0; + // + // Add to front of evictor queue + // + // Note that save evicts dead objects + // + _evictorList.push_front(element); + _currentEvictorSize++; + element->evictPosition = _evictorList.begin(); + } + } + break; // for(;;) + } } if(servant == 0) { - NotRegisteredException ex(__FILE__, __LINE__); - ex.kindOfObject = "servant"; - ex.id = _communicator->identityToString(ident); - if(!facet.empty()) - { - ex.id += " -f " + IceUtil::escapeString(facet, ""); - } - throw ex; + NotRegisteredException ex(__FILE__, __LINE__); + ex.kindOfObject = "servant"; + ex.id = _communicator->identityToString(ident); + if(!facet.empty()) + { + ex.id += " -f " + IceUtil::escapeString(facet, ""); + } + throw ex; } if(_trace >= 1) { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "removed object \"" << _communicator->identityToString(ident) << "\""; - if(!facet.empty()) - { - out << " with facet \"" << facet << "\""; - } + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "removed object \"" << _communicator->identityToString(ident) << "\""; + if(!facet.empty()) + { + out << " with facet \"" << facet << "\""; + } } return servant; } @@ -750,77 +750,77 @@ Freeze::EvictorI::keepFacet(const Identity& ident, const string& facet) ObjectStore* store = findStore(facet); if(store == 0) { - notThere = true; + notThere = true; } else { - for(;;) - { - EvictorElementPtr element = store->pin(ident); - if(element == 0) - { - notThere = true; - break; - } - - Lock sync(*this); - - if(element->stale) - { - // - // try again - // - continue; - } - - - { - IceUtil::Mutex::Lock lockElement(element->mutex); - if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead) - { - notThere = true; - break; - } - } - - // - // Found! - // - - if(element->keepCount == 0) - { - if(element->usageCount < 0) - { - // - // New object - // - element->usageCount = 0; - } - else - { - _evictorList.erase(element->evictPosition); - _currentEvictorSize--; - } - element->keepCount = 1; - } - else - { - element->keepCount++; - } - break; - } + for(;;) + { + EvictorElementPtr element = store->pin(ident); + if(element == 0) + { + notThere = true; + break; + } + + Lock sync(*this); + + if(element->stale) + { + // + // try again + // + continue; + } + + + { + IceUtil::Mutex::Lock lockElement(element->mutex); + if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead) + { + notThere = true; + break; + } + } + + // + // Found! + // + + if(element->keepCount == 0) + { + if(element->usageCount < 0) + { + // + // New object + // + element->usageCount = 0; + } + else + { + _evictorList.erase(element->evictPosition); + _currentEvictorSize--; + } + element->keepCount = 1; + } + else + { + element->keepCount++; + } + break; + } } if(notThere) { - NotRegisteredException ex(__FILE__, __LINE__); - ex.kindOfObject = "servant"; - ex.id = _communicator->identityToString(ident); - if(!facet.empty()) - { - ex.id += " -f " + IceUtil::escapeString(facet, ""); - } - throw ex; + NotRegisteredException ex(__FILE__, __LINE__); + ex.kindOfObject = "servant"; + ex.id = _communicator->identityToString(ident); + if(!facet.empty()) + { + ex.id += " -f " + IceUtil::escapeString(facet, ""); + } + throw ex; } } @@ -837,38 +837,38 @@ Freeze::EvictorI::releaseFacet(const Identity& ident, const string& facet) DeactivateController::Guard deactivateGuard(_deactivateController); { - Lock sync(*this); - - StoreMap::iterator p = _storeMap.find(facet); - if(p != _storeMap.end()) - { - ObjectStore* store = (*p).second; - - EvictorElementPtr element = store->getIfPinned(ident); - if(element != 0) - { - assert(!element->stale); - if(element->keepCount > 0) - { - if(--element->keepCount == 0) - { - // - // Add to front of evictor queue - // - // Note that the element cannot be destroyed or dead since - // its keepCount was > 0. - // - _evictorList.push_front(element); - _currentEvictorSize++; - element->evictPosition = _evictorList.begin(); - } - // - // Success - // - return; - } - } - } + Lock sync(*this); + + StoreMap::iterator p = _storeMap.find(facet); + if(p != _storeMap.end()) + { + ObjectStore* store = (*p).second; + + EvictorElementPtr element = store->getIfPinned(ident); + if(element != 0) + { + assert(!element->stale); + if(element->keepCount > 0) + { + if(--element->keepCount == 0) + { + // + // Add to front of evictor queue + // + // Note that the element cannot be destroyed or dead since + // its keepCount was > 0. + // + _evictorList.push_front(element); + _currentEvictorSize++; + element->evictPosition = _evictorList.begin(); + } + // + // Success + // + return; + } + } + } } NotRegisteredException ex(__FILE__, __LINE__); @@ -876,7 +876,7 @@ Freeze::EvictorI::releaseFacet(const Identity& ident, const string& facet) ex.id = _communicator->identityToString(ident); if(!facet.empty()) { - ex.id += " -f " + IceUtil::escapeString(facet, ""); + ex.id += " -f " + IceUtil::escapeString(facet, ""); } throw ex; } @@ -888,14 +888,14 @@ Freeze::EvictorI::getIterator(const string& facet, Int batchSize) ObjectStore* store = 0; { - Lock sync(*this); - - StoreMap::iterator p = _storeMap.find(facet); - if(p != _storeMap.end()) - { - store = (*p).second; - saveNowNoSync(); - } + Lock sync(*this); + + StoreMap::iterator p = _storeMap.find(facet); + if(p != _storeMap.end()) + { + store = (*p).second; + saveNowNoSync(); + } } return new EvictorIteratorI(store, batchSize); } @@ -924,25 +924,25 @@ Freeze::EvictorI::hasFacetImpl(const Identity& ident, const string& facet) ObjectStore* store = 0; { - Lock sync(*this); + Lock sync(*this); - StoreMap::iterator p = _storeMap.find(facet); - if(p == _storeMap.end()) - { - return false; - } - - store = (*p).second; - - EvictorElementPtr element = store->getIfPinned(ident); - if(element != 0) - { - assert(!element->stale); - - IceUtil::Mutex::Lock lock(element->mutex); - return element->status != EvictorElement::dead && - element->status != EvictorElement::destroyed; - } + StoreMap::iterator p = _storeMap.find(facet); + if(p == _storeMap.end()) + { + return false; + } + + store = (*p).second; + + EvictorElementPtr element = store->getIfPinned(ident); + if(element != 0) + { + assert(!element->stale); + + IceUtil::Mutex::Lock lock(element->mutex); + return element->status != EvictorElement::dead && + element->status != EvictorElement::destroyed; + } } return store->dbHasObject(ident); } @@ -960,45 +960,45 @@ Freeze::EvictorI::hasAnotherFacet(const Identity& ident, const string& facet) // StoreMap storeMapCopy; { - Lock sync(*this); - storeMapCopy = _storeMap; - } - + Lock sync(*this); + storeMapCopy = _storeMap; + } + for(StoreMap::iterator p = storeMapCopy.begin(); p != storeMapCopy.end(); ++p) { - // - // Do not check again the given facet - // - if((*p).first != facet) - { - ObjectStore* store = (*p).second; - - bool inCache = false; - { - Lock sync(*this); - - EvictorElementPtr element = store->getIfPinned(ident); - if(element != 0) - { - inCache = true; - assert(!element->stale); - - IceUtil::Mutex::Lock lock(element->mutex); - if(element->status != EvictorElement::dead && - element->status != EvictorElement::destroyed) - { - return true; - } - } - } - if(!inCache) - { - if(store->dbHasObject(ident)) - { - return true; - } - } - } + // + // Do not check again the given facet + // + if((*p).first != facet) + { + ObjectStore* store = (*p).second; + + bool inCache = false; + { + Lock sync(*this); + + EvictorElementPtr element = store->getIfPinned(ident); + if(element != 0) + { + inCache = true; + assert(!element->stale); + + IceUtil::Mutex::Lock lock(element->mutex); + if(element->status != EvictorElement::dead && + element->status != EvictorElement::destroyed) + { + return true; + } + } + } + if(!inCache) + { + if(store->dbHasObject(ident)) + { + return true; + } + } + } } return false; } @@ -1018,48 +1018,48 @@ Freeze::EvictorI::locate(const Current& current, LocalObjectPtr& cookie) // if(current.operation == "ice_ping") { - if(hasFacetImpl(current.id, current.facet)) - { - if(_trace >= 3) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "ice_ping found \"" << _communicator->identityToString(current.id) - << "\" with facet \"" << current.facet + "\""; - } - - cookie = 0; - return _pingObject; - } - else if(hasAnotherFacet(current.id, current.facet)) - { - if(_trace >= 3) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "ice_ping raises FacetNotExistException for \"" << _communicator->identityToString(current.id) - << "\" with facet \"" << current.facet + "\""; - } - throw FacetNotExistException(__FILE__, __LINE__); - } - else - { - if(_trace >= 3) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "ice_ping will raise ObjectNotExistException for \"" - << _communicator->identityToString(current.id) << "\" with facet \"" << current.facet + "\""; - } - return 0; - } + if(hasFacetImpl(current.id, current.facet)) + { + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "ice_ping found \"" << _communicator->identityToString(current.id) + << "\" with facet \"" << current.facet + "\""; + } + + cookie = 0; + return _pingObject; + } + else if(hasAnotherFacet(current.id, current.facet)) + { + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "ice_ping raises FacetNotExistException for \"" << _communicator->identityToString(current.id) + << "\" with facet \"" << current.facet + "\""; + } + throw FacetNotExistException(__FILE__, __LINE__); + } + else + { + if(_trace >= 3) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "ice_ping will raise ObjectNotExistException for \"" + << _communicator->identityToString(current.id) << "\" with facet \"" << current.facet + "\""; + } + return 0; + } } ObjectPtr result = locateImpl(current, cookie); if(result == 0) { - if(hasAnotherFacet(current.id, current.facet)) - { - throw FacetNotExistException(__FILE__, __LINE__); - } + if(hasAnotherFacet(current.id, current.facet)) + { + throw FacetNotExistException(__FILE__, __LINE__); + } } return result; } @@ -1074,65 +1074,65 @@ Freeze::EvictorI::locateImpl(const Current& current, LocalObjectPtr& cookie) if(store == 0) { if(_trace >= 2) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "locate could not find a database for facet \"" << current.facet << "\""; - } - return 0; + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "locate could not find a database for facet \"" << current.facet << "\""; + } + return 0; } for(;;) { - EvictorElementPtr element = store->pin(current.id); - if(element == 0) - { + EvictorElementPtr element = store->pin(current.id); + if(element == 0) + { if(_trace >= 2) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "locate could not find \"" << _communicator->identityToString(current.id) << "\" in database \"" - << current.facet << "\""; - } - return 0; - } - - Lock sync(*this); - - if(element->stale) - { - // - // try again - // - continue; - } - - - IceUtil::Mutex::Lock lockElement(element->mutex); - if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead) - { + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "locate could not find \"" << _communicator->identityToString(current.id) << "\" in database \"" + << current.facet << "\""; + } + return 0; + } + + Lock sync(*this); + + if(element->stale) + { + // + // try again + // + continue; + } + + + IceUtil::Mutex::Lock lockElement(element->mutex); + if(element->status == EvictorElement::destroyed || element->status == EvictorElement::dead) + { if(_trace >= 2) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "locate found \"" << _communicator->identityToString(current.id) - << "\" in the cache for database \"" << current.facet << "\" but it was dead or destroyed"; - } - return 0; - } - - // - // It's a good one! - // + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "locate found \"" << _communicator->identityToString(current.id) + << "\" in the cache for database \"" << current.facet << "\" but it was dead or destroyed"; + } + return 0; + } + + // + // It's a good one! + // if(_trace >= 2) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "locate found \"" << _communicator->identityToString(current.id) << "\" in database \"" - << current.facet << "\""; - } - - fixEvictPosition(element); - element->usageCount++; - cookie = element; - assert(element->rec.servant != 0); - return element->rec.servant; + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "locate found \"" << _communicator->identityToString(current.id) << "\" in database \"" + << current.facet << "\""; + } + + fixEvictPosition(element); + element->usageCount++; + cookie = element; + assert(element->rec.servant != 0); + return element->rec.servant; } } @@ -1148,51 +1148,51 @@ Freeze::EvictorI::finished(const Current& current, const ObjectPtr& servant, con if(cookie != 0) { - EvictorElementPtr element = EvictorElementPtr::dynamicCast(cookie); - assert(element); + EvictorElementPtr element = EvictorElementPtr::dynamicCast(cookie); + assert(element); - bool enqueue = false; - - if((_useNonmutating && current.mode != Nonmutating) || - (servant->ice_operationAttributes(current.operation) & 0x1) != 0) - { - IceUtil::Mutex::Lock lock(element->mutex); - - if(element->status == EvictorElement::clean) - { - // - // Assume this operation updated the object - // - element->status = EvictorElement::modified; - enqueue = true; - } - } - - Lock sync(*this); - - // - // Only elements with a usageCount == 0 can become stale and we own - // one count! - // - assert(!element->stale); - assert(element->usageCount >= 1); - - // - // Decrease the usage count of the evictor queue element. - // - element->usageCount--; - - if(enqueue) - { - addToModifiedQueue(element); - } - else if(element->usageCount == 0 && element->keepCount == 0) - { - // - // Evict as many elements as necessary. - // - evict(); - } + bool enqueue = false; + + if((_useNonmutating && current.mode != Nonmutating) || + (servant->ice_operationAttributes(current.operation) & 0x1) != 0) + { + IceUtil::Mutex::Lock lock(element->mutex); + + if(element->status == EvictorElement::clean) + { + // + // Assume this operation updated the object + // + element->status = EvictorElement::modified; + enqueue = true; + } + } + + Lock sync(*this); + + // + // Only elements with a usageCount == 0 can become stale and we own + // one count! + // + assert(!element->stale); + assert(element->usageCount >= 1); + + // + // Decrease the usage count of the evictor queue element. + // + element->usageCount--; + + if(enqueue) + { + addToModifiedQueue(element); + } + else if(element->usageCount == 0 && element->keepCount == 0) + { + // + // Evict as many elements as necessary. + // + evict(); + } } } @@ -1201,44 +1201,44 @@ Freeze::EvictorI::deactivate(const string&) { if(_deactivateController.deactivate()) { - try - { - Lock sync(*this); - - saveNowNoSync(); - - // - // Set the evictor size to zero, meaning that we will evict - // everything possible. - // - _evictorSize = 0; - evict(); - - _savingThreadDone = true; - notifyAll(); - sync.release(); - getThreadControl().join(); - - if(_watchDogThread != 0) - { - _watchDogThread->terminate(); - _watchDogThread->getThreadControl().join(); - } - - for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p) - { - (*p).second->close(); - } - - _dbEnv = 0; - _initializer = 0; - } - catch(...) - { - _deactivateController.deactivationComplete(); - throw; - } - _deactivateController.deactivationComplete(); + try + { + Lock sync(*this); + + saveNowNoSync(); + + // + // Set the evictor size to zero, meaning that we will evict + // everything possible. + // + _evictorSize = 0; + evict(); + + _savingThreadDone = true; + notifyAll(); + sync.release(); + getThreadControl().join(); + + if(_watchDogThread != 0) + { + _watchDogThread->terminate(); + _watchDogThread->getThreadControl().join(); + } + + for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p) + { + (*p).second->close(); + } + + _dbEnv = 0; + _initializer = 0; + } + catch(...) + { + _deactivateController.deactivationComplete(); + throw; + } + _deactivateController.deactivationComplete(); } } @@ -1248,7 +1248,7 @@ Freeze::EvictorI::initialize(const Identity& ident, const string& facet, const O { if(_initializer != 0) { - _initializer->initialize(_adapter, ident, facet, servant); + _initializer->initialize(_adapter, ident, facet, servant); } } @@ -1258,387 +1258,387 @@ Freeze::EvictorI::run() { try { - for(;;) - { - deque<EvictorElementPtr> allObjects; - deque<EvictorElementPtr> deadObjects; - - size_t saveNowThreadsSize = 0; - - { - Lock sync(*this); - - while(!_savingThreadDone && - (_saveNowThreads.size() == 0) && - (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger)) - { - if(_savePeriod == IceUtil::Time::milliSeconds(0)) - { - wait(); - } - else if(timedWait(_savePeriod) == false) - { - // - // Timeout, so let's save - // - break; // while - } - } - - saveNowThreadsSize = _saveNowThreads.size(); - - if(_savingThreadDone) - { - assert(_modifiedQueue.size() == 0); - assert(saveNowThreadsSize == 0); - break; // for(;;) - } - - // - // Check first if there is something to do! - // - if(_modifiedQueue.size() == 0) - { - if(saveNowThreadsSize > 0) - { - _saveNowThreads.clear(); - notifyAll(); - } - continue; // for(;;) - } - - _modifiedQueue.swap(allObjects); - } - - const size_t size = allObjects.size(); - - deque<StreamedObject> streamedObjectQueue; - - Long streamStart = IceUtil::Time::now().toMilliSeconds(); - - // - // Stream each element - // - for(size_t i = 0; i < size; i++) - { - EvictorElementPtr& element = allObjects[i]; - - bool tryAgain; - do - { - tryAgain = false; - ObjectPtr servant = 0; - - // - // These elements can't be stale as only elements with - // usageCount == 0 can become stale, and the modifiedQueue - // (us now) owns one count. - // - - IceUtil::Mutex::Lock lockElement(element->mutex); - Byte status = element->status; - - switch(status) - { - case EvictorElement::created: - case EvictorElement::modified: - { - servant = element->rec.servant; - break; - } - case EvictorElement::destroyed: - { - size_t index = streamedObjectQueue.size(); - streamedObjectQueue.resize(index + 1); - StreamedObject& obj = streamedObjectQueue[index]; - stream(element, streamStart, obj); - - element->status = EvictorElement::dead; - deadObjects.push_back(element); - - break; - } - case EvictorElement::dead: - { - deadObjects.push_back(element); - break; - } - default: - { - // - // Nothing to do (could be a duplicate) - // - break; - } - } - if(servant == 0) - { - lockElement.release(); - } - else - { - IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get()); - if(mutex != 0) - { - // - // Lock servant and then element so that user can safely lock - // servant and call various Evictor operations - // - - IceUtil::AbstractMutex::TryLock lockServant(*mutex); - if(!lockServant.acquired()) - { - lockElement.release(); - - if(_watchDogThread != 0) - { - _watchDogThread->activate(); - } - lockServant.acquire(); - if(_watchDogThread != 0) - { - _watchDogThread->deactivate(); - } - - lockElement.acquire(); - status = element->status; - } + for(;;) + { + deque<EvictorElementPtr> allObjects; + deque<EvictorElementPtr> deadObjects; + + size_t saveNowThreadsSize = 0; + + { + Lock sync(*this); + + while(!_savingThreadDone && + (_saveNowThreads.size() == 0) && + (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger)) + { + if(_savePeriod == IceUtil::Time::milliSeconds(0)) + { + wait(); + } + else if(timedWait(_savePeriod) == false) + { + // + // Timeout, so let's save + // + break; // while + } + } + + saveNowThreadsSize = _saveNowThreads.size(); + + if(_savingThreadDone) + { + assert(_modifiedQueue.size() == 0); + assert(saveNowThreadsSize == 0); + break; // for(;;) + } + + // + // Check first if there is something to do! + // + if(_modifiedQueue.size() == 0) + { + if(saveNowThreadsSize > 0) + { + _saveNowThreads.clear(); + notifyAll(); + } + continue; // for(;;) + } + + _modifiedQueue.swap(allObjects); + } + + const size_t size = allObjects.size(); + + deque<StreamedObject> streamedObjectQueue; + + Long streamStart = IceUtil::Time::now().toMilliSeconds(); + + // + // Stream each element + // + for(size_t i = 0; i < size; i++) + { + EvictorElementPtr& element = allObjects[i]; + + bool tryAgain; + do + { + tryAgain = false; + ObjectPtr servant = 0; + + // + // These elements can't be stale as only elements with + // usageCount == 0 can become stale, and the modifiedQueue + // (us now) owns one count. + // + + IceUtil::Mutex::Lock lockElement(element->mutex); + Byte status = element->status; + + switch(status) + { + case EvictorElement::created: + case EvictorElement::modified: + { + servant = element->rec.servant; + break; + } + case EvictorElement::destroyed: + { + size_t index = streamedObjectQueue.size(); + streamedObjectQueue.resize(index + 1); + StreamedObject& obj = streamedObjectQueue[index]; + stream(element, streamStart, obj); + + element->status = EvictorElement::dead; + deadObjects.push_back(element); + + break; + } + case EvictorElement::dead: + { + deadObjects.push_back(element); + break; + } + default: + { + // + // Nothing to do (could be a duplicate) + // + break; + } + } + if(servant == 0) + { + lockElement.release(); + } + else + { + IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get()); + if(mutex != 0) + { + // + // Lock servant and then element so that user can safely lock + // servant and call various Evictor operations + // + + IceUtil::AbstractMutex::TryLock lockServant(*mutex); + if(!lockServant.acquired()) + { + lockElement.release(); + + if(_watchDogThread != 0) + { + _watchDogThread->activate(); + } + lockServant.acquire(); + if(_watchDogThread != 0) + { + _watchDogThread->deactivate(); + } + + lockElement.acquire(); + status = element->status; + } - switch(status) - { - case EvictorElement::created: - case EvictorElement::modified: - { - if(servant == element->rec.servant) - { - size_t index = streamedObjectQueue.size(); - streamedObjectQueue.resize(index + 1); - StreamedObject& obj = streamedObjectQueue[index]; - stream(element, streamStart, obj); - - element->status = EvictorElement::clean; - } - else - { - tryAgain = true; - } - break; - } - case EvictorElement::destroyed: - { - lockServant.release(); - - size_t index = streamedObjectQueue.size(); - streamedObjectQueue.resize(index + 1); - StreamedObject& obj = streamedObjectQueue[index]; - stream(element, streamStart, obj); - - element->status = EvictorElement::dead; - deadObjects.push_back(element); - break; - } - case EvictorElement::dead: - { - deadObjects.push_back(element); - break; - } - default: - { - // - // Nothing to do (could be a duplicate) - // - break; - } - } - } - else - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = string(typeid(*element->rec.servant).name()) - + " does not implement IceUtil::AbstractMutex"; - throw ex; - } - } - } while(tryAgain); - } - - if(_trace >= 1) - { - Long now = IceUtil::Time::now().toMilliSeconds(); - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "streamed " << streamedObjectQueue.size() << " objects in " - << static_cast<Int>(now - streamStart) << " ms"; - } - - // - // Now let's save all these streamed objects to disk using a transaction - // - - // - // Each time we get a deadlock, we reduce the number of objects to save - // per transaction - // - size_t txSize = streamedObjectQueue.size(); - if(txSize > static_cast<size_t>(_maxTxSize)) - { - txSize = static_cast<size_t>(_maxTxSize); - } - bool tryAgain; - - do - { - tryAgain = false; - - while(streamedObjectQueue.size() > 0) - { - if(txSize > streamedObjectQueue.size()) - { - txSize = streamedObjectQueue.size(); - } - - Long saveStart = IceUtil::Time::now().toMilliSeconds(); - try - { - DbTxn* tx = 0; - _dbEnv->getEnv()->txn_begin(0, &tx, 0); - - long txnId = 0; - if(_txTrace >= 1) - { - txnId = (tx->id() & 0x7FFFFFFF) + 0x80000000L; - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "started transaction " << hex << txnId << dec << " in saving thread"; - } - - try - { - for(size_t i = 0; i < txSize; i++) - { - StreamedObject& obj = streamedObjectQueue[i]; - obj.store->save(obj.key, obj.value, obj.status, tx); - } - } - catch(...) - { - tx->abort(); - if(_txTrace >= 1) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "rolled back transaction " << hex << txnId << dec; - } - throw; - } - tx->commit(0); - - if(_txTrace >= 1) - { - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "committed transaction " << hex << txnId << dec; - } - - streamedObjectQueue.erase - (streamedObjectQueue.begin(), - streamedObjectQueue.begin() + txSize); - - if(_trace >= 1) - { - Long now = IceUtil::Time::now().toMilliSeconds(); - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "saved " << txSize << " objects in " - << static_cast<Int>(now - saveStart) << " ms"; - } - } - catch(const DbDeadlockException&) - { - if(_deadlockWarning) - { - Warning out(_communicator->getLogger()); - out << "Deadlock in Freeze::EvictorI::run while writing into Db \"" + _filename - + "\"; retrying ..."; - } - - tryAgain = true; - txSize = (txSize + 1)/2; - } - catch(const DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } - } - } - while(tryAgain); - - { - Lock sync(*this); - - // - // Release usage count - // - for(deque<EvictorElementPtr>::iterator p = allObjects.begin(); - p != allObjects.end(); p++) - { - EvictorElementPtr& element = *p; - element->usageCount--; - } - allObjects.clear(); - - for(deque<EvictorElementPtr>::iterator q = deadObjects.begin(); - q != deadObjects.end(); q++) - { - EvictorElementPtr& element = *q; - if(!element->stale) - { - // - // Can be stale when there are duplicate elements on the - // deadObjecst queue - // - - if(!element->stale && element->usageCount == 0 && element->keepCount == 0) - { - // - // Get rid of unused dead elements - // - IceUtil::Mutex::Lock lockElement(element->mutex); - if(element->status == EvictorElement::dead) - { - evict(element); - } - } - } - } - deadObjects.clear(); - evict(); - - if(saveNowThreadsSize > 0) - { - _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize); - notifyAll(); - } - } - } + switch(status) + { + case EvictorElement::created: + case EvictorElement::modified: + { + if(servant == element->rec.servant) + { + size_t index = streamedObjectQueue.size(); + streamedObjectQueue.resize(index + 1); + StreamedObject& obj = streamedObjectQueue[index]; + stream(element, streamStart, obj); + + element->status = EvictorElement::clean; + } + else + { + tryAgain = true; + } + break; + } + case EvictorElement::destroyed: + { + lockServant.release(); + + size_t index = streamedObjectQueue.size(); + streamedObjectQueue.resize(index + 1); + StreamedObject& obj = streamedObjectQueue[index]; + stream(element, streamStart, obj); + + element->status = EvictorElement::dead; + deadObjects.push_back(element); + break; + } + case EvictorElement::dead: + { + deadObjects.push_back(element); + break; + } + default: + { + // + // Nothing to do (could be a duplicate) + // + break; + } + } + } + else + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = string(typeid(*element->rec.servant).name()) + + " does not implement IceUtil::AbstractMutex"; + throw ex; + } + } + } while(tryAgain); + } + + if(_trace >= 1) + { + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "streamed " << streamedObjectQueue.size() << " objects in " + << static_cast<Int>(now - streamStart) << " ms"; + } + + // + // Now let's save all these streamed objects to disk using a transaction + // + + // + // Each time we get a deadlock, we reduce the number of objects to save + // per transaction + // + size_t txSize = streamedObjectQueue.size(); + if(txSize > static_cast<size_t>(_maxTxSize)) + { + txSize = static_cast<size_t>(_maxTxSize); + } + bool tryAgain; + + do + { + tryAgain = false; + + while(streamedObjectQueue.size() > 0) + { + if(txSize > streamedObjectQueue.size()) + { + txSize = streamedObjectQueue.size(); + } + + Long saveStart = IceUtil::Time::now().toMilliSeconds(); + try + { + DbTxn* tx = 0; + _dbEnv->getEnv()->txn_begin(0, &tx, 0); + + long txnId = 0; + if(_txTrace >= 1) + { + txnId = (tx->id() & 0x7FFFFFFF) + 0x80000000L; + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "started transaction " << hex << txnId << dec << " in saving thread"; + } + + try + { + for(size_t i = 0; i < txSize; i++) + { + StreamedObject& obj = streamedObjectQueue[i]; + obj.store->save(obj.key, obj.value, obj.status, tx); + } + } + catch(...) + { + tx->abort(); + if(_txTrace >= 1) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "rolled back transaction " << hex << txnId << dec; + } + throw; + } + tx->commit(0); + + if(_txTrace >= 1) + { + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "committed transaction " << hex << txnId << dec; + } + + streamedObjectQueue.erase + (streamedObjectQueue.begin(), + streamedObjectQueue.begin() + txSize); + + if(_trace >= 1) + { + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "saved " << txSize << " objects in " + << static_cast<Int>(now - saveStart) << " ms"; + } + } + catch(const DbDeadlockException&) + { + if(_deadlockWarning) + { + Warning out(_communicator->getLogger()); + out << "Deadlock in Freeze::EvictorI::run while writing into Db \"" + _filename + + "\"; retrying ..."; + } + + tryAgain = true; + txSize = (txSize + 1)/2; + } + catch(const DbException& dx) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } + } + } + while(tryAgain); + + { + Lock sync(*this); + + // + // Release usage count + // + for(deque<EvictorElementPtr>::iterator p = allObjects.begin(); + p != allObjects.end(); p++) + { + EvictorElementPtr& element = *p; + element->usageCount--; + } + allObjects.clear(); + + for(deque<EvictorElementPtr>::iterator q = deadObjects.begin(); + q != deadObjects.end(); q++) + { + EvictorElementPtr& element = *q; + if(!element->stale) + { + // + // Can be stale when there are duplicate elements on the + // deadObjecst queue + // + + if(!element->stale && element->usageCount == 0 && element->keepCount == 0) + { + // + // Get rid of unused dead elements + // + IceUtil::Mutex::Lock lockElement(element->mutex); + if(element->status == EvictorElement::dead) + { + evict(element); + } + } + } + } + deadObjects.clear(); + evict(); + + if(saveNowThreadsSize > 0) + { + _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize); + notifyAll(); + } + } + } } catch(const IceUtil::Exception& ex) { - Error out(_communicator->getLogger()); - out << "Saving thread killed by exception: " << ex; - out.flush(); - handleFatalError(this, _communicator); + Error out(_communicator->getLogger()); + out << "Saving thread killed by exception: " << ex; + out.flush(); + handleFatalError(this, _communicator); } catch(const std::exception& ex) { - Error out(_communicator->getLogger()); - out << "Saving thread killed by std::exception: " << ex.what(); - out.flush(); - handleFatalError(this, _communicator); + Error out(_communicator->getLogger()); + out << "Saving thread killed by std::exception: " << ex.what(); + out.flush(); + handleFatalError(this, _communicator); } catch(...) { - Error out(_communicator->getLogger()); - out << "Saving thread killed by unknown exception"; - out.flush(); - handleFatalError(this, _communicator); + Error out(_communicator->getLogger()); + out << "Saving thread killed by unknown exception"; + out.flush(); + handleFatalError(this, _communicator); } } @@ -1665,7 +1665,7 @@ Freeze::EvictorI::saveNowNoSync() notifyAll(); do { - wait(); + wait(); } while(find(_saveNowThreads.begin(), _saveNowThreads.end(), myself) != _saveNowThreads.end()); } @@ -1683,50 +1683,50 @@ Freeze::EvictorI::evict() while(_currentEvictorSize > _evictorSize) { - // - // Get the last unused element from the evictor queue. - // - while(p != _evictorList.rend()) - { - if((*p)->usageCount == 0) - { - break; // Fine, servant is not in use (and not in the modifiedQueue) - } - ++p; - } - if(p == _evictorList.rend()) - { - // - // All servants are active, can't evict any further. - // - break; - } - - EvictorElementPtr& element = *p; - assert(!element->stale); - assert(element->keepCount == 0); - - if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) - { - string facet = element->store.facet(); - - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "evicting \"" << _communicator->identityToString(element->cachePosition->first) << "\" "; - if(facet != "") - { - out << "-f \"" << facet << "\" "; - } - out << "from the queue\n" - << "number of elements in the queue: " << _currentEvictorSize; - } - - // - // Remove last unused element from the evictor queue. - // - element->stale = true; - element->store.unpin(element->cachePosition); - p = list<EvictorElementPtr>::reverse_iterator(_evictorList.erase(element->evictPosition)); - _currentEvictorSize--; + // + // Get the last unused element from the evictor queue. + // + while(p != _evictorList.rend()) + { + if((*p)->usageCount == 0) + { + break; // Fine, servant is not in use (and not in the modifiedQueue) + } + ++p; + } + if(p == _evictorList.rend()) + { + // + // All servants are active, can't evict any further. + // + break; + } + + EvictorElementPtr& element = *p; + assert(!element->stale); + assert(element->keepCount == 0); + + if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) + { + string facet = element->store.facet(); + + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "evicting \"" << _communicator->identityToString(element->cachePosition->first) << "\" "; + if(facet != "") + { + out << "-f \"" << facet << "\" "; + } + out << "from the queue\n" + << "number of elements in the queue: " << _currentEvictorSize; + } + + // + // Remove last unused element from the evictor queue. + // + element->stale = true; + element->store.unpin(element->cachePosition); + p = list<EvictorElementPtr>::reverse_iterator(_evictorList.erase(element->evictPosition)); + _currentEvictorSize--; } } @@ -1737,20 +1737,20 @@ Freeze::EvictorI::fixEvictPosition(const EvictorElementPtr& element) if(element->keepCount == 0) { - if(element->usageCount < 0) - { - // - // New object - // - element->usageCount = 0; - _currentEvictorSize++; - } - else - { - _evictorList.erase(element->evictPosition); - } - _evictorList.push_front(element); - element->evictPosition = _evictorList.begin(); + if(element->usageCount < 0) + { + // + // New object + // + element->usageCount = 0; + _currentEvictorSize++; + } + else + { + _evictorList.erase(element->evictPosition); + } + _evictorList.push_front(element); + element->evictPosition = _evictorList.begin(); } } @@ -1775,7 +1775,7 @@ Freeze::EvictorI::addToModifiedQueue(const EvictorElementPtr& element) if(_saveSizeTrigger >= 0 && static_cast<Int>(_modifiedQueue.size()) >= _saveSizeTrigger) { - notifyAll(); + notifyAll(); } } @@ -1793,22 +1793,22 @@ Freeze::EvictorI::stream(const EvictorElementPtr& element, Long streamStart, Str if(element->status != EvictorElement::destroyed) { - // - // Update stats first - // - Statistics& stats = element->rec.stats; - Long diff = streamStart - (stats.creationTime + stats.lastSaveTime); - if(stats.lastSaveTime == 0) - { - stats.lastSaveTime = diff; - stats.avgSaveTime = diff; - } - else - { - stats.lastSaveTime = streamStart - stats.creationTime; - stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05); - } - ObjectStore::marshal(element->rec, obj.value, _communicator); + // + // Update stats first + // + Statistics& stats = element->rec.stats; + Long diff = streamStart - (stats.creationTime + stats.lastSaveTime); + if(stats.lastSaveTime == 0) + { + stats.lastSaveTime = diff; + stats.avgSaveTime = diff; + } + else + { + stats.lastSaveTime = streamStart - stats.creationTime; + stats.avgSaveTime = static_cast<Long>(stats.avgSaveTime * 0.95 + diff * 0.05); + } + ObjectStore::marshal(element->rec, obj.value, _communicator); } } @@ -1820,11 +1820,11 @@ Freeze::EvictorI::findStore(const string& facet) const StoreMap::const_iterator p = _storeMap.find(facet); if(p == _storeMap.end()) { - return 0; + return 0; } else { - return (*p).second; + return (*p).second; } } @@ -1836,45 +1836,45 @@ Freeze::EvictorI::allDbs() const try { - Db db(_dbEnv->getEnv(), 0); - db.open(0, _filename.c_str(), 0, DB_UNKNOWN, DB_RDONLY, 0); - - Dbc* dbc = 0; - db.cursor(0, &dbc, 0); - - Dbt dbKey; - dbKey.set_flags(DB_DBT_MALLOC); - - Dbt dbValue; - dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); - - bool more = true; - while(more) - { - more = (dbc->get(&dbKey, &dbValue, DB_NEXT) == 0); - if(more) - { - string dbName(static_cast<char*>(dbKey.get_data()), dbKey.get_size()); - - if(dbName.find(indexPrefix) != 0) - { - result.push_back(dbName); - } - free(dbKey.get_data()); - } - } - - dbc->close(); - db.close(0); + Db db(_dbEnv->getEnv(), 0); + db.open(0, _filename.c_str(), 0, DB_UNKNOWN, DB_RDONLY, 0); + + Dbc* dbc = 0; + db.cursor(0, &dbc, 0); + + Dbt dbKey; + dbKey.set_flags(DB_DBT_MALLOC); + + Dbt dbValue; + dbValue.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL); + + bool more = true; + while(more) + { + more = (dbc->get(&dbKey, &dbValue, DB_NEXT) == 0); + if(more) + { + string dbName(static_cast<char*>(dbKey.get_data()), dbKey.get_size()); + + if(dbName.find(indexPrefix) != 0) + { + result.push_back(dbName); + } + free(dbKey.get_data()); + } + } + + dbc->close(); + db.close(0); } catch(const DbException& dx) { - if(dx.get_errno() != ENOENT) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } + if(dx.get_errno() != ENOENT) + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; + } } return result; |