diff options
author | Bernard Normier <bernard@zeroc.com> | 2007-05-25 13:45:57 +0000 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2007-05-25 13:45:57 +0000 |
commit | 87066d75a2e1026f39e81aa9d5c555b571295b70 (patch) | |
tree | e6304ee267f05f065ac223dc17f02fc42e2e0b0d /java/src | |
parent | adding listener for assertion failures (diff) | |
download | ice-87066d75a2e1026f39e81aa9d5c555b571295b70.tar.bz2 ice-87066d75a2e1026f39e81aa9d5c555b571295b70.tar.xz ice-87066d75a2e1026f39e81aa9d5c555b571295b70.zip |
Renamed existing Freeze Evictor to BackgroundSaveEvictor and added new
TransactionalEvictor
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Freeze/BackgroundSaveEvictorI.java | 1540 | ||||
-rwxr-xr-x | java/src/Freeze/ConnectionI.java | 18 | ||||
-rw-r--r-- | java/src/Freeze/EvictorI.java | 1978 | ||||
-rw-r--r-- | java/src/Freeze/EvictorIteratorI.java | 181 | ||||
-rw-r--r-- | java/src/Freeze/Index.java | 269 | ||||
-rw-r--r-- | java/src/Freeze/LinkedList.java | 13 | ||||
-rw-r--r-- | java/src/Freeze/ObjectStore.java | 286 | ||||
-rw-r--r-- | java/src/Freeze/SharedDbEnv.java | 53 | ||||
-rw-r--r-- | java/src/Freeze/TransactionalEvictorContextI.java | 349 | ||||
-rw-r--r-- | java/src/Freeze/TransactionalEvictorI.java | 729 | ||||
-rw-r--r-- | java/src/Freeze/Util.java | 34 | ||||
-rw-r--r-- | java/src/Ice/LoggerI.java | 2 | ||||
-rw-r--r-- | java/src/IceUtil/Cache.java | 8 |
13 files changed, 3418 insertions, 2042 deletions
diff --git a/java/src/Freeze/BackgroundSaveEvictorI.java b/java/src/Freeze/BackgroundSaveEvictorI.java new file mode 100644 index 00000000000..488adabadca --- /dev/null +++ b/java/src/Freeze/BackgroundSaveEvictorI.java @@ -0,0 +1,1540 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Freeze; + +// +// A Freeze evictor implementation that saves updates asynchronously, +// in a "saving" thread +// + +class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, Runnable +{ + // + // Clean object; can become modified or destroyed + // + static final byte clean = 0; + + // + // New object; can become clean, dead or destroyed + // + static final byte created = 1; + + // + // Modified object; can become clean or destroyed + // + static final byte modified = 2; + + // + // Being saved. Can become dead or created + // + static final byte destroyed = 3; + + // + // Exists only in the SaveAsyncEvictor; for example the object was created + // and later destroyed (without a save in between), or it was + // destroyed on disk but is still in use. Can become created. + // + static final byte dead = 4; + + // + // The WatchDogThread is used by the saving thread to ensure the + // streaming of some object does not take more than timeout ms. + // We only measure the time necessary to acquire the lock on the + // object (servant), not the streaming itself. + // + class WatchDogThread extends Thread + { + WatchDogThread(long timeout, String name) + { + super(name); + _timeout = timeout; + assert timeout > 0; + } + + public synchronized void run() + { + while(!_done) + { + long startTime = 0; + + try + { + if(_active) + { + startTime = System.currentTimeMillis(); + wait(_timeout); + } + else + { + wait(); + } + } + catch(InterruptedException e) + { + // + // Ignore + // + } + + if(!_done && _active && startTime > 0) + { + // + // Did we timeout? + // + if(System.currentTimeMillis() - startTime >= _timeout) + { + _communicator.getLogger().error(_errorPrefix + + "Fatal error: streaming watch dog thread timed out."); + + Util.handleFatalError(BackgroundSaveEvictorI.this, _communicator, null); + } + } + } + } + + synchronized void activate() + { + _active = true; + notify(); + } + + synchronized void deactivate() + { + _active = false; + notify(); + } + + synchronized void terminate() + { + _done = true; + notify(); + } + + private final long _timeout; + private boolean _done = false; + private boolean _active = false; + } + + + BackgroundSaveEvictorI(Ice.ObjectAdapter adapter, String envName, String filename, + ServantInitializer initializer, Index[] indices, boolean createDb) + { + this(adapter, envName, null, filename, initializer, indices, createDb); + } + + + BackgroundSaveEvictorI(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, + String filename, ServantInitializer initializer, Index[] indices, boolean createDb) + { + super(adapter, envName, dbEnv, filename, null, initializer, indices, createDb); + + String propertyPrefix = "Freeze.Evictor." + envName + '.' + filename; + + // + // By default, we save every minute or when the size of the modified + // queue reaches 10. + // + + _saveSizeTrigger = + _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10); + + _savePeriod = + _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000); + + // + // By default, we save at most 10 * SaveSizeTrigger objects per transaction + // + _maxTxSize = _communicator.getProperties().getPropertyAsIntWithDefault( + propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger); + + if(_maxTxSize <= 0) + { + _maxTxSize = 100; + } + + // + // Start threads + // + String savingThreadName; + + String programName = _communicator.getProperties().getProperty("Ice.ProgramName"); + if(programName.length() > 0) + { + savingThreadName = programName + "-"; + } + else + { + savingThreadName = ""; + } + String watchDogThreadName = savingThreadName + "FreezeEvictorWatchDogThread(" + envName + '.' + _filename + ")"; + savingThreadName += "FreezeEvictorThread(" + envName + '.' + _filename + ")"; + + // + // By default, no stream timeout + // + long streamTimeout = + _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".StreamTimeout", 0) * 1000; + + if(streamTimeout > 0) + { + _watchDogThread = new WatchDogThread(streamTimeout, watchDogThreadName); + _watchDogThread.start(); + } + + _thread = new Thread(this, savingThreadName); + _thread.start(); + } + + + public Ice.ObjectPrx + addFacet(Ice.Object servant, Ice.Identity ident, String facet) + { + checkIdentity(ident); + + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + ObjectStore store = findStore(facet, _createDb); + + if(store == null) + { + NotFoundException ex = new NotFoundException(); + ex.message = _errorPrefix + "addFacet: could not open database for facet '" + + facet + "'"; + throw ex; + } + + boolean alreadyThere = false; + + for(;;) + { + // + // Create a new entry + // + + EvictorElement element = new EvictorElement(ident, store); + element.status = dead; + element.rec = new ObjectRecord(); + element.rec.stats = new Statistics(); + + Object o = store.cache().putIfAbsent(ident, element); + + if(o != null) + { + element = (EvictorElement)o; + } + + synchronized(this) + { + if(element.stale) + { + // + // Try again + // + continue; + } + fixEvictPosition(element); + + synchronized(element) + { + switch(element.status) + { + case clean: + case created: + case modified: + { + alreadyThere = true; + break; + } + case destroyed: + { + element.status = modified; + element.rec.servant = servant; + + // + // No need to push it on the modified queue, as a destroyed object + // is either already on the queue or about to be saved. When saved, + // it becomes dead. + // + break; + } + case dead: + { + element.status = created; + ObjectRecord rec = element.rec; + + rec.servant = servant; + rec.stats.creationTime = System.currentTimeMillis(); + rec.stats.lastSaveTime = 0; + rec.stats.avgSaveTime = 0; + + addToModifiedQueue(element); + break; + } + default: + { + assert false; + break; + } + } + } + } + break; // for(;;) + } + + if(alreadyThere) + { + Ice.AlreadyRegisteredException ex = new Ice.AlreadyRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + throw ex; + } + + if(_trace >= 1) + { + String objString = "object \"" + _communicator.identityToString(ident) + "\""; + if(!facet.equals("")) + { + objString += " with facet \"" + facet + "\""; + } + + _communicator.getLogger().trace("Freeze.Evictor", "added " + objString + " to Db \"" + _filename + + "\""); + } + + Ice.ObjectPrx obj = _adapter.createProxy(ident); + if(facet.length() > 0) + { + obj = obj.ice_facet(facet); + } + return obj; + } + finally + { + _deactivateController.unlock(); + } + } + + public Ice.Object + removeFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + + try + { + ObjectStore store = findStore(facet, false); + Ice.Object servant = null; + + if(store != null) + { + for(;;) + { + // + // Retrieve object + // + EvictorElement element = (EvictorElement)store.cache().pin(ident); + if(element != null) + { + synchronized(this) + { + if(element.stale) + { + // + // Try again + // + continue; + } + + fixEvictPosition(element); + synchronized(element) + { + switch(element.status) + { + case clean: + { + servant = element.rec.servant; + element.status = destroyed; + element.rec.servant = null; + addToModifiedQueue(element); + break; + } + case created: + { + servant = element.rec.servant; + element.status = dead; + element.rec.servant = null; + break; + } + case modified: + { + servant = element.rec.servant; + element.status = destroyed; + element.rec.servant = null; + // + // Not necessary to push it on the modified queue, as a modified + // element is either on the queue already or about to be saved + // (at which point it becomes clean) + // + break; + } + case destroyed: + case dead: + { + break; + } + default: + { + assert false; + break; + } + } + } + + if(element.keepCount > 0) + { + assert servant != null; + + element.keepCount = 0; + // + // Add to front of evictor queue + // + // Note that save evicts dead objects + // + _evictorList.addFirst(element); + element.evictPosition = _evictorList.iterator(); + // + // Position the iterator "on" the element. + // + element.evictPosition.next(); + _currentEvictorSize++; + } + } + } + break; // for(;;) + } + } + + if(servant == null) + { + Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + throw ex; + } + + if(_trace >= 1) + { + String objString = "object \"" + _communicator.identityToString(ident) + "\""; + if(!facet.equals("")) + { + objString += " with facet \"" + facet + "\""; + } + + _communicator.getLogger().trace("Freeze.Evictor", "removed " + objString + " from Db \"" + _filename + + "\""); + } + return servant; + } + finally + { + _deactivateController.unlock(); + } + } + + + public void + keep(Ice.Identity ident) + { + keepFacet(ident, ""); + } + + public void + keepFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + boolean notThere = false; + + ObjectStore store = findStore(facet, false); + if(store == null) + { + notThere = true; + } + else + { + for(;;) + { + EvictorElement element = (EvictorElement)store.cache().pin(ident); + if(element == null) + { + notThere = true; + break; + } + + synchronized(this) + { + if(element.stale) + { + // + // try again + // + continue; + } + + synchronized(element) + { + if(element.status == destroyed || element.status == dead) + { + notThere = true; + break; + } + } + + // + // Found! + // + + if(element.keepCount == 0) + { + if(element.usageCount < 0) + { + // + // New object + // + element.usageCount = 0; + } + else + { + assert element.evictPosition != null; + element.evictPosition.remove(); + element.evictPosition = null; + _currentEvictorSize--; + } + element.keepCount = 1; + } + else + { + element.keepCount++; + } + break; + } + } + } + + if(notThere) + { + Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + throw ex; + } + } + finally + { + _deactivateController.unlock(); + } + } + + public void + release(Ice.Identity ident) + { + releaseFacet(ident, ""); + } + + public void + releaseFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + + ObjectStore store = findStore(facet, false); + + if(store != null) + { + synchronized(this) + { + EvictorElement element = (EvictorElement) store.cache().getIfPinned(ident); + if(element != null) + { + assert !element.stale; + if(element.keepCount > 0) + { + if(--element.keepCount == 0) + { + // + // Add to front of evictor queue + // + // Note that the element cannot be destroyed or dead since + // its keepCount was > 0. + // + assert element.evictPosition == null; + _evictorList.addFirst(element); + element.evictPosition = _evictorList.iterator(); + // + // Position the iterator "on" the element. + // + element.evictPosition.next(); + _currentEvictorSize++; + } + // + // Success + // + return; + } + } + } + } + + Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + + throw ex; + } + finally + { + _deactivateController.unlock(); + } + } + + public boolean + hasFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + + ObjectStore store = findStore(facet, false); + + if(store == null) + { + return false; + } + + synchronized(this) + { + EvictorElement element = (EvictorElement)store.cache().getIfPinned(ident); + if(element != null) + { + assert !element.stale; + + synchronized(element) + { + return element.status != dead && element.status != destroyed; + } + } + } + return store.dbHasObject(ident, null); + } + finally + { + _deactivateController.unlock(); + } + } + + + protected boolean + hasAnotherFacet(Ice.Identity ident, String facet) + { + _deactivateController.lock(); + try + { + // + // If the object exists in another store, throw FacetNotExistException + // instead of returning null (== ObjectNotExistException) + // + java.util.Map storeMapCopy; + synchronized(this) + { + storeMapCopy = new java.util.HashMap(_storeMap); + } + + java.util.Iterator p = storeMapCopy.entrySet().iterator(); + while(p.hasNext()) + { + java.util.Map.Entry entry = (java.util.Map.Entry) p.next(); + + // + // Do not check facet + // + if(!facet.equals(entry.getKey())) + { + ObjectStore store = (ObjectStore)entry.getValue(); + boolean inCache = false; + + synchronized(this) + { + EvictorElement element = (EvictorElement)store.cache().getIfPinned(ident); + if(element != null) + { + inCache = true; + assert !element.stale; + + synchronized(element) + { + if(element.status != dead && element.status != destroyed) + { + return true; + } + } + } + } + if(!inCache) + { + if(store.dbHasObject(ident, null)) + { + return true; + } + } + } + } + return false; + } + finally + { + _deactivateController.unlock(); + } + } + + + protected Object + createEvictorElement(Ice.Identity ident, ObjectRecord rec, ObjectStore store) + { + EvictorElement elt = new EvictorElement(ident, store); + elt.rec = rec; + return elt; + } + + protected Ice.Object + locateImpl(Ice.Current current, Ice.LocalObjectHolder cookie) + { + _deactivateController.lock(); + try + { + + cookie.value = null; + + ObjectStore store = findStore(current.facet, false); + if(store == null) + { + if(_trace >= 2) + { + _communicator.getLogger().trace("Freeze.Evictor", "locate could not find a database for facet \"" + + current.facet + "\""); + } + return null; + } + + for(;;) + { + EvictorElement element = (EvictorElement)store.cache().pin(current.id); + if(element == null) + { + if(_trace >= 2) + { + _communicator.getLogger().trace("Freeze.Evictor", "locate could not find \"" + + _communicator.identityToString(current.id) + "\" in Db \"" + + _filename + "\""); + } + return null; + } + + synchronized(this) + { + if(element.stale) + { + // + // try again + // + continue; + } + + synchronized(element) + { + if(element.status == destroyed || + element.status == dead) + { + if(_trace >= 2) + { + _communicator.getLogger().trace("Freeze.Evictor", "locate found \"" + + _communicator.identityToString(current.id) + + "\" in the cache for Db \"" + _filename + + "\" but it was dead or destroyed"); + } + return null; + } + + // + // It's a good one! + // + + if(_trace >= 2) + { + _communicator.getLogger().trace("Freeze.Evictor", "locate found \"" + + _communicator.identityToString(current.id) + "\" in Db \"" + + _filename + "\""); + } + + fixEvictPosition(element); + element.usageCount++; + cookie.value = element; + assert element.rec.servant != null; + return element.rec.servant; + } + } + } + } + finally + { + _deactivateController.unlock(); + } + } + + public void + finished(Ice.Current current, Ice.Object servant, Ice.LocalObject cookie) + { + _deactivateController.lock(); + try + { + if(cookie != null) + { + EvictorElement element = (EvictorElement)cookie; + + boolean enqueue = false; + + if((_useNonmutating && current.mode != Ice.OperationMode.Nonmutating) || + (!_useNonmutating && (servant.ice_operationAttributes(current.operation) & 0x1) != 0)) + { + synchronized(element) + { + if(element.status == clean) + { + // + // Assume this operation updated the object + // + element.status = modified; + enqueue = true; + } + } + } + + synchronized(this) + { + // + // Only elements with a usageCount == 0 can become stale and we own + // one count! + // + assert !element.stale; + assert element.usageCount >= 1; + + // + // Decrease the usage count of the evictor queue element. + // + element.usageCount--; + + if(enqueue) + { + addToModifiedQueue(element); + } + else if(element.usageCount == 0 && element.keepCount == 0) + { + // + // Evict as many elements as necessary. + // + evict(); + } + } + } + } + finally + { + _deactivateController.unlock(); + } + } + + public void + deactivate(String category) + { + if(_deactivateController.deactivate()) + { + try + { + saveNow(); + + synchronized(this) + { + // + // Set the evictor size to zero, meaning that we will evict + // everything possible. + // + _evictorSize = 0; + evict(); + + _savingThreadDone = true; + notifyAll(); + } + + try + { + _thread.join(); + } + catch(InterruptedException ex) + { + } + + if(_watchDogThread != null) + { + _watchDogThread.terminate(); + + try + { + _watchDogThread.join(); + } + catch(InterruptedException ex) + { + } + } + + closeDbEnv(); + } + finally + { + _deactivateController.deactivationComplete(); + } + } + } + + + public void + run() + { + try + { + for(;;) + { + java.util.List allObjects; + java.util.List deadObjects = new java.util.LinkedList(); + + int saveNowThreadsSize = 0; + + synchronized(this) + { + while(!_savingThreadDone && + (_saveNowThreads.size() == 0) && + (_saveSizeTrigger < 0 || _modifiedQueue.size() < _saveSizeTrigger)) + { + try + { + if(_savePeriod == 0) + { + wait(); + } + else + { + long preSave = System.currentTimeMillis(); + wait(_savePeriod); + if(System.currentTimeMillis() > preSave + _savePeriod) + { + break; + } + } + } + catch(InterruptedException ex) + { + } + } + + saveNowThreadsSize = _saveNowThreads.size(); + + if(_savingThreadDone) + { + assert(_modifiedQueue.size() == 0); + assert(saveNowThreadsSize == 0); + break; // for(;;) + } + + // + // Check first if there is something to do! + // + if(_modifiedQueue.size() == 0) + { + if(saveNowThreadsSize > 0) + { + _saveNowThreads.clear(); + notifyAll(); + } + continue; // for(;;) + } + + allObjects = _modifiedQueue; + _modifiedQueue = new java.util.ArrayList(); + } + + int size = allObjects.size(); + + java.util.List streamedObjectQueue = new java.util.ArrayList(); + + long streamStart = System.currentTimeMillis(); + + // + // Stream each element + // + for(int i = 0; i < size; i++) + { + EvictorElement element = (EvictorElement)allObjects.get(i); + + boolean tryAgain; + + do + { + tryAgain = false; + Ice.Object servant = null; + + synchronized(element) + { + byte status = element.status; + + switch(status) + { + case created: + case modified: + { + servant = element.rec.servant; + break; + } + case destroyed: + { + streamedObjectQueue.add(stream(element, streamStart)); + + element.status = dead; + deadObjects.add(element); + break; + } + case dead: + { + deadObjects.add(element); + break; + } + default: + { + // + // Nothing to do (could be a duplicate) + // + break; + } + } + } + + if(servant != null) + { + // + // Lock servant and then facet so that user can safely lock + // servant and call various Evictor operations + // + if(_watchDogThread != null) + { + _watchDogThread.activate(); + } + synchronized(servant) + { + if(_watchDogThread != null) + { + _watchDogThread.deactivate(); + } + + synchronized(element) + { + byte status = element.status; + + switch(status) + { + case created: + case modified: + { + if(servant == element.rec.servant) + { + streamedObjectQueue.add(stream(element, streamStart)); + + element.status = clean; + } + else + { + tryAgain = true; + } + break; + } + case destroyed: + { + streamedObjectQueue.add(stream(element, streamStart)); + + element.status = dead; + deadObjects.add(element); + break; + } + case dead: + { + deadObjects.add(element); + break; + } + default: + { + // + // Nothing to do (could be a duplicate) + // + break; + } + } + } + } + } + } while(tryAgain); + } + + if(_trace >= 1) + { + long now = System.currentTimeMillis(); + _communicator.getLogger().trace("Freeze.Evictor", "streamed " + streamedObjectQueue.size() + + " objects in " + (now - streamStart) + " ms"); + } + + // + // Now let's save all these streamed objects to disk using a transaction + // + + // + // Each time we get a deadlock, we reduce the number of objects to save + // per transaction + // + int txSize = streamedObjectQueue.size(); + if(txSize > _maxTxSize) + { + txSize = _maxTxSize; + } + + boolean tryAgain; + + do + { + tryAgain = false; + + while(streamedObjectQueue.size() > 0) + { + if(txSize > streamedObjectQueue.size()) + { + txSize = streamedObjectQueue.size(); + } + + long saveStart = System.currentTimeMillis(); + String txnId = null; + + try + { + com.sleepycat.db.Transaction tx = _dbEnv.getEnv().beginTransaction(null, null); + + if(_txTrace >= 1) + { + txnId = Long.toHexString((tx.getId() & 0x7FFFFFFF) + 0x80000000L); + + _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + + "started transaction " + txnId + " in saving thread"); + } + + try + { + for(int i = 0; i < txSize; i++) + { + StreamedObject obj = (StreamedObject) streamedObjectQueue.get(i); + obj.store.save(obj.key, obj.value, obj.status, tx); + } + + com.sleepycat.db.Transaction toCommit = tx; + tx = null; + toCommit.commit(); + + if(_txTrace >= 1) + { + _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + + "committed transaction " + txnId); + } + } + finally + { + if(tx != null) + { + tx.abort(); + if(_txTrace >= 1) + { + _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + + "rolled back transaction " + txnId); + } + } + } + + for(int i = 0; i < txSize; i++) + { + streamedObjectQueue.remove(0); + } + + if(_trace >= 1) + { + long now = System.currentTimeMillis(); + _communicator.getLogger().trace("Freeze.Evictor", "saved " + txSize + " objects in " + + (now - saveStart) + " ms"); + } + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_deadlockWarning) + { + _communicator.getLogger().warning("Deadlock in Freeze.BackgroundSaveEvictorI.run while writing " + + "into Db \"" + _filename + "\"; retrying..."); + } + + tryAgain = true; + txSize = (txSize + 1)/2; + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _errorPrefix + "saving: " + dx.getMessage(); + throw ex; + } + } + } while(tryAgain); + + synchronized(this) + { + // + // Release usage count + // + for(int i = 0; i < allObjects.size(); i++) + { + EvictorElement element = (EvictorElement) allObjects.get(i); + assert element.usageCount > 0; + element.usageCount--; + } + allObjects.clear(); + + java.util.Iterator p = deadObjects.iterator(); + while(p.hasNext()) + { + EvictorElement element = (EvictorElement) p.next(); + + // + // Can be stale when there are duplicates on the deadObjects list + // + if(!element.stale && element.usageCount == 0 && element.keepCount == 0) + { + // + // Get rid of unused dead elements + // + synchronized(element) + { + if(element.status == dead) + { + evict(element); + } + } + } + } + + deadObjects.clear(); + evict(); + + if(saveNowThreadsSize > 0) + { + for(int i = 0; i < saveNowThreadsSize; i++) + { + _saveNowThreads.remove(0); + } + notifyAll(); + } + } + } + } + catch(RuntimeException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + _communicator.getLogger().error(_errorPrefix + "Fatal error in saving thread:\n" + sw.toString()); + + Util.handleFatalError(this, _communicator, ex); + } + } + + protected void + evict() + { + assert Thread.holdsLock(this); + + java.util.Iterator p = _evictorList.riterator(); + while(p.hasNext() && _currentEvictorSize > _evictorSize) + { + // + // Get the last unused element from the evictor queue. + // + EvictorElement element = (EvictorElement)p.next(); + if(element.usageCount == 0) + { + // + // Fine, servant is not in use (and not in the modifiedQueue) + // + + assert !element.stale; + assert element.keepCount == 0; + assert element.evictPosition != null; + + if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) + { + String objString = "object \"" + _communicator.identityToString(element.identity) + "\""; + String facet = element.store.facet(); + if(facet.length() > 0) + { + objString += " with facet \"" + facet + "\""; + } + + _communicator.getLogger().trace("Freeze.Evictor", "evicting " + objString + " from the queue; " + + "number of elements in the queue: " + _currentEvictorSize); + } + + // + // Remove last unused element from the evictor queue. + // + element.stale = true; + element.store.cache().unpin(element.identity); + p.remove(); + element.evictPosition = null; + _currentEvictorSize--; + } + } + } + + protected com.sleepycat.db.Transaction + beforeQuery() + { + saveNow(); + return null; + } + + synchronized private void + saveNow() + { + Thread myself = Thread.currentThread(); + + _saveNowThreads.add(myself); + notifyAll(); + do + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + while(_saveNowThreads.contains(myself)); + } + + private void + fixEvictPosition(EvictorElement element) + { + assert Thread.holdsLock(this); + + assert !element.stale; + + if(element.keepCount == 0) + { + if(element.usageCount < 0) + { + assert element.evictPosition == null; + + // + // New object + // + element.usageCount = 0; + _currentEvictorSize++; + } + else + { + assert element.evictPosition != null; + element.evictPosition.remove(); + } + _evictorList.addFirst(element); + element.evictPosition = _evictorList.iterator(); + // + // Position the iterator "on" the element. + // + element.evictPosition.next(); + } + } + + private void + evict(EvictorElement element) + { + assert Thread.holdsLock(this); + + assert !element.stale; + assert element.keepCount == 0; + + element.evictPosition.remove(); + _currentEvictorSize--; + element.stale = true; + element.store.cache().unpin(element.identity); + } + + private void + addToModifiedQueue(EvictorElement element) + { + assert Thread.holdsLock(this); + + element.usageCount++; + _modifiedQueue.add(element); + + if(_saveSizeTrigger >= 0 && _modifiedQueue.size() >= _saveSizeTrigger) + { + notifyAll(); + } + } + + private StreamedObject + stream(EvictorElement element, long streamStart) + { + assert Thread.holdsLock(element); + + assert element.status != dead; + + StreamedObject obj = new StreamedObject(); + + obj.status = element.status; + obj.store = element.store; + obj.key = ObjectStore.marshalKey(element.identity, _communicator); + + if(element.status != destroyed) + { + updateStats(element.rec.stats, streamStart); + obj.value = ObjectStore.marshalValue(element.rec, _communicator); + } + return obj; + } + + static private class EvictorElement extends Ice.LocalObjectImpl + { + EvictorElement(Ice.Identity identity, ObjectStore store) + { + this.identity = identity; + this.store = store; + } + + final ObjectStore store; + final Ice.Identity identity; + + // + // Protected by SaveAsyncEvictor + // + java.util.Iterator evictPosition = null; + int usageCount = -1; + int keepCount = 0; + boolean stale = false; + + // + // Protected by this + // + ObjectRecord rec = null; + byte status = clean; + } + + + static private class StreamedObject + { + byte[] key = null; + byte[] value = null; + byte status = dead; + ObjectStore store = null; + } + + // + // List of EvictorElement with stable iterators + // + private final Freeze.LinkedList _evictorList = new Freeze.LinkedList(); + private int _currentEvictorSize = 0; + + // + // The _modifiedQueue contains a queue of all modified facets + // Each element in the queue "owns" a usage count, to ensure the + // elements containing them remain in the map. + // + private java.util.List _modifiedQueue = new java.util.ArrayList(); + + private boolean _savingThreadDone = false; + private WatchDogThread _watchDogThread = null; + + // + // Threads that have requested a "saveNow" and are waiting for + // its completion + // + private final java.util.List _saveNowThreads = new java.util.ArrayList(); + + private int _saveSizeTrigger; + private int _maxTxSize; + private long _savePeriod; + + private Thread _thread; +} diff --git a/java/src/Freeze/ConnectionI.java b/java/src/Freeze/ConnectionI.java index 0c8f2eed788..053d32fd353 100755 --- a/java/src/Freeze/ConnectionI.java +++ b/java/src/Freeze/ConnectionI.java @@ -88,7 +88,7 @@ class ConnectionI extends Ice.LocalObjectImpl implements Connection } - if(_dbEnv != null) + if(_dbEnv != null && _ownDbEnv) { try { @@ -101,11 +101,11 @@ class ConnectionI extends Ice.LocalObjectImpl implements Connection } } - ConnectionI(Ice.Communicator communicator, String envName, com.sleepycat.db.Environment dbEnv) + ConnectionI(SharedDbEnv dbEnv) { - _communicator = communicator; - _dbEnv = SharedDbEnv.get(communicator, envName, dbEnv); - _envName = envName; + _dbEnv = dbEnv; + _communicator = dbEnv.getCommunicator(); + _envName = dbEnv.getEnvName(); _trace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Map"); _txTrace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Transaction"); @@ -114,6 +114,13 @@ class ConnectionI extends Ice.LocalObjectImpl implements Connection _closeInFinalizeWarning = properties.getPropertyAsIntWithDefault("Freeze.Warn.CloseInFinalize", 1) > 0; } + ConnectionI(Ice.Communicator communicator, String envName, com.sleepycat.db.Environment dbEnv) + { + this(SharedDbEnv.get(communicator, envName, dbEnv)); + _ownDbEnv = true; + } + + // // The synchronization is only needed only during finalization // @@ -206,6 +213,7 @@ class ConnectionI extends Ice.LocalObjectImpl implements Connection private Ice.Communicator _communicator; private SharedDbEnv _dbEnv; + private boolean _ownDbEnv = false; private String _envName; private TransactionI _transaction; private LinkedList _mapList = new Freeze.LinkedList(); diff --git a/java/src/Freeze/EvictorI.java b/java/src/Freeze/EvictorI.java index 0ffbb2a3d5e..21d6d633036 100644 --- a/java/src/Freeze/EvictorI.java +++ b/java/src/Freeze/EvictorI.java @@ -9,94 +9,15 @@ package Freeze; -class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable +abstract class EvictorI extends Ice.LocalObjectImpl implements Evictor { - final static String defaultDb = "$default"; - final static String indexPrefix = "$index:"; - // - // The WatchDogThread is used by the saving thread to ensure the - // streaming of some object does not take more than timeout ms. - // We only measure the time necessary to acquire the lock on the - // object (servant), not the streaming itself. + // The deactivate controller is used by the implementation of all public + // operations to ensure that deactivate() (which closes/clears various + // Berkeley DB objects) is not called during these operations. + // Note that the only threads that may perform such concurrent calls + // are threads other than the dispatch threads of the associated adapter. // - class WatchDogThread extends Thread - { - WatchDogThread(long timeout, String name) - { - super(name); - _timeout = timeout; - assert timeout > 0; - } - - public synchronized void run() - { - while(!_done) - { - long startTime = 0; - - // - // Unfortunely wait can be waken up by a spurious wakeup, - // so we cannot reliably tell when we are woken up by a timeout. - // - try - { - if(_active) - { - startTime = System.currentTimeMillis(); - wait(_timeout); - } - else - { - wait(); - } - } - catch(InterruptedException e) - { - // - // Ignore - // - } - - if(!_done && _active && startTime > 0) - { - // - // Did we timeout? - // - if(System.currentTimeMillis() - startTime >= _timeout) - { - _communicator.getLogger().error(_errorPrefix + - "Fatal error: streaming watch dog thread timed out."); - - Util.handleFatalError(EvictorI.this, _communicator, null); - } - } - } - } - - synchronized void activate() - { - _active = true; - notify(); - } - - synchronized void deactivate() - { - _active = false; - notify(); - } - - synchronized void terminate() - { - _done = true; - notify(); - } - - private final long _timeout; - private boolean _done = false; - private boolean _active = false; - } - class DeactivateController { synchronized void activate() @@ -133,7 +54,7 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable synchronized boolean deactivated() { - return !_activated || _deactivated || _deactivating; + return !_activated || _deactivated; } synchronized boolean deactivate() @@ -171,7 +92,7 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable if(_trace >= 1) { _communicator.getLogger().trace( - "Freeze.Evictor", "*** Waiting for " + _guardCount + + "Freeze.Evictor", "Waiting for " + _guardCount + " threads to complete before starting deactivation."); } @@ -211,172 +132,87 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable private int _guardCount = 0; } - public - EvictorI(Ice.ObjectAdapter adapter, String envName, String filename, ServantInitializer initializer, - Index[] indices, boolean createDb) + static final String defaultDb = "$default"; + static final String indexPrefix = "$index:"; + + + public Ice.ObjectPrx + add(Ice.Object servant, Ice.Identity ident) { - _adapter = adapter; - _communicator = adapter.getCommunicator(); - _initializer = initializer; - _filename = filename; - _createDb = createDb; - init(envName, null, indices); + return addFacet(servant, ident, ""); } - public - EvictorI(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, - ServantInitializer initializer, Index[] indices, boolean createDb) + public Ice.Object + remove(Ice.Identity ident) { - _adapter = adapter; - _communicator = adapter.getCommunicator(); - _initializer = initializer; - _filename = filename; - _createDb = createDb; - init(envName, dbEnv, indices); + return removeFacet(ident, ""); } - private void - init(String envName, com.sleepycat.db.Environment dbEnv, Index[] indices) + public boolean + hasObject(Ice.Identity ident) { - _dbEnv = SharedDbEnv.get(_communicator, envName, dbEnv); - - _trace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Evictor"); - _txTrace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Transaction"); - _deadlockWarning = _communicator.getProperties().getPropertyAsInt("Freeze.Warn.Deadlocks") != 0; - _useNonmutating = _communicator.getProperties().getPropertyAsInt("Freeze.Evictor.UseNonmutating") != 0; - - - _errorPrefix = "Freeze Evictor DbEnv(\"" + envName + "\") Db(\"" + _filename + "\"): "; - - String propertyPrefix = "Freeze.Evictor." + envName + '.' + _filename; - - // - // By default, we save every minute or when the size of the modified - // queue reaches 10. - // - - _saveSizeTrigger = - _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".SaveSizeTrigger", 10); - - _savePeriod = - _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".SavePeriod", 60 * 1000); - - // - // By default, we save at most 10 * SaveSizeTrigger objects per transaction - // - _maxTxSize = _communicator.getProperties().getPropertyAsIntWithDefault( - propertyPrefix + ".MaxTxSize", 10 * _saveSizeTrigger); - - if(_maxTxSize <= 0) - { - _maxTxSize = 100; - } - - boolean populateEmptyIndices = - _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".PopulateEmptyIndices", 0) != 0; - - // - // Instantiate all Dbs in 2 steps: - // (1) iterate over the indices and create ObjectStore with indices - // (2) open ObjectStores without indices - // + return hasFacet(ident, ""); + } - java.util.List dbs = allDbs(); + public Ice.Object + locate(Ice.Current current, Ice.LocalObjectHolder cookie) + { // - // Add default db in case it's not there + // Special ice_ping() handling // - dbs.add(defaultDb); - - if(indices != null) - { - for(int i = 0; i < indices.length; ++i) + if(current.operation != null && current.operation.equals("ice_ping")) + { + if(hasFacet(current.id, current.facet)) { - String facet = indices[i].facet(); - - if(_storeMap.get(facet) == null) + if(_trace >= 3) { - java.util.List storeIndices = new java.util.LinkedList(); - for(int j = i; j < indices.length; ++j) - { - if(indices[j].facet().equals(facet)) - { - storeIndices.add(indices[j]); - } - } - - ObjectStore store = new ObjectStore(facet, _createDb, this, storeIndices, populateEmptyIndices); - _storeMap.put(facet, store); + _communicator.getLogger().trace( + "Freeze.Evictor", "ice_ping found \"" + _communicator.identityToString(current.id) + + "\" with facet \"" + current.facet + "\""); } + cookie.value = null; + return _pingObject; } - } - - java.util.Iterator p = dbs.iterator(); - while(p.hasNext()) - { - String facet = (String) p.next(); - if(facet.equals(defaultDb)) + else if(hasAnotherFacet(current.id, current.facet)) { - facet = ""; + if(_trace >= 3) + { + _communicator.getLogger().trace( + "Freeze.Evictor", "ice_ping raises FacetNotExistException for \"" + + _communicator.identityToString(current.id) + "\" with facet \"" + current.facet + "\""); + } + + throw new Ice.FacetNotExistException(); } - - if(_storeMap.get(facet) == null) + else { - ObjectStore store = new ObjectStore(facet, _createDb, this, new java.util.LinkedList(), - populateEmptyIndices); + if(_trace >= 3) + { + _communicator.getLogger().trace( + "Freeze.Evictor", "ice_ping will raise ObjectNotExistException for \"" + + _communicator.identityToString(current.id) + "\" with facet \"" + current.facet + "\""); + } - _storeMap.put(facet, store); + return null; } } - - // - // Start threads - // - String savingThreadName; - String programName = _communicator.getProperties().getProperty("Ice.ProgramName"); - if(programName.length() > 0) - { - savingThreadName = programName + "-"; - } - else - { - savingThreadName = ""; - } - String watchDogThreadName = savingThreadName + "FreezeEvictorWatchDogThread(" + envName + '.' + _filename + ")"; - savingThreadName += "FreezeEvictorThread(" + envName + '.' + _filename + ")"; - - // - // By default, no stream timeout - // - long streamTimeout = - _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".StreamTimeout", 0) * 1000; - - if(streamTimeout > 0) - { - _watchDogThread = new WatchDogThread(streamTimeout, watchDogThreadName); - _watchDogThread.start(); - } - - _thread = new Thread(this, savingThreadName); - _thread.start(); - _deactivateController.activate(); - } - - protected void - finalize() - { - if(!_deactivateController.deactivated()) + Ice.Object result = locateImpl(current, cookie); + + if(result == null) { - _communicator.getLogger().warning("evictor has not been deactivated"); - deactivate(""); + if(hasAnotherFacet(current.id, current.facet)) + { + throw new Ice.FacetNotExistException(current.id, current.facet, current.operation); + } } + return result; } synchronized public void setSize(int evictorSize) { _deactivateController.lock(); - try { // @@ -386,12 +222,12 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable { return; } - + // // Update the evictor size. // _evictorSize = evictorSize; - + // // Evict as many elements as necessary. // @@ -408,178 +244,20 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable { return _evictorSize; } - - public Ice.ObjectPrx - add(Ice.Object servant, Ice.Identity ident) - { - return addFacet(servant, ident, ""); - } - public Ice.ObjectPrx - addFacet(Ice.Object servant, Ice.Identity ident, String facet) + + public EvictorIterator + getIterator(String facet, int batchSize) { - checkIdentity(ident); - - if(facet == null) - { - facet = ""; - } - - // - // Need to clone in case the given ident changes. - // - ident = (Ice.Identity) ident.clone(); - _deactivateController.lock(); - try { - ObjectStore store = null; - - for(;;) - { - synchronized(this) - { - Object o = _storeMap.get(facet); - - if(o == null) - { - if(store != null) - { - _storeMap.put(facet, store); - } - } - else - { - if(store != null) - { - store.close(); - } - store = (ObjectStore) o; - } - } - - if(store == null) - { - assert facet.length() > 0; - store = new ObjectStore(facet, _createDb, this, new java.util.LinkedList(), false); - // loop - } - else - { - break; // for(;;) - } - } - - assert store != null; - boolean alreadyThere = false; - - for(;;) + if(facet == null) { - // - // Create a new entry - // - - EvictorElement element = new EvictorElement(ident, store); - element.status = EvictorElement.dead; - element.rec = new ObjectRecord(); - element.rec.stats = new Statistics(); - - Object o = store.cache().putIfAbsent(ident, element); - - if(o != null) - { - element = (EvictorElement) o; - } - - synchronized(this) - { - if(element.stale) - { - // - // Try again - // - continue; - } - fixEvictPosition(element); - - synchronized(element) - { - switch(element.status) - { - case EvictorElement.clean: - case EvictorElement.created: - case EvictorElement.modified: - { - alreadyThere = true; - break; - } - case EvictorElement.destroyed: - { - element.status = EvictorElement.modified; - element.rec.servant = servant; - - // - // No need to push it on the modified queue, as a destroyed object - // is either already on the queue or about to be saved. When saved, - // it becomes dead. - // - break; - } - case EvictorElement.dead: - { - element.status = EvictorElement.created; - ObjectRecord rec = element.rec; - - rec.servant = servant; - rec.stats.creationTime = System.currentTimeMillis(); - rec.stats.lastSaveTime = 0; - rec.stats.avgSaveTime = 0; - - addToModifiedQueue(element); - break; - } - default: - { - assert false; - break; - } - } - } - } - break; // for(;;) - } - - if(alreadyThere) - { - Ice.AlreadyRegisteredException ex = new Ice.AlreadyRegisteredException(); - ex.kindOfObject = "servant"; - ex.id = _communicator.identityToString(ident); - if(facet.length() > 0) - { - ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); - } - throw ex; - } - - if(_trace >= 1) - { - String objString = "object \"" + _communicator.identityToString(ident) + "\""; - if(!facet.equals("")) - { - objString += " with facet \"" + facet + "\""; - } - - _communicator.getLogger().trace("Freeze.Evictor", "added " + objString + " to Db \"" + _filename + - "\""); - } - - Ice.ObjectPrx obj = _adapter.createProxy(ident); - if(facet.length() > 0) - { - obj = obj.ice_facet(facet); + facet = ""; } - return obj; + com.sleepycat.db.Transaction tx = beforeQuery(); + return new EvictorIteratorI(findStore(facet, false), tx, batchSize); } finally { @@ -587,1247 +265,170 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable } } - /** - * @deprecated - **/ - public void - createObject(Ice.Identity ident, Ice.Object servant) - { - checkIdentity(ident); + abstract protected boolean hasAnotherFacet(Ice.Identity ident, String facet); - // - // Need to clone in case the given ident changes. - // - ident = (Ice.Identity) ident.clone(); - - _deactivateController.lock(); - - try - { - ObjectStore store = findStore(""); - assert store != null; - - for(;;) - { - // - // Create a new entry - // - - EvictorElement element = new EvictorElement(ident, store); - element.status = EvictorElement.dead; - element.rec = new ObjectRecord(); - element.rec.stats = new Statistics(); - - Object o = store.cache().putIfAbsent(ident, element); - - if(o != null) - { - element = (EvictorElement) o; - } - - synchronized(this) - { - if(element.stale) - { - // - // Try again - // - continue; - } - fixEvictPosition(element); - - synchronized(element) - { - switch(element.status) - { - case EvictorElement.clean: - { - element.status = EvictorElement.modified; - element.rec.servant = servant; - addToModifiedQueue(element); - break; - } - case EvictorElement.created: - case EvictorElement.modified: - { - element.rec.servant = servant; - break; - } - case EvictorElement.destroyed: - { - element.status = EvictorElement.modified; - element.rec.servant = servant; - - // - // No need to push it on the modified queue, as a destroyed object - // is either already on the queue or about to be saved. When saved, - // it becomes dead. - // - break; - } - case EvictorElement.dead: - { - element.status = EvictorElement.created; - ObjectRecord rec = element.rec; - - rec.servant = servant; - rec.stats.creationTime = System.currentTimeMillis(); - rec.stats.lastSaveTime = 0; - rec.stats.avgSaveTime = 0; - - addToModifiedQueue(element); - break; - } - default: - { - assert false; - break; - } - } - } - } - break; // for(;;) - } - - if(_trace >= 1) - { - String objString = "object \"" + _communicator.identityToString(ident) + "\""; - _communicator.getLogger().trace("Freeze.Evictor", "added or updated " + objString + " in the database"); - } - } - finally - { - _deactivateController.unlock(); - } - } + abstract protected Object createEvictorElement(Ice.Identity ident, ObjectRecord rec, ObjectStore store); + + abstract protected Ice.Object locateImpl(Ice.Current current, Ice.LocalObjectHolder cookie); + + abstract protected void evict(); - public Ice.Object - remove(Ice.Identity ident) - { - return removeFacet(ident, ""); - } - public Ice.Object - removeFacet(Ice.Identity ident, String facet) + synchronized protected void + finalize() { - checkIdentity(ident); - if(facet == null) - { - facet = ""; - } - - // - // Need to clone in case the given ident changes. - // - ident = (Ice.Identity) ident.clone(); - - _deactivateController.lock(); - - try - { - ObjectStore store = findStore(facet); - Ice.Object servant = null; - - if(store != null) - { - for(;;) - { - // - // Retrieve object - // - - EvictorElement element = (EvictorElement) store.cache().pin(ident); - if(element != null) - { - synchronized(this) - { - if(element.stale) - { - // - // Try again - // - continue; - } - - fixEvictPosition(element); - synchronized(element) - { - switch(element.status) - { - case EvictorElement.clean: - { - servant = element.rec.servant; - element.status = EvictorElement.destroyed; - element.rec.servant = null; - addToModifiedQueue(element); - break; - } - case EvictorElement.created: - { - servant = element.rec.servant; - element.status = EvictorElement.dead; - element.rec.servant = null; - break; - } - case EvictorElement.modified: - { - servant = element.rec.servant; - element.status = EvictorElement.destroyed; - element.rec.servant = null; - // - // Not necessary to push it on the modified queue, as a modified - // element is either on the queue already or about to be saved - // (at which point it becomes clean) - // - break; - } - case EvictorElement.destroyed: - case EvictorElement.dead: - { - break; - } - default: - { - assert false; - break; - } - } - } - - if(element. keepCount > 0) - { - assert servant != null; - - element.keepCount = 0; - // - // Add to front of evictor queue - // - // Note that save evicts dead objects - // - _evictorList.addFirst(element); - element.evictPosition = _evictorList.iterator(); - // - // Position the iterator "on" the element. - // - element.evictPosition.next(); - _currentEvictorSize++; - } - } - } - break; // for(;;) - } - } - - if(servant == null) - { - Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); - ex.kindOfObject = "servant"; - ex.id = _communicator.identityToString(ident); - if(facet.length() > 0) - { - ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); - } - throw ex; - } - - if(_trace >= 1) - { - String objString = "object \"" + _communicator.identityToString(ident) + "\""; - if(!facet.equals("")) - { - objString += " with facet \"" + facet + "\""; - } - - _communicator.getLogger().trace("Freeze.Evictor", "removed " + objString + " from Db \"" + _filename + - "\""); - } - return servant; - } - finally + if(!_deactivateController.deactivated()) { - _deactivateController.unlock(); + _communicator.getLogger().warning("Freeze evictor " + toString() + " has not been deactivated"); + deactivate(""); } } - public void - keep(Ice.Identity ident) - { - keepFacet(ident, ""); - } - - public void - keepFacet(Ice.Identity ident, String facet) + protected void + closeDbEnv() { - checkIdentity(ident); - if(facet == null) - { - facet = ""; - } - - _deactivateController.lock(); - - try - { - boolean notThere = false; - - ObjectStore store = findStore(facet); - if(store == null) - { - notThere = true; - } - else - { - for(;;) - { - EvictorElement element = (EvictorElement) store.cache().pin(ident); - if(element == null) - { - notThere = true; - break; - } - - synchronized(this) - { - if(element.stale) - { - // - // try again - // - continue; - } - - synchronized(element) - { - if(element.status == EvictorElement.destroyed || element.status == EvictorElement.dead) - { - notThere = true; - break; - } - } - - // - // Found! - // - - if(element.keepCount == 0) - { - if(element.usageCount < 0) - { - // - // New object - // - element.usageCount = 0; - } - else - { - assert element.evictPosition != null; - element.evictPosition.remove(); - element.evictPosition = null; - _currentEvictorSize--; - } - element.keepCount = 1; - } - else - { - element.keepCount++; - } - break; - } - } - } - - if(notThere) - { - Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); - ex.kindOfObject = "servant"; - ex.id = _communicator.identityToString(ident); - if(facet.length() > 0) - { - ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); - } - throw ex; - } - } - finally + assert _dbEnv != null; + java.util.Iterator p = _storeMap.values().iterator(); + while(p.hasNext()) { - _deactivateController.unlock(); + ObjectStore store = (ObjectStore)p.next(); + store.close(); } + _dbEnv.close(); + _dbEnv = null; } - public void - release(Ice.Identity ident) - { - releaseFacet(ident, ""); - } - public void - releaseFacet(Ice.Identity ident, String facet) + protected synchronized ObjectStore + findStore(String facet, boolean createIt) { - checkIdentity(ident); - if(facet == null) - { - facet = ""; - } - - _deactivateController.lock(); + ObjectStore os = (ObjectStore)_storeMap.get(facet); - try + if(os == null && createIt) { - synchronized(this) - { - ObjectStore store = (ObjectStore) _storeMap.get(facet); - - if(store != null) - { - EvictorElement element = (EvictorElement) store.cache().getIfPinned(ident); - if(element != null) - { - assert !element.stale; - if(element.keepCount > 0) - { - if(--element.keepCount == 0) - { - // - // Add to front of evictor queue - // - // Note that the element cannot be destroyed or dead since - // its keepCount was > 0. - // - assert element.evictPosition == null; - _evictorList.addFirst(element); - element.evictPosition = _evictorList.iterator(); - // - // Position the iterator "on" the element. - // - element.evictPosition.next(); - _currentEvictorSize++; - } - // - // Success - // - return; - } - } - } - } + String facetType = (String)_facetTypes.get(facet); + os = new ObjectStore(facet, facetType, true, this, new java.util.LinkedList(), false); - Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); - ex.kindOfObject = "servant"; - ex.id = _communicator.identityToString(ident); - if(facet.length() > 0) - { - ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); - } - - throw ex; - } - finally - { - _deactivateController.unlock(); + _storeMap.put(facet, os); } + return os; } - public EvictorIterator - getIterator(String facet, int batchSize) + protected void + initialize(Ice.Identity ident, String facet, Ice.Object servant) { - if(facet == null) - { - facet = ""; - } - - _deactivateController.lock(); - - try - { - ObjectStore store = null; - synchronized(this) - { - store = (ObjectStore) _storeMap.get(facet); - if(store != null) - { - saveNowNoSync(); - } - } - return new EvictorIteratorI(store, batchSize); - } - finally + if(_initializer != null) { - _deactivateController.unlock(); + _initializer.initialize(_adapter, ident, facet, servant); } } - public boolean - hasObject(Ice.Identity ident) + protected EvictorI(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, + java.util.Map facetTypes, ServantInitializer initializer, Index[] indices, boolean createDb) { - return hasFacet(ident, ""); - } + _adapter = adapter; + _communicator = adapter.getCommunicator(); + _initializer = initializer; + _filename = filename; + _createDb = createDb; + _facetTypes = facetTypes == null ? new java.util.HashMap() : new java.util.HashMap(facetTypes); + + _dbEnv = SharedDbEnv.get(_communicator, envName, dbEnv); - public boolean - hasFacet(Ice.Identity ident, String facet) - { - _deactivateController.lock(); - try - { - return hasFacetImpl(ident, facet); - } - finally - { - _deactivateController.unlock(); - } - } - - private boolean - hasFacetImpl(Ice.Identity ident, String facet) - { + _trace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Evictor"); + _txTrace = _communicator.getProperties().getPropertyAsInt("Freeze.Trace.Transaction"); + _deadlockWarning = _communicator.getProperties().getPropertyAsInt("Freeze.Warn.Deadlocks") != 0; + _useNonmutating = _communicator.getProperties().getPropertyAsInt("Freeze.Evictor.UseNonmutating") != 0; + + _errorPrefix = "Freeze Evictor DbEnv(\"" + envName + "\") Db(\"" + _filename + "\"): "; + + String propertyPrefix = "Freeze.Evictor." + envName + '.' + _filename; + + boolean populateEmptyIndices = + _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + + ".PopulateEmptyIndices", 0) != 0; + // - // Must be called with _deactivateController locked. + // Instantiate all Dbs in 2 steps: + // (1) iterate over the indices and create ObjectStore with indices + // (2) open ObjectStores without indices // - checkIdentity(ident); - if(facet == null) - { - facet = ""; - } - - ObjectStore store = null; - - synchronized(this) - { - store = (ObjectStore) _storeMap.get(facet); - if(store == null) - { - return false; - } - - EvictorElement element = (EvictorElement) store.cache().getIfPinned(ident); - if(element != null) - { - assert !element.stale; - - synchronized(element) - { - return element.status != EvictorElement.dead && element.status != EvictorElement.destroyed; - } - } - } - return store.dbHasObject(ident); - } - - private boolean - hasAnotherFacet(Ice.Identity ident, String facet) - { + java.util.List dbs = allDbs(); // - // Must be called with _deactivateController locked. + // Add default db in case it's not there // + dbs.add(defaultDb); - // - // If the object exists in another store, throw FacetNotExistException - // instead of returning null (== ObjectNotExistException) - // - java.util.Map storeMapCopy; - synchronized(this) - { - storeMapCopy = new java.util.HashMap(_storeMap); - } - - java.util.Iterator p = storeMapCopy.entrySet().iterator(); - while(p.hasNext()) + if(indices != null) { - java.util.Map.Entry entry = (java.util.Map.Entry) p.next(); - - // - // Do not check facet - // - if(!facet.equals(entry.getKey())) + for(int i = 0; i < indices.length; ++i) { - ObjectStore store = (ObjectStore) entry.getValue(); - boolean inCache = false; + String facet = indices[i].facet(); - synchronized(this) + if(_storeMap.get(facet) == null) { - EvictorElement element = (EvictorElement) store.cache().getIfPinned(ident); - if(element != null) + java.util.List storeIndices = new java.util.LinkedList(); + for(int j = i; j < indices.length; ++j) { - inCache = true; - assert !element.stale; - - synchronized(element) + if(indices[j].facet().equals(facet)) { - if(element.status != EvictorElement.dead && element.status != EvictorElement.destroyed) - { - return true; - } + storeIndices.add(indices[j]); } } - } - if(!inCache) - { - if(store.dbHasObject(ident)) - { - return true; - } - } - } - } - return false; - } - - public Ice.Object - locate(Ice.Current current, Ice.LocalObjectHolder cookie) - { - _deactivateController.lock(); - try - { - // - // Special ice_ping() handling - // - if(current.operation != null && current.operation.equals("ice_ping")) - { - assert current.mode == Ice.OperationMode.Nonmutating; - - if(hasFacetImpl(current.id, current.facet)) - { - if(_trace >= 3) - { - _communicator.getLogger().trace( - "Freeze.Evictor", "ice_ping found \"" + _communicator.identityToString(current.id) + - "\" with facet \"" + current.facet + "\""); - } - cookie.value = null; - return _pingObject; - } - else if(hasAnotherFacet(current.id, current.facet)) - { - if(_trace >= 3) - { - _communicator.getLogger().trace( - "Freeze.Evictor", "ice_ping raises FacetNotExistException for \"" + - _communicator.identityToString(current.id) + "\" with facet \"" + current.facet + "\""); - } - - throw new Ice.FacetNotExistException(); - } - else - { - if(_trace >= 3) - { - _communicator.getLogger().trace( - "Freeze.Evictor", "ice_ping will raise ObjectNotExistException for \"" + - _communicator.identityToString(current.id) + "\" with facet \"" + current.facet + "\""); - } - return null; - } - } - - Ice.Object result = locateImpl(current, cookie); - - if(result == null) - { - if(hasAnotherFacet(current.id, current.facet)) - { - throw new Ice.FacetNotExistException(); + String facetType = (String)_facetTypes.get(facet); + ObjectStore store = new ObjectStore(facet, facetType,_createDb, this, storeIndices, + populateEmptyIndices); + _storeMap.put(facet, store); } } - - return result; } - finally - { - _deactivateController.unlock(); - } - } - Ice.Object - locateImpl(Ice.Current current, Ice.LocalObjectHolder cookie) - { - cookie.value = null; - - // - // Need to clone as current.id gets reused - // - Ice.Identity ident = null; - ident = (Ice.Identity) current.id.clone(); - - ObjectStore store = findStore(current.facet); - if(store == null) + java.util.Iterator p = dbs.iterator(); + while(p.hasNext()) { - if(_trace >= 2) + String facet = (String) p.next(); + if(facet.equals(defaultDb)) { - _communicator.getLogger().trace("Freeze.Evictor", "locate could not find a database for facet \"" + - current.facet + "\""); + facet = ""; } - return null; - } - - for(;;) - { - EvictorElement element = (EvictorElement) store.cache().pin(ident); - if(element == null) + + if(_storeMap.get(facet) == null) { - if(_trace >= 2) - { - _communicator.getLogger().trace("Freeze.Evictor", "locate could not find \"" + - _communicator.identityToString(ident) + "\" in Db \"" + - _filename + "\""); - } - return null; - } - - synchronized(this) - { - if(element.stale) - { - // - // try again - // - continue; - } - - synchronized(element) - { - if(element.status == EvictorElement.destroyed || - element.status == EvictorElement.dead) - { - if(_trace >= 2) - { - _communicator.getLogger().trace("Freeze.Evictor", "locate found \"" + - _communicator.identityToString(ident) + - "\" in the cache for Db \"" + _filename + - "\" but it was dead or destroyed"); - } - return null; - } - - // - // It's a good one! - // - - if(_trace >= 2) - { - _communicator.getLogger().trace("Freeze.Evictor", "locate found \"" + - _communicator.identityToString(ident) + "\" in Db \"" + - _filename + "\""); - } + String facetType = (String)_facetTypes.get(facet); - fixEvictPosition(element); - element.usageCount++; - cookie.value = element; - assert element.rec.servant != null; - return element.rec.servant; - } - } - } - } - - public void - finished(Ice.Current current, Ice.Object servant, Ice.LocalObject cookie) - { - assert servant != null; - - _deactivateController.lock(); - - try - { - if(cookie != null) - { - EvictorElement element = (EvictorElement) cookie; - - boolean enqueue = false; - - if((_useNonmutating && current.mode != Ice.OperationMode.Nonmutating) || - (servant.ice_operationAttributes(current.operation) & 0x1) != 0) - { - synchronized(element) - { - if(element.status == EvictorElement.clean) - { - // - // Assume this operation updated the object - // - element.status = EvictorElement.modified; - enqueue = true; - } - } - } + ObjectStore store = new ObjectStore(facet, facetType, _createDb, this, new java.util.LinkedList(), + populateEmptyIndices); - synchronized(this) - { - // - // Only elements with a usageCount == 0 can become stale and we own - // one count! - // - assert !element.stale; - assert element.usageCount >= 1; - - // - // Decrease the usage count of the evictor queue element. - // - element.usageCount--; - - if(enqueue) - { - addToModifiedQueue(element); - } - else if(element.usageCount == 0 && element.keepCount == 0) - { - // - // Evict as many elements as necessary. - // - evict(); - } - } + _storeMap.put(facet, store); } } - finally - { - _deactivateController.unlock(); - } + _deactivateController.activate(); } - public void - deactivate(String category) - { - if(_deactivateController.deactivate()) - { - try - { - synchronized(this) - { - saveNowNoSync(); - - // - // Set the evictor size to zero, meaning that we will evict - // everything possible. - // - _evictorSize = 0; - evict(); - - _savingThreadDone = true; - notifyAll(); - } - - for(;;) - { - try - { - _thread.join(); - break; - } - catch(InterruptedException ex) - { - } - } - if(_watchDogThread != null) - { - _watchDogThread.terminate(); - - for(;;) - { - try - { - _watchDogThread.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - - java.util.Iterator p = _storeMap.values().iterator(); - while(p.hasNext()) - { - ObjectStore store = (ObjectStore) p.next(); - store.close(); - } - - if(_dbEnv != null) - { - try - { - _dbEnv.close(); - } - finally - { - _dbEnv = null; - } - } - } - finally - { - _deactivateController.deactivationComplete(); - } - } - } - - void - initialize(Ice.Identity ident, String facet, Ice.Object servant) + protected EvictorI(Ice.ObjectAdapter adapter, String envName, String filename, java.util.Map facetTypes, + ServantInitializer initializer, Index[] indices, boolean createDb) { - if(_initializer != null) - { - _initializer.initialize(_adapter, ident, facet, servant); - } + this(adapter, envName, null, filename, facetTypes, initializer, indices, createDb); } - public void - run() + abstract com.sleepycat.db.Transaction beforeQuery(); + + static void + updateStats(Statistics stats, long time) { - try + long diff = time - (stats.creationTime + stats.lastSaveTime); + if(stats.lastSaveTime == 0) { - for(;;) - { - java.util.List allObjects; - java.util.List deadObjects = new java.util.LinkedList(); - - int saveNowThreadsSize = 0; - - synchronized(this) - { - while(!_savingThreadDone && - (_saveNowThreads.size() == 0) && - (_saveSizeTrigger < 0 || _modifiedQueue.size() < _saveSizeTrigger)) - { - try - { - if(_savePeriod == 0) - { - wait(); - } - else - { - long preSave = System.currentTimeMillis(); - wait(_savePeriod); - if(System.currentTimeMillis() > preSave + _savePeriod) - { - break; - } - } - } - catch(InterruptedException ex) - { - } - } - - saveNowThreadsSize = _saveNowThreads.size(); - - if(_savingThreadDone) - { - assert(_modifiedQueue.size() == 0); - assert(saveNowThreadsSize == 0); - break; // for(;;) - } - - // - // Check first if there is something to do! - // - if(_modifiedQueue.size() == 0) - { - if(saveNowThreadsSize > 0) - { - _saveNowThreads.clear(); - notifyAll(); - } - continue; // for(;;) - } - - allObjects = _modifiedQueue; - _modifiedQueue = new java.util.ArrayList(); - } - - int size = allObjects.size(); - - java.util.List streamedObjectQueue = new java.util.ArrayList(); - - long streamStart = System.currentTimeMillis(); - - // - // Stream each element - // - for(int i = 0; i < size; i++) - { - EvictorElement element = (EvictorElement) allObjects.get(i); - - boolean tryAgain; - - do - { - tryAgain = false; - Ice.Object servant = null; - - synchronized(element) - { - byte status = element.status; - - switch(status) - { - case EvictorElement.created: - case EvictorElement.modified: - { - servant = element.rec.servant; - break; - } - case EvictorElement.destroyed: - { - streamedObjectQueue.add(stream(element, streamStart)); - - element.status = EvictorElement.dead; - deadObjects.add(element); - break; - } - case EvictorElement.dead: - { - deadObjects.add(element); - break; - } - default: - { - // - // Nothing to do (could be a duplicate) - // - break; - } - } - } - - if(servant != null) - { - // - // Lock servant and then facet so that user can safely lock - // servant and call various Evictor operations - // - if(_watchDogThread != null) - { - _watchDogThread.activate(); - } - synchronized(servant) - { - if(_watchDogThread != null) - { - _watchDogThread.deactivate(); - } - - synchronized(element) - { - byte status = element.status; - - switch(status) - { - case EvictorElement.created: - case EvictorElement.modified: - { - if(servant == element.rec.servant) - { - streamedObjectQueue.add(stream(element, streamStart)); - - element.status = EvictorElement.clean; - } - else - { - tryAgain = true; - } - break; - } - case EvictorElement.destroyed: - { - streamedObjectQueue.add(stream(element, streamStart)); - - element.status = EvictorElement.dead; - deadObjects.add(element); - break; - } - case EvictorElement.dead: - { - deadObjects.add(element); - break; - } - default: - { - // - // Nothing to do (could be a duplicate) - // - break; - } - } - } - } - } - } while(tryAgain); - } - - if(_trace >= 1) - { - long now = System.currentTimeMillis(); - _communicator.getLogger().trace("Freeze.Evictor", "streamed " + streamedObjectQueue.size() + - " objects in " + (now - streamStart) + " ms"); - } - - // - // Now let's save all these streamed objects to disk using a transaction - // - - // - // Each time we get a deadlock, we reduce the number of objects to save - // per transaction - // - int txSize = streamedObjectQueue.size(); - if(txSize > _maxTxSize) - { - txSize = _maxTxSize; - } - - boolean tryAgain; - - do - { - tryAgain = false; - - while(streamedObjectQueue.size() > 0) - { - if(txSize > streamedObjectQueue.size()) - { - txSize = streamedObjectQueue.size(); - } - - long saveStart = System.currentTimeMillis(); - String txnId = null; - - try - { - com.sleepycat.db.Transaction tx = _dbEnv.getEnv().beginTransaction(null, null); - - if(_txTrace >= 1) - { - txnId = Long.toHexString((tx.getId() & 0x7FFFFFFF) + 0x80000000L); - - _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + - "started transaction " + txnId + " in saving thread"); - } - - try - { - for(int i = 0; i < txSize; i++) - { - StreamedObject obj = (StreamedObject) streamedObjectQueue.get(i); - obj.store.save(obj.key, obj.value, obj.status, tx); - } - - com.sleepycat.db.Transaction toCommit = tx; - tx = null; - toCommit.commit(); - - if(_txTrace >= 1) - { - _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + - "committed transaction " + txnId); - } - } - finally - { - if(tx != null) - { - tx.abort(); - if(_txTrace >= 1) - { - _communicator.getLogger().trace("Freeze.Evictor", _errorPrefix + - "rolled back transaction " + txnId); - } - } - } - - for(int i = 0; i < txSize; i++) - { - streamedObjectQueue.remove(0); - } - - if(_trace >= 1) - { - long now = System.currentTimeMillis(); - _communicator.getLogger().trace("Freeze.Evictor", "saved " + txSize + " objects in " + - (now - saveStart) + " ms"); - } - } - catch(com.sleepycat.db.DeadlockException dx) - { - if(_deadlockWarning) - { - _communicator.getLogger().warning("Deadlock in Freeze.EvictorI.run while writing " + - "into Db \"" + _filename + "\"; retrying..."); - } - - tryAgain = true; - txSize = (txSize + 1)/2; - } - catch(com.sleepycat.db.DatabaseException dx) - { - DatabaseException ex = new DatabaseException(); - ex.initCause(dx); - ex.message = _errorPrefix + "saving: " + dx.getMessage(); - throw ex; - } - } - } while(tryAgain); - - synchronized(this) - { - // - // Release usage count - // - for(int i = 0; i < allObjects.size(); i++) - { - EvictorElement element = (EvictorElement) allObjects.get(i); - assert element.usageCount > 0; - element.usageCount--; - } - allObjects.clear(); - - java.util.Iterator p = deadObjects.iterator(); - while(p.hasNext()) - { - EvictorElement element = (EvictorElement) p.next(); - - // - // Can be stale when there are duplicates on the deadObjects list - // - if(!element.stale && element.usageCount == 0 && element.keepCount == 0) - { - // - // Get rid of unused dead elements - // - synchronized(element) - { - if(element.status == EvictorElement.dead) - { - evict(element); - } - } - } - } - - deadObjects.clear(); - evict(); - - if(saveNowThreadsSize > 0) - { - for(int i = 0; i < saveNowThreadsSize; i++) - { - _saveNowThreads.remove(0); - } - notifyAll(); - } - } - } + stats.lastSaveTime = diff; + stats.avgSaveTime = diff; } - catch(RuntimeException ex) + else { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - _communicator.getLogger().error(_errorPrefix + "Fatal error in saving thread:\n" + sw.toString()); - - Util.handleFatalError(this, _communicator, ex); + stats.lastSaveTime = time - stats.creationTime; + stats.avgSaveTime = (long)(stats.avgSaveTime * 0.95 + diff * 0.05); } } @@ -1867,179 +468,11 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable return _deadlockWarning; } - synchronized void - saveNow() - { - saveNowNoSync(); - } private void - saveNowNoSync() - { - Thread myself = Thread.currentThread(); - - _saveNowThreads.add(myself); - notifyAll(); - do - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - while(_saveNowThreads.contains(myself)); - } - - private void - evict() - { - assert Thread.holdsLock(this); - - java.util.Iterator p = _evictorList.riterator(); - while(p.hasNext() && _currentEvictorSize > _evictorSize) - { - // - // Get the last unused element from the evictor queue. - // - EvictorElement element = (EvictorElement)p.next(); - if(element.usageCount == 0) - { - // - // Fine, servant is not in use (and not in the modifiedQueue) - // - - assert !element.stale; - assert element.keepCount == 0; - assert element.evictPosition != null; - - if(_trace >= 2 || (_trace >= 1 && _evictorList.size() % 50 == 0)) - { - String objString = "object \"" + _communicator.identityToString(element.identity) + "\""; - String facet = element.store.facet(); - if(facet.length() > 0) - { - objString += " with facet \"" + facet + "\""; - } - - _communicator.getLogger().trace("Freeze.Evictor", "evicting " + objString + " from the queue; " + - "number of elements in the queue: " + _currentEvictorSize); - } - - // - // Remove last unused element from the evictor queue. - // - element.stale = true; - element.store.cache().unpin(element.identity); - p.remove(); - element.evictPosition = null; - _currentEvictorSize--; - } - } - } - - private void - fixEvictPosition(EvictorElement element) + init(String envName, com.sleepycat.db.Environment dbEnv, java.util.Map facetTypes, Index[] indices) { - assert Thread.holdsLock(this); - - assert !element.stale; - - if(element.keepCount == 0) - { - if(element.usageCount < 0) - { - assert element.evictPosition == null; - - // - // New object - // - element.usageCount = 0; - _currentEvictorSize++; - } - else - { - assert element.evictPosition != null; - element.evictPosition.remove(); - } - _evictorList.addFirst(element); - element.evictPosition = _evictorList.iterator(); - // - // Position the iterator "on" the element. - // - element.evictPosition.next(); - } - } - - private void - evict(EvictorElement element) - { - assert Thread.holdsLock(this); - - assert !element.stale; - assert element.keepCount == 0; - element.evictPosition.remove(); - _currentEvictorSize--; - element.stale = true; - element.store.cache().unpin(element.identity); - } - - private void - addToModifiedQueue(EvictorElement element) - { - assert Thread.holdsLock(this); - - element.usageCount++; - _modifiedQueue.add(element); - - if(_saveSizeTrigger >= 0 && _modifiedQueue.size() >= _saveSizeTrigger) - { - notifyAll(); - } - } - - private StreamedObject - stream(EvictorElement element, long streamStart) - { - assert Thread.holdsLock(element); - - assert element.status != EvictorElement.dead; - - StreamedObject obj = new StreamedObject(); - - obj.status = element.status; - obj.store = element.store; - obj.key = ObjectStore.marshalKey(element.identity, _communicator); - - if(element.status != EvictorElement.destroyed) - { - // - // Update stats first - // - Statistics stats = element.rec.stats; - long diff = streamStart - (stats.creationTime + stats.lastSaveTime); - if(stats.lastSaveTime == 0) - { - stats.lastSaveTime = diff; - stats.avgSaveTime = diff; - } - else - { - stats.lastSaveTime = streamStart - stats.creationTime; - stats.avgSaveTime = (long)(stats.avgSaveTime * 0.95 + diff * 0.05); - } - obj.value = ObjectStore.marshalValue(element.rec, _communicator); - } - return obj; - } - - private synchronized ObjectStore - findStore(String facet) - { - return (ObjectStore) _storeMap.get(facet); } private java.util.List @@ -2135,79 +568,48 @@ class EvictorI extends Ice.LocalObjectImpl implements Evictor, Runnable return result; } - private static void + static void checkIdentity(Ice.Identity ident) { if(ident.name == null || ident.name.length() == 0) { Ice.IllegalIdentityException e = new Ice.IllegalIdentityException(); - e.id = (Ice.Identity)ident.clone(); + e.id = ident; throw e; } } + - static class StreamedObject - { - byte[] key = null; - byte[] value = null; - byte status = EvictorElement.dead; - ObjectStore store = null; - } - - // - // Map of string (facet) to ObjectStore - // - private final java.util.Map _storeMap = new java.util.HashMap(); - // - // List of EvictorElement with stable iterators - // - private final Freeze.LinkedList _evictorList = new Freeze.LinkedList(); - private int _evictorSize = 10; - private int _currentEvictorSize = 0; + protected int _evictorSize = 10; // - // The _modifiedQueue contains a queue of all modified facets - // Each element in the queue "owns" a usage count, to ensure the - // elements containing them remain in the map. + // Map of string (facet) to ObjectStore // - private java.util.List _modifiedQueue = new java.util.ArrayList(); - - private boolean _savingThreadDone = false; - private WatchDogThread _watchDogThread = null; - - private DeactivateController _deactivateController = new DeactivateController(); + protected final java.util.Map _storeMap = new java.util.HashMap(); + private final java.util.Map _facetTypes; - private final Ice.ObjectAdapter _adapter; - private final Ice.Communicator _communicator; - - private final ServantInitializer _initializer; + protected final Ice.ObjectAdapter _adapter; + protected final Ice.Communicator _communicator; - private SharedDbEnv _dbEnv; + protected final ServantInitializer _initializer; - private final String _filename; - private final boolean _createDb; + protected SharedDbEnv _dbEnv; - private int _trace = 0; - private int _txTrace = 0; + protected final String _filename; + protected final boolean _createDb; - // - // Threads that have requested a "saveNow" and are waiting for - // its completion - // - private final java.util.List _saveNowThreads = new java.util.ArrayList(); - - private int _saveSizeTrigger; - private int _maxTxSize; - private long _savePeriod; + protected int _trace = 0; + protected int _txTrace = 0; - private Thread _thread; - - private String _errorPrefix; + protected String _errorPrefix; + + protected boolean _deadlockWarning; - private boolean _deadlockWarning; + protected boolean _useNonmutating; - private boolean _useNonmutating; + protected DeactivateController _deactivateController = new DeactivateController(); private Ice.Object _pingObject = new PingObject(); + } diff --git a/java/src/Freeze/EvictorIteratorI.java b/java/src/Freeze/EvictorIteratorI.java index 9e88aa72a36..437f527f0f7 100644 --- a/java/src/Freeze/EvictorIteratorI.java +++ b/java/src/Freeze/EvictorIteratorI.java @@ -30,7 +30,7 @@ class EvictorIteratorI extends Ice.LocalObjectImpl implements EvictorIterator { if(hasNext()) { - return (Ice.Identity) _batchIterator.next(); + return (Ice.Identity)_batchIterator.next(); } else { @@ -38,11 +38,12 @@ class EvictorIteratorI extends Ice.LocalObjectImpl implements EvictorIterator } } - EvictorIteratorI(ObjectStore store, int batchSize) + EvictorIteratorI(ObjectStore store, com.sleepycat.db.Transaction tx, int batchSize) { _store = store; _more = (store != null); _batchSize = batchSize; + _tx = tx; assert batchSize > 0; @@ -59,87 +60,94 @@ class EvictorIteratorI extends Ice.LocalObjectImpl implements EvictorIterator { EvictorI.DeactivateController deactivateController = _store.evictor().deactivateController(); deactivateController.lock(); - try { + if(!_more) { return null; } - + java.util.List evictorElements = null; - + Ice.Communicator communicator = _store.communicator(); - + byte[] firstKey = null; if(_key.getSize() > 0) { firstKey = new byte[_key.getSize()]; System.arraycopy(_key.getData(), 0, firstKey, 0, firstKey.length); } - - try + + for(;;) { - for(;;) + com.sleepycat.db.Cursor dbc = null; + + _batch = new java.util.ArrayList(); + + try { - com.sleepycat.db.Cursor dbc = null; - - _batch = new java.util.ArrayList(); - - try + // + // Move to the first record + // + boolean range = false; + if(firstKey != null) { // - // Move to the first record - // - boolean range = false; - if(firstKey != null) + // _key represents the next element not yet returned + // if it has been deleted, we want the one after + // + range = true; + } + + dbc = _store.db().openCursor(_tx, null); + + boolean done = false; + do + { + com.sleepycat.db.OperationStatus status; + if(range) { - // - // _key represents the next element not yet returned - // if it has been deleted, we want the one after - // - range = true; + status = dbc.getSearchKeyRange(_key, _value, null); } - - dbc = _store.db().openCursor(null, null); - - boolean done = false; - do + else + { + status = dbc.getNext(_key, _value, null); + } + _more = (status == com.sleepycat.db.OperationStatus.SUCCESS); + + if(_more) { - com.sleepycat.db.OperationStatus status; - if(range) + range = false; + + if(_batch.size() < _batchSize) { - status = dbc.getSearchKeyRange(_key, _value, null); + Ice.Identity ident = ObjectStore.unmarshalKey(_key.getData(), communicator); + _batch.add(ident); } else { - status = dbc.getNext(_key, _value, null); - } - _more = (status == com.sleepycat.db.OperationStatus.SUCCESS); - - if(_more) - { - range = false; - - if(_batch.size() < _batchSize) - { - Ice.Identity ident = ObjectStore.unmarshalKey(_key.getData(), communicator); - _batch.add(ident); - } - else - { - // - // Keep the last element in _key - // - done = true; - } + // + // Keep the last element in _key + // + done = true; } } - while(!done && _more); - - break; // for (;;) } - catch(com.sleepycat.db.DeadlockException dx) + while(!done && _more); + + break; // for (;;) + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_store.evictor().deadlockWarning()) + { + communicator.getLogger().warning("Deadlock in Freeze.EvictorIteratorI.load while " + + "iterating over Db \"" + _store.evictor().filename() + "/" + _store.dbName() + + "\""); + } + + if(_tx == null) { if(firstKey != null) { @@ -151,44 +159,54 @@ class EvictorIteratorI extends Ice.LocalObjectImpl implements EvictorIterator { _key.setSize(0); } - - if(_store.evictor().deadlockWarning()) - { - communicator.getLogger().warning("Deadlock in Freeze.EvictorIteratorI.load while " + - "iterating over Db \"" + _store.evictor().filename() + "/" + _store.dbName() + - "\"; retrying..."); - } - + // // Retry // } - finally + else { - if(dbc != null) + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + finally + { + if(dbc != null) + { + try { - try - { - dbc.close(); - } - catch(com.sleepycat.db.DeadlockException dx) + dbc.close(); + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_tx != null) { - // - // Ignored - // + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; } } + catch(com.sleepycat.db.DatabaseException dx) + { + // + // Ignored + // + } } } } - catch(com.sleepycat.db.DatabaseException dx) - { - DatabaseException ex = new DatabaseException(); - ex.initCause(dx); - ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); - throw ex; - } - + if(_batch.size() == 0) { return null; @@ -205,6 +223,7 @@ class EvictorIteratorI extends Ice.LocalObjectImpl implements EvictorIterator } private final ObjectStore _store; + private final com.sleepycat.db.Transaction _tx; private final int _batchSize; private java.util.Iterator _batchIterator; diff --git a/java/src/Freeze/Index.java b/java/src/Freeze/Index.java index 026cc841742..42e72107baf 100644 --- a/java/src/Freeze/Index.java +++ b/java/src/Freeze/Index.java @@ -67,17 +67,16 @@ public abstract class Index implements com.sleepycat.db.SecondaryKeyCreator { EvictorI.DeactivateController deactivateController = _store.evictor().deactivateController(); deactivateController.lock(); - try { com.sleepycat.db.DatabaseEntry key = new com.sleepycat.db.DatabaseEntry(k); - + // When we have a custom-comparison function, Berkeley DB returns // the key on-disk (when it finds one). We disable this behavior: // (ref Oracle SR 5925672.992) // key.setPartial(true); - + com.sleepycat.db.DatabaseEntry pkey = new com.sleepycat.db.DatabaseEntry(); com.sleepycat.db.DatabaseEntry value = new com.sleepycat.db.DatabaseEntry(); // @@ -86,95 +85,111 @@ public abstract class Index implements com.sleepycat.db.SecondaryKeyCreator value.setPartial(true); Ice.Communicator communicator = _store.communicator(); - _store.evictor().saveNow(); - + + com.sleepycat.db.Transaction tx = _store.evictor().beforeQuery(); + java.util.List identities; - - try + + for(;;) { - for(;;) + com.sleepycat.db.SecondaryCursor dbc = null; + identities = new java.util.ArrayList(); + + try { - com.sleepycat.db.SecondaryCursor dbc = null; - identities = new java.util.ArrayList(); - - try + // + // Move to the first record + // + dbc = _db.openSecondaryCursor(tx, null); + boolean first = true; + + boolean found; + + do { - // - // Move to the first record - // - dbc = _db.openSecondaryCursor(null, null); - boolean first = true; - - boolean found; - - do + com.sleepycat.db.OperationStatus status; + if(first) { - com.sleepycat.db.OperationStatus status; - if(first) - { - status = dbc.getSearchKey(key, pkey, value, null); - } - else - { - status = dbc.getNextDup(key, pkey, value, null); - } - - found = status == com.sleepycat.db.OperationStatus.SUCCESS; - - if(found) - { - Ice.Identity ident = ObjectStore.unmarshalKey(pkey.getData(), communicator); - identities.add(ident); - first = false; - } + status = dbc.getSearchKey(key, pkey, value, null); } - while((firstN <= 0 || identities.size() < firstN) && found); - - break; // for(;;) - } - catch(com.sleepycat.db.DeadlockException dx) - { - if(_store.evictor().deadlockWarning()) + else { - communicator.getLogger().warning("Deadlock in Freeze.Index.untypedFindFirst while " + - "iterating over Db \"" + _store.evictor().filename() + - "/" + _dbName + "\"; retrying..."); + status = dbc.getNextDup(key, pkey, value, null); } - - // - // Retry - // + + found = status == com.sleepycat.db.OperationStatus.SUCCESS; + + if(found) + { + Ice.Identity ident = ObjectStore.unmarshalKey(pkey.getData(), communicator); + identities.add(ident); + first = false; + } + } + while((firstN <= 0 || identities.size() < firstN) && found); + + break; // for(;;) + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_store.evictor().deadlockWarning()) + { + communicator.getLogger().warning("Deadlock in Freeze.Index.untypedFindFirst while " + + "iterating over Db \"" + _store.evictor().filename() + + "/" + _dbName + "\""); } - finally + + if(tx != null) { - if(dbc != null) + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + + // + // Otherwise retry + // + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + finally + { + if(dbc != null) + { + try { - try - { - dbc.close(); - } - catch(com.sleepycat.db.DeadlockException dx) + dbc.close(); + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(tx != null) { - // - // Ignored - // + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; } } + catch(com.sleepycat.db.DatabaseException dx) + { + // + // Ignored + // + } } } } - catch(com.sleepycat.db.DatabaseException dx) - { - DatabaseException ex = new DatabaseException(); - ex.initCause(dx); - ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); - throw ex; - } - + if(identities.size() != 0) { Ice.Identity[] result = new Ice.Identity[identities.size()]; - return (Ice.Identity[]) identities.toArray(result); + return (Ice.Identity[])identities.toArray(result); } else { @@ -198,87 +213,101 @@ public abstract class Index implements com.sleepycat.db.SecondaryKeyCreator { EvictorI.DeactivateController deactivateController = _store.evictor().deactivateController(); deactivateController.lock(); - try { - com.sleepycat.db.DatabaseEntry key = new com.sleepycat.db.DatabaseEntry(k); + com.sleepycat.db.DatabaseEntry key = new com.sleepycat.db.DatabaseEntry(k); + // When we have a custom-comparison function, Berkeley DB returns // the key on-disk (when it finds one). We disable this behavior: // (ref Oracle SR 5925672.992) // key.setPartial(true); - + com.sleepycat.db.DatabaseEntry value = new com.sleepycat.db.DatabaseEntry(); // // dlen is 0, so we should not retrieve any value // value.setPartial(true); - _store.evictor().saveNow(); - - try + com.sleepycat.db.Transaction tx = _store.evictor().beforeQuery(); + + for(;;) { - for(;;) + com.sleepycat.db.Cursor dbc = null; + try { - com.sleepycat.db.Cursor dbc = null; - try + dbc = _db.openCursor(tx, null); + if(dbc.getSearchKey(key, value, null) == com.sleepycat.db.OperationStatus.SUCCESS) { - dbc = _db.openCursor(null, null); - if(dbc.getSearchKey(key, value, null) == com.sleepycat.db.OperationStatus.SUCCESS) - { - return dbc.count(); - } - else - { - return 0; - } + return dbc.count(); } - catch(com.sleepycat.db.DeadlockException dx) + else { - if(_store.evictor().deadlockWarning()) - { - _store.communicator().getLogger().warning("Deadlock in Freeze.Index.untypedCount while " + - "iterating over Db \"" + - _store.evictor().filename() + "/" + _dbName + - "\"; retrying..."); - } - - // - // Retry - // + return 0; } - finally + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_store.evictor().deadlockWarning()) { - if(dbc != null) + _store.communicator().getLogger().warning("Deadlock in Freeze.Index.untypedCount while " + + "iterating over Db \"" + + _store.evictor().filename() + "/" + _dbName + + "\""); + } + + if(tx != null) + { + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + // + // Otherwise retry + // + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; + } + finally + { + if(dbc != null) + { + try { - try - { - dbc.close(); - } - catch(com.sleepycat.db.DeadlockException dx) + dbc.close(); + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(tx != null) { - // - // Ignored - // + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); + throw ex; } } + catch(com.sleepycat.db.DatabaseException dx) + { + // + // Ignored + // + } } } } - catch(com.sleepycat.db.DatabaseException dx) - { - DatabaseException ex = new DatabaseException(); - ex.initCause(dx); - ex.message = _store.evictor().errorPrefix() + "Db.cursor: " + dx.getMessage(); - throw ex; - } } finally { deactivateController.unlock(); } } - + protected final Ice.Communicator communicator() { diff --git a/java/src/Freeze/LinkedList.java b/java/src/Freeze/LinkedList.java index 64601bd8cef..7214bb246fa 100644 --- a/java/src/Freeze/LinkedList.java +++ b/java/src/Freeze/LinkedList.java @@ -39,12 +39,23 @@ public class LinkedList return _header.next.element; } + public java.lang.Object + getLast() + { + if(_size == 0) + { + throw new java.util.NoSuchElementException(); + } + + return _header.previous.element; + } + public void addFirst(java.lang.Object o) { addBefore(o, _header.next); } - + public boolean isEmpty() { diff --git a/java/src/Freeze/ObjectStore.java b/java/src/Freeze/ObjectStore.java index 4d39e2cb365..c2425c5bba5 100644 --- a/java/src/Freeze/ObjectStore.java +++ b/java/src/Freeze/ObjectStore.java @@ -11,13 +11,13 @@ package Freeze; class ObjectStore implements IceUtil.Store { - - ObjectStore(String facet, boolean createDb, EvictorI evictor, + ObjectStore(String facet, String facetType, boolean createDb, EvictorI evictor, java.util.List indices, boolean populateEmptyIndices) { _cache = new IceUtil.Cache(this); _facet = facet; + _evictor = evictor; _indices = indices; _communicator = evictor.communicator(); @@ -31,6 +31,22 @@ class ObjectStore implements IceUtil.Store _dbName = facet; } + if(facetType != null) + { + // + // Create a sample servant with this type + // + Ice.ObjectFactory factory = _communicator.findObjectFactory(facetType); + if(factory == null) + { + throw new DatabaseException(_evictor.errorPrefix() + "No object factory registered for type-id '" + + facetType + "'"); + } + + _sampleServant = factory.create(facetType); + } + + Connection connection = Util.createConnection(_communicator, evictor.dbEnv().getEnvName()); try @@ -111,15 +127,6 @@ class ObjectStore implements IceUtil.Store } } - protected void - finalize() - { - if(_db != null) - { - close(); - } - } - void close() { @@ -130,7 +137,7 @@ class ObjectStore implements IceUtil.Store java.util.Iterator p = _indices.iterator(); while(p.hasNext()) { - Index index = (Index) p.next(); + Index index = (Index)p.next(); index.close(); } _indices.clear(); @@ -146,7 +153,7 @@ class ObjectStore implements IceUtil.Store } boolean - dbHasObject(Ice.Identity ident) + dbHasObject(Ice.Identity ident, com.sleepycat.db.Transaction tx) { byte[] key = marshalKey(ident, _communicator); com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(key); @@ -161,7 +168,7 @@ class ObjectStore implements IceUtil.Store { try { - com.sleepycat.db.OperationStatus err = _db.get(null, dbKey, dbValue, null); + com.sleepycat.db.OperationStatus err = _db.get(tx, dbKey, dbValue, null); if(err == com.sleepycat.db.OperationStatus.SUCCESS) { @@ -182,18 +189,23 @@ class ObjectStore implements IceUtil.Store { _communicator.getLogger().warning("Deadlock in Freeze.ObjectStore.dhHasObject while reading " + "Db \"" + _evictor.filename() + "/" + _dbName + - "\"; retrying..."); + "\""); } + if(tx != null) + { + DeadlockException ex = new DeadlockException(_evictor.errorPrefix() + "Db.get: " + dx.getMessage()); + ex.initCause(dx); + throw ex; + } // - // Ignored, try again + // Otherwise try again // } catch(com.sleepycat.db.DatabaseException dx) { - DatabaseException ex = new DatabaseException(); + DatabaseException ex = new DatabaseException(_evictor.errorPrefix() + "Db.get: " + dx.getMessage()); ex.initCause(dx); - ex.message = _evictor.errorPrefix() + "Db.get: " + dx.getMessage(); throw ex; } } @@ -203,15 +215,17 @@ class ObjectStore implements IceUtil.Store save(byte[] key, byte[] value, byte status, com.sleepycat.db.Transaction tx) throws com.sleepycat.db.DatabaseException { + assert tx != null; + switch(status) { - case EvictorElement.created: - case EvictorElement.modified: + case BackgroundSaveEvictorI.created: + case BackgroundSaveEvictorI.modified: { com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(key); com.sleepycat.db.DatabaseEntry dbValue = new com.sleepycat.db.DatabaseEntry(value); com.sleepycat.db.OperationStatus err; - if(status == EvictorElement.created) + if(status == BackgroundSaveEvictorI.created) { err = _db.putNoOverwrite(tx, dbKey, dbValue); } @@ -225,7 +239,7 @@ class ObjectStore implements IceUtil.Store } break; } - case EvictorElement.destroyed: + case BackgroundSaveEvictorI.destroyed: { com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(key); com.sleepycat.db.OperationStatus err = _db.delete(tx, dbKey); @@ -299,7 +313,6 @@ class ObjectStore implements IceUtil.Store return rec; } - final IceUtil.Cache cache() { @@ -336,10 +349,20 @@ class ObjectStore implements IceUtil.Store return _dbName; } + final Ice.Object + sampleServant() + { + return _sampleServant; + } + + // + // Load a servant from the database; will end up in the cache associated with + // this ObjectStore. This load is not transactional. + // public Object load(Object identObj) { - Ice.Identity ident = (Ice.Identity) identObj; + Ice.Identity ident = (Ice.Identity)identObj; byte[] key = marshalKey(ident, _communicator); @@ -383,20 +406,221 @@ class ObjectStore implements IceUtil.Store throw ex; } } - - EvictorElement result = new EvictorElement(ident, this); - result.rec = unmarshalValue(dbValue.getData(), _communicator); - - _evictor.initialize(ident, _facet, result.rec.servant); + + ObjectRecord rec = unmarshalValue(dbValue.getData(), _communicator); + _evictor.initialize(ident, _facet, rec.servant); + + Object result = _evictor.createEvictorElement(ident, rec, this); return result; } + // + // Load a servant from the database using the given transaction; this servant + // is NOT cached in the ObjectStore associated cache + // + ObjectRecord + load(Ice.Identity ident, TransactionI transaction) + { + com.sleepycat.db.Transaction tx = transaction.dbTxn(); + if(tx == null) + { + throw new DatabaseException(_evictor.errorPrefix() + "invalid TransactionalEvictorContext"); + } + + byte[] key = marshalKey(ident, _communicator); + + com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(key); + com.sleepycat.db.DatabaseEntry dbValue = new com.sleepycat.db.DatabaseEntry(); + + try + { + com.sleepycat.db.OperationStatus rs = _db.get(tx, dbKey, dbValue, null); + + if(rs == com.sleepycat.db.OperationStatus.NOTFOUND) + { + return null; + } + else if(rs != com.sleepycat.db.OperationStatus.SUCCESS) + { + assert false; + throw new DatabaseException(); + } + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_evictor.deadlockWarning()) + { + _communicator.getLogger().warning("Deadlock in Freeze.ObjectStore.load while reading Db \"" + + _evictor.filename() + "/" + _dbName + "\""); + } + + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.get: " + dx.getMessage(); + throw ex; + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.get: " + dx.getMessage(); + throw ex; + } + + ObjectRecord rec = unmarshalValue(dbValue.getData(), _communicator); + _evictor.initialize(ident, _facet, rec.servant); + return rec; + } + + + void + update(Ice.Identity ident, ObjectRecord objectRecord, TransactionI transaction) + { + com.sleepycat.db.Transaction tx = transaction.dbTxn(); + if(tx == null) + { + throw new DatabaseException(_evictor.errorPrefix() + "invalid TransactionalEvictorContext"); + } + + if(_sampleServant != null && !objectRecord.servant.ice_id().equals(_sampleServant.ice_id())) + { + String msg = _evictor.errorPrefix() + "Attempting to save a '" + objectRecord.servant.ice_id() + + "' servant in a database of '" + _sampleServant.ice_id() + "' servants"; + + throw new DatabaseException(msg); + } + + com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(marshalKey(ident, _communicator)); + com.sleepycat.db.DatabaseEntry dbValue = new com.sleepycat.db.DatabaseEntry(marshalValue(objectRecord, _communicator)); + + try + { + com.sleepycat.db.OperationStatus err = _db.put(tx, dbKey, dbValue); + if(err != com.sleepycat.db.OperationStatus.SUCCESS) + { + throw new DatabaseException(); + } + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_evictor.deadlockWarning()) + { + _communicator.getLogger().warning("Deadlock in Freeze.ObjectStore.update while updating Db \"" + + _evictor.filename() + "/" + _dbName + "\""); + } + + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.put: " + dx.getMessage(); + throw ex; + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.put: " + dx.getMessage(); + throw ex; + } + } + + boolean + insert(Ice.Identity ident, ObjectRecord objectRecord, com.sleepycat.db.Transaction tx) + { + com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(marshalKey(ident, _communicator)); + com.sleepycat.db.DatabaseEntry dbValue = new com.sleepycat.db.DatabaseEntry(marshalValue(objectRecord, _communicator)); + + if(_sampleServant != null && !objectRecord.servant.ice_id().equals(_sampleServant.ice_id())) + { + String msg = _evictor.errorPrefix() + "Attempting to save a '" + objectRecord.servant.ice_id() + + "' servant in a database of '" + _sampleServant.ice_id() + "' servants"; + + throw new DatabaseException(msg); + } + + for(;;) + { + try + { + return _db.putNoOverwrite(tx, dbKey, dbValue) == com.sleepycat.db.OperationStatus.SUCCESS; + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_evictor.deadlockWarning()) + { + _communicator.getLogger().warning("Deadlock in Freeze.ObjectStore.update while updating Db \"" + + _evictor.filename() + "/" + _dbName + "\""); + } + + if(tx != null) + { + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.putNoOverwrite: " + dx.getMessage(); + throw ex; + } + // + // Otherwise retry + // + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.putNoOverwrite: " + dx.getMessage(); + throw ex; + } + } + } + + boolean + remove(Ice.Identity ident, com.sleepycat.db.Transaction tx) + { + com.sleepycat.db.DatabaseEntry dbKey = new com.sleepycat.db.DatabaseEntry(marshalKey(ident, _communicator)); + + for(;;) + { + try + { + return _db.delete(tx, dbKey) == com.sleepycat.db.OperationStatus.SUCCESS; + } + catch(com.sleepycat.db.DeadlockException dx) + { + if(_evictor.deadlockWarning()) + { + _communicator.getLogger().warning("Deadlock in Freeze.ObjectStore.remove while updating Db \"" + + _evictor.filename() + "/" + _dbName + "\""); + } + + if(tx != null) + { + DeadlockException ex = new DeadlockException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.delete: " + dx.getMessage(); + throw ex; + } + + // + // Otherwise retry + // + + } + catch(com.sleepycat.db.DatabaseException dx) + { + DatabaseException ex = new DatabaseException(); + ex.initCause(dx); + ex.message = _evictor.errorPrefix() + "Db.delete: " + dx.getMessage(); + throw ex; + } + } + } + private final IceUtil.Cache _cache; - - private com.sleepycat.db.Database _db; private final String _facet; private final String _dbName; private final EvictorI _evictor; private final java.util.List _indices; private final Ice.Communicator _communicator; + + private com.sleepycat.db.Database _db; + private Ice.Object _sampleServant; } diff --git a/java/src/Freeze/SharedDbEnv.java b/java/src/Freeze/SharedDbEnv.java index 60c7a70dd97..11233f95536 100644 --- a/java/src/Freeze/SharedDbEnv.java +++ b/java/src/Freeze/SharedDbEnv.java @@ -198,12 +198,57 @@ class SharedDbEnv implements com.sleepycat.db.ErrorHandler, Runnable _key.communicator.getLogger().error("Freeze database error in DbEnv \"" + _key.envName + "\": " + message); } - protected void - finalize() + + // + // EvictorContext factory/manager + // + + + // + // Get/create an evictor context associated with the calling thread + // + synchronized TransactionalEvictorContextI + getOrCreateCurrent(Ice.BooleanHolder created) { - assert(_refCount == 0); + if(created != null) + { + created.value = false; + } + + Object k = Thread.currentThread(); + + TransactionalEvictorContextI ctx = (TransactionalEvictorContextI)_ctxMap.get(k); + if(ctx == null) + { + ctx = new TransactionalEvictorContextI(this); + + if(created != null) + { + created.value = true; + } + _ctxMap.put(k, ctx); + } + return ctx; + } + + synchronized TransactionalEvictorContextI + getCurrent() + { + Object k = Thread.currentThread(); + return (TransactionalEvictorContextI)_ctxMap.get(k); + } + + // + // Clear evictor context associated with the calling thread + // + synchronized void + clearCurrent(TransactionalEvictorContextI oldCtx) + { + Object removedCtx = _ctxMap.remove(Thread.currentThread()); + assert removedCtx == oldCtx; } + private SharedDbEnv(MapKey key, com.sleepycat.db.Environment dbEnv) throws com.sleepycat.db.DatabaseException @@ -363,6 +408,8 @@ class SharedDbEnv implements com.sleepycat.db.ErrorHandler, Runnable private int _kbyte = 0; private Thread _thread; + private java.util.Map _ctxMap = new java.util.HashMap(); + // // Hash map of (MapKey, SharedDbEnv) // diff --git a/java/src/Freeze/TransactionalEvictorContextI.java b/java/src/Freeze/TransactionalEvictorContextI.java new file mode 100644 index 00000000000..91e96fcdf3b --- /dev/null +++ b/java/src/Freeze/TransactionalEvictorContextI.java @@ -0,0 +1,349 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Freeze; + +// +// A Freeze transactional evictor context holds a Berkeley DB transaction +// and a number of servants loaded using this transaction. +// + +class TransactionalEvictorContextI extends Ice.LocalObjectImpl + implements TransactionalEvictorContext, Ice.DispatchInterceptorAsyncCallback +{ + public void + rollbackOnly() + { + _rollbackOnly = true; + } + + public boolean + isRollbackOnly() + { + return _rollbackOnly; + } + + public void + complete() + { + try + { + if(_rollbackOnly) + { + _tx.rollback(); + } + else + { + if(!_stack.empty()) + { + _dbEnv.getCommunicator().getLogger().warning + ("Committing TransactionalEvictorContext on DbEnv '" + _dbEnv.getEnvName() + "' with " + + _stack.size() + " unsaved objects."); + } + + _tx.commit(); + + // + // Finally, remove updated & removed objects from cache + // + java.util.Iterator p = _invalidateList.iterator(); + while(p.hasNext()) + { + ToInvalidate ti = (ToInvalidate)p.next(); + ti.invalidate(); + } + } + } + catch(DeadlockException ex) + { + deadlockException(); + throw ex; + } + finally + { + synchronized(this) + { + if(_dbEnv != null) + { + _dbEnv.clearCurrent(this); + _dbEnv = null; + + notifyAll(); + } + } + } + } + + public boolean + response(boolean ok) + { + if(Thread.currentThread().equals(_owner)) + { + return true; + } + else + { + synchronized(this) + { + if(_deadlockExceptionDetected) + { + return false; + } + if(_dbEnv == null) + { + return true; + } + try + { + wait(); + } + catch(InterruptedException ex) + { + } + return !_deadlockExceptionDetected; + } + } + } + + public boolean + exception(java.lang.Exception ex) + { + if(ex instanceof DeadlockException && Thread.currentThread().equals(_owner)) + { + _deadlockException = (DeadlockException)ex; + return false; + } + return true; + } + + + class ServantHolder + { + ServantHolder(Ice.Current current, ObjectStore store, boolean useNonmutating) + { + _current = current; + _store = store; + + ServantHolder sh = findServantHolder(_current.id, _store); + if(sh != null) + { + if(!sh._removed) + { + _rec = sh._rec; + } + } + else + { + // + // Let's load this servant + // + _rec = store.load(current.id, _tx); + if(_rec != null) + { + _stack.push(this); + _ownServant = true; + + // + // Compute readonly properly + // + _readOnly = (useNonmutating && current.mode == Ice.OperationMode.Nonmutating) || + (!useNonmutating && (_rec.servant.ice_operationAttributes(current.operation) & 0x1) == 0); + } + } + } + + void + release() + { + if(_ownServant) + { + if(!_rollbackOnly) + { + if(!_readOnly && !_removed) + { + EvictorI.updateStats(_rec.stats, System.currentTimeMillis()); + _store.update(_current.id, _rec, _tx); + } + + if(!_readOnly || _removed) + { + _invalidateList.add(new ToInvalidate(_current.id, _store)); + } + } + _stack.pop(); + } + } + + boolean + matches(Ice.Identity ident, ObjectStore store) + { + return ident.equals(_current.id) && store == _store; + } + + Ice.Object + servant() + { + if(_rec == null) + { + return null; + } + else + { + return _rec.servant; + } + } + + void + removed() + { + _removed = true; + } + + private boolean _readOnly = false; + private boolean _ownServant = false; + private boolean _removed = false; + + private final Ice.Current _current; + private final ObjectStore _store; + private ObjectRecord _rec; + }; + + + TransactionalEvictorContextI(SharedDbEnv dbEnv) + { + _dbEnv = dbEnv; + _tx = (TransactionI)(new ConnectionI(_dbEnv).beginTransaction()); + _owner = Thread.currentThread(); + } + + void + checkDeadlockException() + { + if(_deadlockException != null) + { + throw _deadlockException; + } + } + + + TransactionI + transaction() + { + return _tx; + } + + ServantHolder + createServantHolder(Ice.Current current, ObjectStore store, boolean useNonmutating) + { + return new ServantHolder(current, store, useNonmutating); + } + + void + deadlockException() + { + _rollbackOnly = true; + synchronized(this) + { + _deadlockExceptionDetected = true; + notifyAll(); + } + } + + Ice.Object + servantRemoved(Ice.Identity ident, ObjectStore store) + { + if(!_rollbackOnly) + { + // + // Lookup servant holder on stack + // + ServantHolder sh = findServantHolder(ident, store); + if(sh != null) + { + sh.removed(); + return sh.servant(); + } + else + { + _invalidateList.add(new ToInvalidate(ident, store)); + return null; + } + } + return null; + } + + + protected void + finalize() + { + if(_dbEnv != null) + { + _dbEnv.getCommunicator().getLogger().warning + ("Finalizing incomplete TransactionalEvictorContext on DbEnv '" + _dbEnv.getEnvName() + "'"); + } + } + + private ServantHolder + findServantHolder(Ice.Identity ident, ObjectStore store) + { + java.util.Iterator p = _stack.iterator(); + while(p.hasNext()) + { + ServantHolder sh = (ServantHolder)p.next(); + if(sh.matches(ident, store)) + { + return sh; + } + } + return null; + } + + private static class ToInvalidate + { + ToInvalidate(Ice.Identity ident, ObjectStore store) + { + _ident = ident; + _store = store; + } + + void + invalidate() + { + ((TransactionalEvictorI)_store.evictor()).evict(_ident, _store); + } + + private final Ice.Identity _ident; + private final ObjectStore _store; + } + + + // + // Stack of ServantHolder + // + private final java.util.Stack _stack = new java.util.Stack(); + + // + // List of objects to invalidate from the caches upon commit + // + private final java.util.List _invalidateList = new java.util.LinkedList(); + + private final TransactionI _tx; + private final Thread _owner; + + private boolean _rollbackOnly = false; + + private DeadlockException _deadlockException; + + private SharedDbEnv _dbEnv; + + // + // Protected by this + // + private boolean _deadlockExceptionDetected = false; +} diff --git a/java/src/Freeze/TransactionalEvictorI.java b/java/src/Freeze/TransactionalEvictorI.java new file mode 100644 index 00000000000..62696eb061c --- /dev/null +++ b/java/src/Freeze/TransactionalEvictorI.java @@ -0,0 +1,729 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Freeze; + +class TransactionalEvictorI extends EvictorI implements TransactionalEvictor +{ + public TransactionalEvictorContext + getCurrentContext() + { + _deactivateController.lock(); + try + { + return _dbEnv.getCurrent(); + } + finally + { + _deactivateController.unlock(); + } + } + + public TransactionalEvictorContext + createCurrentContext() + { + _deactivateController.lock(); + try + { + Ice.BooleanHolder created = new Ice.BooleanHolder(); + TransactionalEvictorContext ctx = _dbEnv.getOrCreateCurrent(created); + + if(created.value == false) + { + throw new DatabaseException(_errorPrefix + + "createCurrentContext: there is already a current context"); + } + return ctx; + } + finally + { + _deactivateController.unlock(); + } + } + + public Ice.ObjectPrx + addFacet(Ice.Object servant, Ice.Identity ident, String facet) + { + checkIdentity(ident); + + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + long currentTime = System.currentTimeMillis(); + + ObjectRecord rec = new ObjectRecord(servant, new Statistics(currentTime, 0, 0)); + + ObjectStore store = findStore(facet, _createDb); + if(store == null) + { + NotFoundException ex = new NotFoundException(); + ex.message = _errorPrefix + "addFacet: could not open database for facet '" + + facet + "'"; + throw ex; + } + + com.sleepycat.db.Transaction tx = beforeQuery(); + + updateStats(rec.stats, currentTime); + + if(!store.insert(ident, rec, tx)) + { + Ice.AlreadyRegisteredException ex = new Ice.AlreadyRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + throw ex; + } + + Ice.ObjectPrx obj = _adapter.createProxy(ident); + if(facet.length() > 0) + { + obj = obj.ice_facet(facet); + } + return obj; + } + finally + { + _deactivateController.unlock(); + } + } + + public Ice.Object + removeFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + Ice.Object servant = null; + boolean removed = false; + + ObjectStore store = findStore(facet, false); + if(store != null) + { + TransactionalEvictorContextI ctx = _dbEnv.getCurrent(); + com.sleepycat.db.Transaction tx = null; + if(ctx != null) + { + tx = ctx.transaction().dbTxn(); + if(tx == null) + { + throw new DatabaseException(_errorPrefix + "invalid TransactionalEvictorContext"); + } + } + + removed = store.remove(ident, tx); + + if(removed) + { + if(ctx != null) + { + // + // Remove from cache when transaction commits + // + servant = ctx.servantRemoved(ident, store); + } + else + { + // + // Remove from cache immediately + // + servant = evict(ident, store); + } + } + } + + if(!removed) + { + Ice.NotRegisteredException ex = new Ice.NotRegisteredException(); + ex.kindOfObject = "servant"; + ex.id = _communicator.identityToString(ident); + if(facet.length() > 0) + { + ex.id += " -f " + IceUtil.StringUtil.escapeString(facet, ""); + } + throw ex; + } + + if(_trace >= 1) + { + String objString = "object \"" + _communicator.identityToString(ident) + "\""; + if(!facet.equals("")) + { + objString += " with facet \"" + facet + "\""; + } + + _communicator.getLogger().trace("Freeze.Evictor", "removed " + objString + " from Db \"" + _filename + + "\""); + } + return servant; + } + finally + { + _deactivateController.unlock(); + } + } + + public boolean + hasFacet(Ice.Identity ident, String facet) + { + checkIdentity(ident); + if(facet == null) + { + facet = ""; + } + + _deactivateController.lock(); + try + { + ObjectStore store = findStore(facet, false); + + if(store == null) + { + return false; + } + + com.sleepycat.db.Transaction tx = beforeQuery(); + + if(tx == null) + { + EvictorElement element = (EvictorElement)store.cache().getIfPinned(ident); + if(element != null) + { + return true; + } + + return store.dbHasObject(ident, null); + } + else + { + return store.dbHasObject(ident, tx); + } + } + finally + { + _deactivateController.unlock(); + } + } + + + public void + finished(Ice.Current current, Ice.Object servant, Ice.LocalObject cookieObj) + { + // + // Nothing to do + // + } + + public void + deactivate(String category) + { + if(_deactivateController.deactivate()) + { + try + { + closeDbEnv(); + } + finally + { + _deactivateController.deactivationComplete(); + } + } + } + + TransactionalEvictorI(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, + java.util.Map facetTypes, ServantInitializer initializer, Index[] indices, boolean createDb) + { + super(adapter, envName, dbEnv, filename, facetTypes, initializer, indices, createDb); + } + + TransactionalEvictorI(Ice.ObjectAdapter adapter, String envName, String filename, + java.util.Map facetTypes, ServantInitializer initializer, Index[] indices, boolean createDb) + { + this(adapter, envName, null, filename, facetTypes, initializer, indices, createDb); + } + + // + // The interceptor dispatch call + // + Ice.DispatchStatus + dispatch(Ice.Request request) + { + _deactivateController.lock(); + try + { + Ice.Current current = request.getCurrent(); + + ObjectStore store = findStore(current.facet, false); + if(store == null) + { + servantNotFound(current); + } + + // + // Is there an existing context? + // + TransactionalEvictorContextI ctx = _dbEnv.getCurrent(); + + if(ctx != null) + { + try + { + // + // If yes, use this context; there is no retrying + // + TransactionalEvictorContextI.ServantHolder sh = ctx.createServantHolder(current, store, _useNonmutating); + + if(sh.servant() == null) + { + servantNotFound(current); + } + + try + { + Ice.DispatchStatus dispatchStatus = sh.servant().ice_dispatch(request, ctx); + + if(dispatchStatus == Ice.DispatchStatus.DispatchUserException && _rollbackOnUserException) + { + ctx.rollbackOnly(); + } + if(dispatchStatus == Ice.DispatchStatus.DispatchAsync) + { + ctx.checkDeadlockException(); + } + return dispatchStatus; + } + catch(RuntimeException ex) + { + ctx.rollbackOnly(); + throw ex; + } + finally + { + sh.release(); + } + } + catch(DeadlockException ex) + { + ctx.deadlockException(); + throw ex; + } + catch(RuntimeException ex) + { + ctx.rollbackOnly(); + throw ex; + } + } + else + { + Ice.Object servant = null; + + // + // Otherwise, first figure out if it's a read or write operation + // + boolean readOnly = true; + + if(_useNonmutating) + { + readOnly = (current.mode == Ice.OperationMode.Nonmutating); + } + else + { + // + // Is there a sample-servant associated with this store? + // + + Ice.Object sample = store.sampleServant(); + if(sample != null) + { + readOnly = (sample.ice_operationAttributes(current.operation) & 0x1) == 0; + } + else + { + // + // Otherwise find / load read-only servant + // + servant = loadCachedServant(current.id, store); + if(servant == null) + { + servantNotFound(current); + } + else + { + readOnly = (servant.ice_operationAttributes(current.operation) & 0x1) == 0; + } + } + } + + // + // readOnly is now set properly + // + if(readOnly) + { + if(servant == null) + { + servant = loadCachedServant(current.id, store); + if(servant == null) + { + servantNotFound(current); + } + } + // otherwise reuse servant loaded above + + // + // Non-transactional, read-only dispatch + // + return servant.ice_dispatch(request, null); + } + else + { + // + // Create a new transaction; retry on DeadlockException + // + + boolean tryAgain = false; + + do + { + ctx = _dbEnv.getOrCreateCurrent(null); + + try + { + try + { + TransactionalEvictorContextI.ServantHolder sh = ctx.createServantHolder(current, store, _useNonmutating); + + if(sh.servant() == null) + { + servantNotFound(current); + } + + try + { + Ice.DispatchStatus dispatchStatus = sh.servant().ice_dispatch(request, ctx); + if(dispatchStatus == Ice.DispatchStatus.DispatchUserException && _rollbackOnUserException) + { + ctx.rollbackOnly(); + } + if(dispatchStatus == Ice.DispatchStatus.DispatchAsync) + { + ctx.checkDeadlockException(); + } + return dispatchStatus; + } + catch(RuntimeException ex) + { + ctx.rollbackOnly(); + throw ex; + } + finally + { + sh.release(); + } + } + catch(DeadlockException ex) + { + ctx.deadlockException(); + throw ex; + } + catch(RuntimeException ex) + { + ctx.rollbackOnly(); + throw ex; + } + finally + { + ctx.complete(); + } + } + catch(DeadlockException ex) + { + tryAgain = true; + } + + } while(tryAgain); + } + } + + // + // Javac does not detect this can't be reached + // + assert(false); + throw new Ice.OperationNotExistException(); + } + finally + { + _deactivateController.unlock(); + } + } + + synchronized Ice.Object + evict(Ice.Identity ident, ObjectStore store) + { + EvictorElement element = (EvictorElement)store.cache().unpin(ident); + + if(element != null) + { + element.evict(false); + return element.servant; + } + return null; + } + + protected Object + createEvictorElement(Ice.Identity ident, ObjectRecord rec, ObjectStore store) + { + return new EvictorElement(rec.servant, ident, store); + } + + protected Ice.Object + locateImpl(Ice.Current current, Ice.LocalObjectHolder cookie) + { + return _interceptor; + } + + protected boolean + hasAnotherFacet(Ice.Identity ident, String facet) + { + _deactivateController.lock(); + try + { + java.util.Map storeMapCopy; + synchronized(this) + { + storeMapCopy = new java.util.HashMap(_storeMap); + } + + com.sleepycat.db.Transaction tx = beforeQuery(); + + java.util.Iterator p = storeMapCopy.entrySet().iterator(); + while(p.hasNext()) + { + java.util.Map.Entry entry = (java.util.Map.Entry)p.next(); + + // + // Do not check this facet again + // + if(!facet.equals(entry.getKey())) + { + ObjectStore store = (ObjectStore)entry.getValue(); + + if(tx == null && store.cache().getIfPinned(ident) != null) + { + return true; + } + + if(store.dbHasObject(ident, tx)) + { + return true; + } + } + } + + return false; + } + finally + { + _deactivateController.unlock(); + } + } + + protected void + evict() + { + assert Thread.holdsLock(this); + + while(_currentEvictorSize > _evictorSize) + { + // + // Evict, no matter what! + // + EvictorElement element = (EvictorElement)_evictorList.getLast(); + element.evict(true); + } + } + + protected com.sleepycat.db.Transaction + beforeQuery() + { + TransactionalEvictorContextI ctx = _dbEnv.getCurrent(); + com.sleepycat.db.Transaction tx = null; + if(ctx != null) + { + tx = ctx.transaction().dbTxn(); + if(tx == null) + { + throw new DatabaseException(_errorPrefix + "invalid TransactionalEvictorContext"); + } + } + + return tx; + } + + private void + servantNotFound(Ice.Current current) + { + if(_trace >= 2) + { + _communicator.getLogger().trace("Freeze.Evictor", "could not find \"" + + _communicator.identityToString(current.id) +"\" with facet \"" + current.facet + "\""); + } + + if(hasAnotherFacet(current.id, current.facet)) + { + throw new Ice.FacetNotExistException(current.id, current.facet, current.operation); + } + else + { + throw new Ice.ObjectNotExistException(current.id, current.facet, current.operation); + } + } + + private Ice.Object + loadCachedServant(Ice.Identity ident, ObjectStore store) + { + for(;;) + { + EvictorElement element = (EvictorElement)store.cache().pin(ident); + + if(element == null) + { + return null; + } + + synchronized(this) + { + if(element.stale) + { + // + // try again + // + continue; + } + + element.fixEvictPosition(); + + // + // if _evictorSize is 0, I may evict myself ... no big deal + // + evict(); + return element.servant; + } + } + } + + + private class EvictorElement + { + EvictorElement(Ice.Object servant, Ice.Identity identity, ObjectStore store) + { + this.servant = servant; + _identity = identity; + _store = store; + } + + void + evict(boolean unpin) + { + assert Thread.holdsLock(TransactionalEvictorI.this); + assert stale == false; + stale = true; + + if(unpin) + { + _store.cache().unpin(_identity); + } + + if(_evictPosition != null) + { + _evictPosition.remove(); + _evictPosition = null; + _currentEvictorSize--; + } + else + { + assert(!unpin); + } + } + + void + fixEvictPosition() + { + assert Thread.holdsLock(TransactionalEvictorI.this); + assert stale == false; + + if(_evictPosition == null) + { + // + // New element + // + _currentEvictorSize++; + } + else + { + _evictPosition.remove(); + } + _evictorList.addFirst(this); + _evictPosition = _evictorList.iterator(); + // + // Position the iterator "on" the element. + // + _evictPosition.next(); + } + + + final Ice.Object servant; + + // + // Protected by the TransactionEvictorI mutex + // + boolean stale = false; // stale = true means no longer in the cache + private java.util.Iterator _evictPosition; + + // + // These two fields are only needed for eviction + // + final private ObjectStore _store; + final private Ice.Identity _identity; + } + + + // + // List of EvictorElement with stable iterators + // + private final Freeze.LinkedList _evictorList = new Freeze.LinkedList(); + private int _currentEvictorSize = 0; + + // + // A simple adapter + // + private Ice.DispatchInterceptor _interceptor = new Ice.DispatchInterceptor() + { + public Ice.DispatchStatus + dispatch(Ice.Request request) + { + return TransactionalEvictorI.this.dispatch(request); + } + }; + + private final boolean _rollbackOnUserException = false; +} diff --git a/java/src/Freeze/Util.java b/java/src/Freeze/Util.java index 98faf51ed36..67169f3aef8 100644 --- a/java/src/Freeze/Util.java +++ b/java/src/Freeze/Util.java @@ -11,20 +11,36 @@ package Freeze; public class Util { - public static Evictor - createEvictor(Ice.ObjectAdapter adapter, String envName, String filename, ServantInitializer initializer, - Index[] indices, boolean createDb) + public static BackgroundSaveEvictor + createBackgroundSaveEvictor(Ice.ObjectAdapter adapter, String envName, String filename, ServantInitializer initializer, + Index[] indices, boolean createDb) { - return new EvictorI(adapter, envName, filename, initializer, indices, createDb); + return new BackgroundSaveEvictorI(adapter, envName, filename, initializer, indices, createDb); } - public static Evictor - createEvictor(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, - ServantInitializer initializer, Index[] indices, boolean createDb) + public static BackgroundSaveEvictor + createBackgroundSaveEvictor(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, + ServantInitializer initializer, Index[] indices, boolean createDb) { - return new EvictorI(adapter, envName, dbEnv, filename, initializer, indices, createDb); + return new BackgroundSaveEvictorI(adapter, envName, dbEnv, filename, initializer, indices, createDb); } + + public static TransactionalEvictor + createTransactionalEvictor(Ice.ObjectAdapter adapter, String envName, String filename, java.util.Map facetTypes, + ServantInitializer initializer, Index[] indices, boolean createDb) + { + return new TransactionalEvictorI(adapter, envName, filename, facetTypes, initializer, indices, createDb); + } + + public static TransactionalEvictor + createTransactionalEvictor(Ice.ObjectAdapter adapter, String envName, com.sleepycat.db.Environment dbEnv, String filename, + java.util.Map facetTypes, ServantInitializer initializer, Index[] indices, boolean createDb) + { + return new TransactionalEvictorI(adapter, envName, dbEnv, filename, facetTypes, initializer, indices, createDb); + } + + public static Connection createConnection(Ice.Communicator communicator, String envName) { @@ -63,7 +79,7 @@ public class Util return result; } - static synchronized void handleFatalError(Evictor evictor, Ice.Communicator communicator, RuntimeException ex) + static synchronized void handleFatalError(BackgroundSaveEvictor evictor, Ice.Communicator communicator, RuntimeException ex) { if(_fatalErrorCallback != null) { diff --git a/java/src/Ice/LoggerI.java b/java/src/Ice/LoggerI.java index 614fc16d1d8..a2e51765d67 100644 --- a/java/src/Ice/LoggerI.java +++ b/java/src/Ice/LoggerI.java @@ -60,6 +60,7 @@ public class LoggerI extends LocalObjectImpl implements Logger s.append(' '); s.append(_prefix); s.append("warning: "); + s.append(Thread.currentThread().getName() + ": "); s.append(message); System.err.print(s.toString() + _lineSeparator); } @@ -73,6 +74,7 @@ public class LoggerI extends LocalObjectImpl implements Logger s.append(' '); s.append(_prefix); s.append("error: "); + s.append(Thread.currentThread().getName() + ": "); s.append(message); System.err.print(s.toString() + _lineSeparator); } diff --git a/java/src/IceUtil/Cache.java b/java/src/IceUtil/Cache.java index 21e36dd39b1..0ba59b91b35 100644 --- a/java/src/IceUtil/Cache.java +++ b/java/src/IceUtil/Cache.java @@ -26,7 +26,7 @@ public class Cache { synchronized(_map) { - CacheValue val = (CacheValue) _map.get(key); + CacheValue val = (CacheValue)_map.get(key); return val == null ? null : val.obj; } } @@ -36,7 +36,7 @@ public class Cache { synchronized(_map) { - CacheValue val = (CacheValue) _map.remove(key); + CacheValue val = (CacheValue)_map.remove(key); return val == null ? null : val.obj; } } @@ -70,7 +70,7 @@ public class Cache { synchronized(_map) { - CacheValue existingVal = (CacheValue) _map.put(key, new CacheValue(o)); + CacheValue existingVal = (CacheValue)_map.put(key, new CacheValue(o)); if(existingVal != null) { _map.put(key, existingVal); @@ -131,7 +131,7 @@ public class Cache synchronized(_map) { - val = (CacheValue) _map.get(key); + val = (CacheValue)_map.get(key); if(val == null) { val = new CacheValue(); |