diff options
author | Bernard Normier <bernard@zeroc.com> | 2004-02-27 21:02:06 +0000 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2004-02-27 21:02:06 +0000 |
commit | f9e93ae64859101ed6e89d6c2f4a338dba19d1ff (patch) | |
tree | e944a8076b8ebbc70a66a61da6f4d78aa8362568 /cpp/src/Freeze/EvictorI.cpp | |
parent | Reeanabled IcePack/deployer test on Windows (diff) | |
download | ice-f9e93ae64859101ed6e89d6c2f4a338dba19d1ff.tar.bz2 ice-f9e93ae64859101ed6e89d6c2f4a338dba19d1ff.tar.xz ice-f9e93ae64859101ed6e89d6c2f4a338dba19d1ff.zip |
Fixed VC7.x build
Diffstat (limited to 'cpp/src/Freeze/EvictorI.cpp')
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 553 |
1 files changed, 288 insertions, 265 deletions
diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index afc969db3c3..c8b81d44add 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -1013,333 +1013,356 @@ Freeze::EvictorI::deactivate(const string&) } void -Freeze::EvictorI::run() throw() +Freeze::EvictorI::run() { - for(;;) + try { - deque<FacetPtr> allObjects; - size_t saveNowThreadsSize = 0; - + for(;;) { - Lock sync(*this); - while(!_deactivated && - (_saveNowThreads.size() == 0) && - (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger)) + deque<FacetPtr> allObjects; + size_t saveNowThreadsSize = 0; + { - if(_savePeriod == IceUtil::Time::milliSeconds(0)) + Lock sync(*this); + while(!_deactivated && + (_saveNowThreads.size() == 0) && + (_saveSizeTrigger < 0 || static_cast<Int>(_modifiedQueue.size()) < _saveSizeTrigger)) { - wait(); + if(_savePeriod == IceUtil::Time::milliSeconds(0)) + { + wait(); + } + else if(timedWait(_savePeriod) == false) + { + // + // Timeout, so let's save + // + break; // while + } } - else if(timedWait(_savePeriod) == false) - { - // - // Timeout, so let's save - // - break; // while - } - } - - saveNowThreadsSize = _saveNowThreads.size(); - - if(_deactivated) - { - assert(_modifiedQueue.size() == 0); - if(saveNowThreadsSize > 0) + + saveNowThreadsSize = _saveNowThreads.size(); + + if(_deactivated) { - _saveNowThreads.clear(); - notifyAll(); + assert(_modifiedQueue.size() == 0); + if(saveNowThreadsSize > 0) + { + _saveNowThreads.clear(); + notifyAll(); + } + break; // for(;;) } - break; // for(;;) - } - - // - // Check first if there is something to do! - // - if(_modifiedQueue.size() == 0) - { - if(saveNowThreadsSize > 0) + + // + // Check first if there is something to do! + // + if(_modifiedQueue.size() == 0) { - _saveNowThreads.clear(); - notifyAll(); + if(saveNowThreadsSize > 0) + { + _saveNowThreads.clear(); + notifyAll(); + } + continue; // for(;;) } - continue; // for(;;) + + _modifiedQueue.swap(allObjects); } - - _modifiedQueue.swap(allObjects); - } - - const size_t size = allObjects.size(); - - deque<StreamedObject> streamedObjectQueue; - - Long streamStart = IceUtil::Time::now().toMilliSeconds(); - - // - // Stream each element - // - for(size_t i = 0; i < size; i++) - { - FacetPtr& facet = allObjects[i]; - bool tryAgain; - do + const size_t size = allObjects.size(); + + deque<StreamedObject> streamedObjectQueue; + + Long streamStart = IceUtil::Time::now().toMilliSeconds(); + + // + // Stream each element + // + for(size_t i = 0; i < size; i++) { - tryAgain = false; - ObjectPtr servant = 0; - - IceUtil::Mutex::Lock lockFacet(facet->mutex); - Byte status = facet->status; + FacetPtr& facet = allObjects[i]; - switch(status) + bool tryAgain; + do { - case created: - case modified: - { - servant = facet->rec.servant; - break; - } - case destroyed: + tryAgain = false; + ObjectPtr servant = 0; + + IceUtil::Mutex::Lock lockFacet(facet->mutex); + Byte status = facet->status; + + switch(status) { - facet->status = dead; - size_t index = streamedObjectQueue.size(); - streamedObjectQueue.resize(index + 1); - StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, facet->position->first, status, streamStart, obj); - break; - } - default: + case created: + case modified: + { + servant = facet->rec.servant; + break; + } + case destroyed: + { + facet->status = dead; + size_t index = streamedObjectQueue.size(); + streamedObjectQueue.resize(index + 1); + StreamedObject& obj = streamedObjectQueue[index]; + streamFacet(facet, facet->position->first, status, streamStart, obj); + break; + } + default: + { + // + // Nothing to do (could be a duplicate) + // + break; + } + } + if(servant == 0) { - // - // Nothing to do (could be a duplicate) - // - break; + lockFacet.release(); } - } - if(servant == 0) - { - lockFacet.release(); - } - else - { - IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get()); - if(mutex != 0) + else { - // - // Lock servant and then facet so that user can safely lock - // servant and call various Evictor operations - // - - IceUtil::AbstractMutex::TryLock lockServant(*mutex); - if(!lockServant.acquired()) - { - lockFacet.release(); - lockServant.acquire(); - lockFacet.acquire(); - status = facet->status; - } - - switch(status) + IceUtil::AbstractMutex* mutex = dynamic_cast<IceUtil::AbstractMutex*>(servant.get()); + if(mutex != 0) { - case created: - case modified: + // + // Lock servant and then facet so that user can safely lock + // servant and call various Evictor operations + // + + IceUtil::AbstractMutex::TryLock lockServant(*mutex); + if(!lockServant.acquired()) { - if(servant == facet->rec.servant) + lockFacet.release(); + lockServant.acquire(); + lockFacet.acquire(); + status = facet->status; + } + + switch(status) + { + case created: + case modified: { - facet->status = clean; + if(servant == facet->rec.servant) + { + facet->status = clean; + size_t index = streamedObjectQueue.size(); + streamedObjectQueue.resize(index + 1); + StreamedObject& obj = streamedObjectQueue[index]; + streamFacet(facet, facet->position->first, status, streamStart, obj); + } + else + { + tryAgain = true; + } + break; + } + case destroyed: + { + lockServant.release(); + facet->status = dead; size_t index = streamedObjectQueue.size(); streamedObjectQueue.resize(index + 1); StreamedObject& obj = streamedObjectQueue[index]; streamFacet(facet, facet->position->first, status, streamStart, obj); - } - else + break; + } + default: { - tryAgain = true; + // + // Nothing to do (could be a duplicate) + // + break; } - break; - } - case destroyed: - { - lockServant.release(); - facet->status = dead; - size_t index = streamedObjectQueue.size(); - streamedObjectQueue.resize(index + 1); - StreamedObject& obj = streamedObjectQueue[index]; - streamFacet(facet, facet->position->first, status, streamStart, obj); - break; - } - default: - { - // - // Nothing to do (could be a duplicate) - // - break; } } + else + { + DatabaseException ex(__FILE__, __LINE__); + ex.message = string(typeid(*facet->rec.servant).name()) + + " does not implement IceUtil::AbstractMutex"; + throw ex; + } } - else - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = string(typeid(*facet->rec.servant).name()) - + " does not implement IceUtil::AbstractMutex"; - throw ex; - } - } - } while(tryAgain); - } - - if(_trace >= 1) - { - Long now = IceUtil::Time::now().toMilliSeconds(); - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "streamed " << streamedObjectQueue.size() << " objects in " - << static_cast<Int>(now - streamStart) << " ms"; - } - - // - // Now let's save all these streamed objects to disk using a transaction - // - - // - // Each time we get a deadlock, we reduce the number of objects to save - // per transaction - // - size_t txSize = streamedObjectQueue.size(); - if(txSize > static_cast<size_t>(_maxTxSize)) - { - txSize = static_cast<size_t>(_maxTxSize); - } - bool tryAgain; - - do - { - tryAgain = false; + } while(tryAgain); + } - while(streamedObjectQueue.size() > 0) + if(_trace >= 1) { - if(txSize > streamedObjectQueue.size()) - { - txSize = streamedObjectQueue.size(); - } + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "streamed " << streamedObjectQueue.size() << " objects in " + << static_cast<Int>(now - streamStart) << " ms"; + } + + // + // Now let's save all these streamed objects to disk using a transaction + // + + // + // Each time we get a deadlock, we reduce the number of objects to save + // per transaction + // + size_t txSize = streamedObjectQueue.size(); + if(txSize > static_cast<size_t>(_maxTxSize)) + { + txSize = static_cast<size_t>(_maxTxSize); + } + bool tryAgain; + + do + { + tryAgain = false; - Long saveStart = IceUtil::Time::now().toMilliSeconds(); - try + while(streamedObjectQueue.size() > 0) { - DbTxn* tx = 0; - _dbEnv->txn_begin(0, &tx, 0); + if(txSize > streamedObjectQueue.size()) + { + txSize = streamedObjectQueue.size(); + } + + Long saveStart = IceUtil::Time::now().toMilliSeconds(); try - { - for(size_t i = 0; i < txSize; i++) - { - StreamedObject& obj = streamedObjectQueue[i]; - - switch(obj.status) + { + DbTxn* tx = 0; + _dbEnv->txn_begin(0, &tx, 0); + try + { + for(size_t i = 0; i < txSize; i++) { - case created: - case modified: + StreamedObject& obj = streamedObjectQueue[i]; + + switch(obj.status) { - Dbt dbKey; - Dbt dbValue; - initializeInDbt(obj.key, dbKey); - initializeInDbt(obj.value, dbValue); - u_int32_t flags = (obj.status == created) ? DB_NOOVERWRITE : 0; - int err = _db->put(tx, &dbKey, &dbValue, flags); - if(err != 0) + case created: + case modified: { - throw DatabaseException(__FILE__, __LINE__); + Dbt dbKey; + Dbt dbValue; + initializeInDbt(obj.key, dbKey); + initializeInDbt(obj.value, dbValue); + u_int32_t flags = (obj.status == created) ? DB_NOOVERWRITE : 0; + int err = _db->put(tx, &dbKey, &dbValue, flags); + if(err != 0) + { + throw DatabaseException(__FILE__, __LINE__); + } + break; } - break; - } - case destroyed: - { - Dbt dbKey; - initializeInDbt(obj.key, dbKey); - int err = _db->del(tx, &dbKey, 0); - if(err != 0) + case destroyed: + { + Dbt dbKey; + initializeInDbt(obj.key, dbKey); + int err = _db->del(tx, &dbKey, 0); + if(err != 0) + { + throw DatabaseException(__FILE__, __LINE__); + } + break; + } + default: { - throw DatabaseException(__FILE__, __LINE__); + assert(0); } - break; - } - default: - { - assert(0); } } } + catch(...) + { + tx->abort(); + throw; + } + tx->commit(0); + streamedObjectQueue.erase + (streamedObjectQueue.begin(), + streamedObjectQueue.begin() + txSize); + + if(_trace >= 1) + { + Long now = IceUtil::Time::now().toMilliSeconds(); + Trace out(_communicator->getLogger(), "Freeze.Evictor"); + out << "saved " << txSize << " objects in " + << static_cast<Int>(now - saveStart) << " ms"; + } } - catch(...) + catch(const DbDeadlockException&) { - tx->abort(); - throw; + tryAgain = true; + txSize = (txSize + 1)/2; } - tx->commit(0); - streamedObjectQueue.erase - (streamedObjectQueue.begin(), - streamedObjectQueue.begin() + txSize); - - if(_trace >= 1) + catch(const DbException& dx) { - Long now = IceUtil::Time::now().toMilliSeconds(); - Trace out(_communicator->getLogger(), "Freeze.Evictor"); - out << "saved " << txSize << " objects in " - << static_cast<Int>(now - saveStart) << " ms"; + DatabaseException ex(__FILE__, __LINE__); + ex.message = dx.what(); + throw ex; } - } - catch(const DbDeadlockException&) - { - tryAgain = true; - txSize = (txSize + 1)/2; - } - catch(const DbException& dx) - { - DatabaseException ex(__FILE__, __LINE__); - ex.message = dx.what(); - throw ex; - } - } - } - while(tryAgain); - - { - Lock sync(*this); + } + } + while(tryAgain); - _generation++; - - for(deque<FacetPtr>::iterator q = allObjects.begin(); - q != allObjects.end(); q++) { - FacetPtr& facet = *q; - facet->element->usageCount--; - - if(facet != facet->element->mainObject) + Lock sync(*this); + + _generation++; + + for(deque<FacetPtr>::iterator q = allObjects.begin(); + q != allObjects.end(); q++) { - // - // Remove if dead - // - IceUtil::Mutex::Lock lockFacet(facet->mutex); + FacetPtr& facet = *q; + facet->element->usageCount--; + + if(facet != facet->element->mainObject) { - if(facet->status == dead) + // + // Remove if dead + // + IceUtil::Mutex::Lock lockFacet(facet->mutex); { - facet->element->facets.erase(facet->position); - facet->position = facet->element->facets.end(); - } + if(facet->status == dead) + { + facet->element->facets.erase(facet->position); + facet->position = facet->element->facets.end(); + } + } } } + allObjects.clear(); + evict(); + + if(saveNowThreadsSize > 0) + { + _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize); + notifyAll(); + } } - allObjects.clear(); - evict(); - - if(saveNowThreadsSize > 0) - { - _saveNowThreads.erase(_saveNowThreads.begin(), _saveNowThreads.begin() + saveNowThreadsSize); - notifyAll(); - } + _lastSave = IceUtil::Time::now(); } - _lastSave = IceUtil::Time::now(); + } + catch(const IceUtil::Exception& ex) + { + Error out(_communicator->getLogger()); + out << "Saving thread killed by exception: " << ex; + out.flush(); + ::abort(); + } + catch(const std::exception& ex) + { + Error out(_communicator->getLogger()); + out << "Saving thread killed by std::exception: " << ex.what(); + out.flush(); + ::abort(); + } + catch(...) + { + Error out(_communicator->getLogger()); + out << "Saving thread killed by unknown exception"; + out.flush(); + ::abort(); } } - bool Freeze::EvictorI::load(Dbc* dbc, Key& key, Value& value, vector<Identity>& identities, |