[teiid-commits] teiid SVN: r3550 - in trunk: engine/src/main/java/org/teiid/common/buffer and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Oct 13 21:46:31 EDT 2011


Author: shawkins
Date: 2011-10-13 21:46:31 -0400 (Thu, 13 Oct 2011)
New Revision: 3550

Removed:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
Modified:
   trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java
   trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
   trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750 wiring in the new buffer logic

Modified: trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -25,6 +25,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -38,9 +39,13 @@
 	 * @return
 	 */
     public static ExecutorService newFixedThreadPool(int nThreads, String name) {
+        return newFixedThreadPool(nThreads, Integer.MAX_VALUE, name);
+    }
+    
+    public static ExecutorService newFixedThreadPool(int nThreads, int maxQueue, String name) {
         ThreadPoolExecutor tpe = new ThreadPoolExecutor(nThreads, nThreads,
                                       60L, TimeUnit.SECONDS,
-                                      new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name));
+                                      maxQueue==0?new SynchronousQueue<Runnable>():new LinkedBlockingQueue<Runnable>(maxQueue), new NamedThreadFactory(name));
         tpe.allowCoreThreadTimeOut(true);
         return tpe;
     }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -25,8 +25,8 @@
 public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
 
 	private Long id;
-	protected long lastAccess;
-	protected double orderingValue;
+	protected float lastAccess;
+	protected float orderingValue;
 	
 	public BaseCacheEntry(Long id) {
 		this.id = id;
@@ -46,19 +46,19 @@
 		return getId().toString();
 	}
 
-	public long getLastAccess() {
+	public float getLastAccess() {
 		return lastAccess;
 	}
 	
-	public void setLastAccess(long lastAccess) {
+	public void setLastAccess(float lastAccess) {
 		this.lastAccess = lastAccess;
 	}
 	
-	public double getOrderingValue() {
+	public float getOrderingValue() {
 		return orderingValue;
 	}
 	
-	public void setOrderingValue(double orderingValue) {
+	public void setOrderingValue(float orderingValue) {
 		this.orderingValue = orderingValue;
 	}
 	
@@ -66,7 +66,7 @@
 	public int compareTo(BaseCacheEntry o) {
 		int result = (int) Math.signum(orderingValue - o.orderingValue);
 		if (result == 0) {
-			result = Long.signum(lastAccess - o.lastAccess);
+			result = (int)Math.signum(lastAccess - o.lastAccess);
 			if (result == 0) {
 				return Long.signum(id - o.id);
 			}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -25,17 +25,18 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+/**
+ * TODO: support freeing of datablocks as we go
+ */
 final class BlockInputStream extends InputStream {
 	private final BlockManager manager;
 	private final int maxBlock;
 	int blockIndex;
 	ByteBuffer buf;
-	boolean free;
 	boolean done;
 
-	BlockInputStream(BlockManager manager, int blockCount, boolean free) {
+	BlockInputStream(BlockManager manager, int blockCount) {
 		this.manager = manager;
-		this.free = free;
 		this.maxBlock = blockCount;
 	}
 
@@ -52,15 +53,9 @@
 		if (buf == null || buf.remaining() == 0) {
 			if (maxBlock == blockIndex) {
 				done = true;
-				if (blockIndex > 1 && free) {
-					manager.freeBlock(blockIndex - 1, false);
-				}
 				return;
 			}
 			buf = manager.getBlock(blockIndex++);
-			if (blockIndex > 2 && free) {
-				manager.freeBlock(blockIndex - 2, false);
-			}
 		}
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -41,7 +41,7 @@
 	 */
 	ByteBuffer getBlock(int index);
 	
-	int freeBlock(int index, boolean steal);
+	void freeBlock(int index);
 	
 	int free(boolean steal);
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -25,16 +25,20 @@
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.StorageManager;
 
+/**
+ * Represents a FileStore that holds blocks of a fixed size.
+ */
 class BlockStore {
 	final long blockSize;
 	final ConcurrentBitSet blocksInUse;
 	final FileStore[] stores;
 	
-	public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog) {
+	public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog, int concurrencyLevel) {
 		this.blockSize = blockSize;
 		int blockCount = 1 << blockCountLog;
-		this.blocksInUse = new ConcurrentBitSet(blockCount, BufferManagerImpl.CONCURRENCY_LEVEL/2);
-		this.stores = new FileStore[BufferManagerImpl.CONCURRENCY_LEVEL/2];
+		this.blocksInUse = new ConcurrentBitSet(blockCount, concurrencyLevel);
+		this.blocksInUse.setCompact(true);
+		this.stores = new FileStore[concurrencyLevel];
 		for (int i = 0; i < stores.length; i++) {
 			this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i); 
 		}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -35,7 +35,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -48,6 +51,7 @@
 import org.teiid.common.buffer.StorageManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ExecutorUtils;
 import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
@@ -86,9 +90,14 @@
  * proportion to the number of tables/tuplebuffers in use.
  * 
  * TODO: compact tail storage blocks.  there may be dangling blocks causing us to consume disk space.
+ * we should at least reclaim tail space if the end block is removed.  for now we are just relying
+ * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
+ * files.
  */
 public class BufferFrontedFileStoreCache implements Cache, StorageManager {
 	
+	public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
+	
 	static final int ADDRESS_BITS = 31;
 	static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
 	static final int BYTES_PER_BLOCK_ADDRESS = 4;
@@ -212,7 +221,7 @@
 				if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
 					memoryEvictionLock.readLock().unlock();
 					readLocked = false;
-					next = evictFromMemoryBuffer();
+					next = evictFromMemoryBuffer(true);
 				}
 			} finally {
 				if (readLocked) {
@@ -226,12 +235,9 @@
 		}
 
 		@Override
-		public int freeBlock(int index, boolean steal) {
+		public void freeBlock(int index) {
 			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
-			if (!steal) {
-				freeDataBlock(dataBlock);
-			}
-			return dataBlock;
+			freeDataBlock(dataBlock);
 		}
 
 		private void freeDataBlock(int dataBlock) {
@@ -248,6 +254,7 @@
 					if (this.inode == -1) {
 						throw new AssertionError("Out of inodes"); //$NON-NLS-1$
 					}
+					inodesCreated.getAndIncrement();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 					}
@@ -268,11 +275,12 @@
 			int dataBlockToSteal = bb.getInt(0);
 			int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
 			int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
-			boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS, true);
+			boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(steal?1:0), true);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 				LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			inodesInuse.clear(inode);
+			inodesRemoved.getAndIncrement();
 			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
 				return steal?dataBlockToSteal:EMPTY_ADDRESS;
 			}
@@ -315,6 +323,8 @@
 			return blockByteBuffer.getByteBuffer(dataBlock);
 		}
 	}
+	AtomicInteger inodesRemoved = new AtomicInteger();
+	AtomicInteger inodesCreated = new AtomicInteger();
 
 	private static class PhysicalInfo extends BaseCacheEntry {
 		int inode = EMPTY_ADDRESS;
@@ -336,32 +346,16 @@
 		}
 	}
 	
-	double crfLamda = .0001;
-	
-	StorageManager storageManager;
-	int maxStorageObjectSize = 1 << 23; //8MB
-	private long memoryBufferSpace = 1 << 27;
+	private StorageManager storageManager;
+	private int maxStorageObjectSize = DEFAuLT_MAX_OBJECT_SIZE;
+	private long memoryBufferSpace = 1 << 26; //64MB
 	private boolean direct;
 	
-	int maxMemoryBlocks;
+	private int maxMemoryBlocks;
 	private AtomicLong readAttempts = new AtomicLong();
-	PartiallyOrderedCache<Long, PhysicalInfo> memoryBufferEntries = new PartiallyOrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL) {
-		
-		@Override
-		protected void recordAccess(Long key, PhysicalInfo value, boolean initial) {
-			long lastAccess = value.getLastAccess();
-			value.setLastAccess(readAttempts.get());
-			if (initial && lastAccess == 0) {
-				return;
-			}
-			double orderingValue = value.getOrderingValue();
-			orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess, orderingValue);
-			value.setOrderingValue(orderingValue);
-		}
-
-	};
+	private OrderedCache<Long, PhysicalInfo> memoryBufferEntries = new OrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL, readAttempts);
 	private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
-	ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock();
+	private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
 	
 	private int blocks;
 	private ConcurrentBitSet blocksInuse;
@@ -374,18 +368,41 @@
 	private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping = new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL);
 	private BlockStore[] sizeBasedStores;
 	
+	private AtomicBoolean cleanerRunning = new AtomicBoolean();
+	private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, 0, "FileStore Worker"); //$NON-NLS-1$
+	private final Runnable cleaningTask = new Runnable() {
+		
+		@Override
+		public void run() {
+			try {
+				while (lowBlocks()) {
+					if (evictFromMemoryBuffer(false) == EMPTY_ADDRESS) {
+						break;
+					}
+				}
+			} finally {
+				cleanerRunning.set(false);
+			}
+		}
+	};
+	private int cleaningThreshold;
+	
+	private AtomicLong storageWrites = new AtomicLong();
+	private AtomicLong storageReads = new AtomicLong();
+	
 	@Override
 	public void initialize() throws TeiidComponentException {
 		storageManager.initialize();
+		memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
 		blocks = (int) Math.min(Integer.MAX_VALUE, (memoryBufferSpace>>LOG_BLOCK_SIZE));
 		inodesInuse = new ConcurrentBitSet(blocks+1, BufferManagerImpl.CONCURRENCY_LEVEL);
 		blocksInuse = new ConcurrentBitSet(blocks, BufferManagerImpl.CONCURRENCY_LEVEL);
 		this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
 		//ensure that we'll run out of blocks first
 		this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
-		memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
 		memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
 		maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
+		cleaningThreshold = Math.min(maxMemoryBlocks<<3, blocks>>1); //try to maintain enough freespace to hold 8 max objects
 		//account for index pointer block overhead
 		if (maxMemoryBlocks > DIRECT_POINTERS) {
 			maxMemoryBlocks--;
@@ -397,20 +414,18 @@
 		List<BlockStore> stores = new ArrayList<BlockStore>();
 		int size = BLOCK_SIZE;
 		do {
-			stores.add(new BlockStore(this.storageManager, size, 30));
+			if ((size>>1) >= maxStorageObjectSize) {
+				size>>=1;  //adjust the last block size if needed
+			}
+			stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
 			size <<=2;
 		} while (size>>2 < maxStorageObjectSize);
 		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
 	}
 	
-	double computeNextOrderingValue(long currentTime,
-			long lastAccess, double orderingValue) {
-		orderingValue = 
-			//Frequency component
-			orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
-			//recency component
-			+ Math.pow(currentTime, crfLamda);
-		return orderingValue;
+	boolean lowBlocks() {
+		int bitsSet = blocksInuse.getBitsSet();
+		return bitsSet > 0 && (blocks - bitsSet < cleaningThreshold) && memoryBufferEntries.firstEntry() != null;
 	}
 	
 	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -441,12 +456,17 @@
 			} else {
 				newEntry = false;
 				synchronized (info) {
-					if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(readAttempts.get(), info)) {
+					//we assume that serialization would be faster than a disk read
+					if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
 						success = true;
 						return; 
 					}
 				}
 			}
+			if (!cleanerRunning.get() && lowBlocks() && cleanerRunning.compareAndSet(false, true)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
+				asynchPool.execute(cleaningTask);
+			}
 			memoryWritePermits.acquire();
 			hasPermit = true;
 			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
@@ -461,8 +481,12 @@
             		if (newEntry) {
 	           			info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(), fsos.getBytesWritten());
 		                map.put(entry.getId(), info);
-	            		memoryBufferEntries.put(entry.getId(), info);
+            		} else {
+            			synchronized (info) {
+                			info.inode = blockManager.getInode();
+						}
             		}
+            		memoryBufferEntries.put(entry.getId(), info);
             		success = true;
             	}
 			}
@@ -477,7 +501,7 @@
 			}
 		}
 	}
-
+	
 	@Override
 	public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
 		long currentTime = readAttempts.incrementAndGet();
@@ -499,28 +523,34 @@
 						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
-					is = new BlockInputStream(manager, info.memoryBlockCount, false);
+					is = new BlockInputStream(manager, info.memoryBlockCount);
 				} else if (info.block != EMPTY_ADDRESS) {
+					storageReads.incrementAndGet();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
 					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
 					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
-					is = fs.createInputStream(blockOffset);
+					is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
 					if (shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
 						BlockManager manager = null;
+						boolean success = false;
 						try {
 							manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
 							ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
 				            ObjectConverterUtil.write(os, is, -1);
+				            info.inode = manager.getInode();
 							memoryBufferEntries.put(info.getId(), info);
-							is = new BlockInputStream(manager, info.memoryBlockCount, false);
+							is = new BlockInputStream(manager, info.memoryBlockCount);
+							success = true;
 						} finally {
 							this.memoryWritePermits.release();
+							if (!success && manager != null) {
+								manager.free(false);
+								info.inode = EMPTY_ADDRESS;
+							}
 						}
-					} else {
-						this.toString();
 					}
 				} else {
 					return null;
@@ -540,11 +570,18 @@
         }
 	}
 
+	/**
+	 * Determine if an object should be in the memory buffer.
+	 * Adds are indicated by a current time of 0.
+	 * @param currentTime
+	 * @param info
+	 * @return
+	 */
 	private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
 		Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
-		return lowest == null 
-				|| (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (info.memoryBlockCount<<3)
-				|| lowest.getKey().getOrderingValue() < computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue());
+		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (info.memoryBlockCount<<2)
+				|| (lowest != null && lowest.getKey().block != EMPTY_ADDRESS 
+						&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
 	}
 	
 	@Override
@@ -577,7 +614,7 @@
 			return;
 		}
 		PhysicalInfo info = map.remove(id);
-		free(id, info, false);
+		free(id, info, false, false);
 	}
 
 	@Override
@@ -588,51 +625,54 @@
 		}
 		synchronized (map) {
 			for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
-				free(entry.getKey(), entry.getValue(), false);
+				free(entry.getKey(), entry.getValue(), false, false);
 			}
 			return map.keySet();
 		}
 	}
 	
-	int free(Long oid, PhysicalInfo info, boolean demote) {
-		memoryBufferEntries.remove(oid);
+	int free(Long oid, PhysicalInfo info, boolean demote, boolean stealDataBlock) {
 		if (info == null) {
 			return EMPTY_ADDRESS;
 		}
 		synchronized (info) {
 			memoryBufferEntries.remove(oid);
-			if (info.inode == EMPTY_ADDRESS) {
-				return EMPTY_ADDRESS;
-			}
-			BlockManager bm = getBlockManager(info.gid, oid, info.inode);
-			info.inode = EMPTY_ADDRESS;
 			if (demote) {
+				if (info.inode == EMPTY_ADDRESS) {
+					return EMPTY_ADDRESS;
+				}
+				BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+				info.inode = EMPTY_ADDRESS;
 				if (info.block == EMPTY_ADDRESS) {
-					BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount, true);
+					storageWrites.getAndIncrement();
+					BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount);
 					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+					info.block = getAndSetNextClearBit(blockStore);
 					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
-					info.block = getAndSetNextClearBit(blockStore);
 					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
 					byte[] b = new byte[BLOCK_SIZE];
 					int read = 0;
-					boolean errored = false;
-					while ((read = is.read(b, 0, b.length)) != -1) {
-						if (!errored) {
-							try {
-								fs.write(blockOffset, b, 0, read);
-								blockOffset+=read;
-							} catch (Throwable e) {
-								//just continue to free
-								errored = true;
-								LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
-							}
+					try {
+						while ((read = is.read(b, 0, b.length)) != -1) {
+							fs.write(blockOffset, b, 0, read);
+							blockOffset+=read;
 						}
+					} catch (Throwable e) {
+						//shouldn't happen, but we'll invalidate this write and continue
+						blockStore.blocksInUse.clear(info.block);
+						info.block = EMPTY_ADDRESS;
+						//just continue to free
+						LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
 					}
-					return is.free(true);
+					return is.free(stealDataBlock);
 				}
-				return bm.free(true);
+				return bm.free(stealDataBlock);
 			}
-			bm.free(false);
+			if (info.inode != EMPTY_ADDRESS) {
+				BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+				info.inode = EMPTY_ADDRESS;
+				bm.free(false);
+			}
 			if (info.block != EMPTY_ADDRESS) {
 				BlockStore blockStore = sizeBasedStores[info.sizeIndex];
 				blockStore.blocksInUse.clear(info.block);
@@ -650,39 +690,34 @@
 		return result;
 	}
 	
-	/**
-	 * Stop the world eviction.  Hopefully this should rarely happen.
-	 * @return the stole dataBlock
-	 */
-	int evictFromMemoryBuffer() {
+	int evictFromMemoryBuffer(boolean steal) {
 		memoryEvictionLock.writeLock().lock();
 		int next = -1;
-		boolean writeLocked = true;
 		try {
 			for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
 				AutoCleanupUtil.doCleanup();
+				//scan the eviction queue looking for a victim
 				Iterator<Map.Entry<PhysicalInfo, Long>> iter = memoryBufferEntries.getEvictionQueue().entrySet().iterator();
-				while ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS && iter.hasNext()) {
+				while (((!steal && lowBlocks()) || (steal && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
 					Map.Entry<PhysicalInfo, Long> entry = iter.next();
 					PhysicalInfo info = entry.getKey();
 					synchronized (info) {
 						if (info.inode == EMPTY_ADDRESS) {
 							continue;
 						}
-						memoryEvictionLock.writeLock().unlock();
-						writeLocked = false;
-						next = free(entry.getValue(), info, true);
+						next = free(entry.getValue(), info, true, steal);
+						if (!steal) {
+							next = 0;
+						}
 					}
 					break;
 				}
 			} 
-			if (next == -1) {
+			if (steal && next == -1) {
 				throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
 			}
 		} finally {
-			if (writeLocked) {
-				memoryEvictionLock.writeLock().unlock();
-			}
+			memoryEvictionLock.writeLock().unlock();
 		}
 		return next;
 	}
@@ -711,4 +746,12 @@
 		this.maxStorageObjectSize = maxStorageBlockSize;
 	}
 	
+	public long getStorageReads() {
+		return storageReads.get();
+	}
+	
+	public long getStorageWrites() {
+		return storageWrites.get();
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -291,30 +291,11 @@
     
     private AtomicInteger activeBatchKB = new AtomicInteger();
     
-    //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher -> LRU
-    //TODO: adaptively adjust this value.  more hits should move closer to lru
-    private final double crfLamda = .001;
+    private AtomicLong readAttempts = new AtomicLong();
     //implements an LRFU cache using a customized crf function.  we store the value with
     //the cache entry to make a better decision about reuse of the batch
     //TODO: consider the size estimate in the weighting function
-    private PartiallyOrderedCache<Long, CacheEntry> memoryEntries = new PartiallyOrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL) {
-		
-		@Override
-		protected void recordAccess(Long key, CacheEntry value, boolean initial) {
-			long lastAccess = value.getLastAccess();
-			value.setLastAccess(readAttempts.get());
-			if (initial && lastAccess == 0) {
-				return;
-			}
-			double orderingValue = value.getOrderingValue();
-			orderingValue = 
-				//Frequency component
-				orderingValue*Math.pow(1-crfLamda, value.getLastAccess() - lastAccess)
-				//recency component
-				+ Math.pow(value.getLastAccess(), crfLamda);
-			value.setOrderingValue(orderingValue);
-		}
-	};
+    private OrderedCache<Long, CacheEntry> memoryEntries = new OrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL, readAttempts);
     
     //limited size reference caches based upon the memory settings
     private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
@@ -341,7 +322,6 @@
     private AtomicLong batchAdded = new AtomicLong();
     private AtomicLong readCount = new AtomicLong();
 	private AtomicLong writeCount = new AtomicLong();
-	private AtomicLong readAttempts = new AtomicLong();
 	private AtomicLong referenceHit = new AtomicLong();
 	
 	public long getBatchesAdded() {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -25,22 +25,29 @@
 import java.util.BitSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.teiid.core.util.Assertion;
+
 /**
  * A segmented {@link BitSet} that supports greater concurrency
  * and faster finding of clear bits.
  */
 public class ConcurrentBitSet {
 	
+	private static final int ADDRESS_BITS_PER_TOP_VALUE = 18;
+	private static final int MAX_TOP_VALUE = 1 << ADDRESS_BITS_PER_TOP_VALUE;
+	
 	private static class Segment {
 		int offset;
 		int maxBits;
 		int startSearch;
 		int bitsSet;
-		final private BitSet bitSet;
+		int[] topVals;
+		final BitSet bitSet;
 		
 		public Segment(int bitCount) {
 			bitSet = new BitSet();
 			maxBits = bitCount;
+			this.topVals = new int[Math.max(1, maxBits >> ADDRESS_BITS_PER_TOP_VALUE)];
 		}
 	}
 
@@ -49,16 +56,14 @@
 	private AtomicInteger counter = new AtomicInteger();
 	private AtomicInteger bitsSet = new AtomicInteger();
 	private Segment[] segments;
+	private boolean compact;
 	
 	public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
-		if (maxBits < concurrencyLevel) {
-			concurrencyLevel = 1;
-			while (maxBits > 2*concurrencyLevel) {
-				concurrencyLevel <<=1;
-			}
+		Assertion.assertTrue(maxBits > 0);
+		while ((bitsPerSegment = maxBits/concurrencyLevel) < concurrencyLevel) {
+			concurrencyLevel >>= 1;
 		}
 		segments = new Segment[concurrencyLevel];
-		bitsPerSegment = maxBits/concurrencyLevel;
 		int modBits = maxBits%concurrencyLevel;
 		if (modBits > 0) {
 			bitsPerSegment++;
@@ -76,13 +81,17 @@
 	public void clear(int bitIndex) {
 		checkIndex(bitIndex);
 		Segment s = segments[bitIndex/bitsPerSegment];
-		bitIndex = bitIndex%bitsPerSegment;
+		int segmentBitIndex = bitIndex%bitsPerSegment;
 		synchronized (s) {
-			if (!s.bitSet.get(bitIndex)) {
+			if (!s.bitSet.get(segmentBitIndex)) {
 				throw new AssertionError(bitIndex + " not set"); //$NON-NLS-1$
 			}
-			s.bitSet.clear(bitIndex);
+			if (compact) {
+				s.startSearch = Math.min(s.startSearch, segmentBitIndex);
+			}
+			s.bitSet.clear(segmentBitIndex);
 			s.bitsSet--;
+			s.topVals[segmentBitIndex>>ADDRESS_BITS_PER_TOP_VALUE]--;
 		}
 		bitsSet.decrementAndGet();
 	}
@@ -100,14 +109,35 @@
 				if (s.bitsSet == s.maxBits) {
 					continue;
 				}
-				nextBit = s.bitSet.nextClearBit(s.startSearch);
-				if (nextBit >= s.maxBits - 1) {
-					s.startSearch = 0;
-					nextBit = s.bitSet.nextClearBit(s.startSearch);
-					if (nextBit >= s.maxBits) {
-						throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+				int indexSearchStart = s.startSearch >> ADDRESS_BITS_PER_TOP_VALUE;
+				for (int j = indexSearchStart; j < s.topVals.length; j++) {
+					if (s.topVals[j] == MAX_TOP_VALUE) {
+						continue;
 					}
+					if (s.topVals[j] == 0) {
+						if (j == start) {
+							nextBit = s.startSearch;
+							break;
+						}
+						nextBit = j * MAX_TOP_VALUE;
+						break;
+					}
+					int index = j * MAX_TOP_VALUE;
+					if (j == indexSearchStart) {
+						index = s.startSearch;
+					}
+					nextBit = s.bitSet.nextClearBit(index);
+					if (s.startSearch > 0 && nextBit >= s.maxBits - 1) {
+						s.startSearch = 0;
+						//fallback full scan
+						nextBit = s.bitSet.nextClearBit(s.startSearch);
+					}
+					break;
 				}
+				if (nextBit >= s.maxBits) {
+					throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+				}
+				s.topVals[nextBit>>ADDRESS_BITS_PER_TOP_VALUE]++;
 				s.bitsSet++;
 				s.bitSet.set(nextBit);
 				s.startSearch = nextBit + 1;
@@ -142,4 +172,12 @@
 		return bitsPerSegment;
 	}
 	
+	/**
+	 * Set to try to always allocate against the first available block in a segment.
+	 * @param compact
+	 */
+	public void setCompact(boolean compact) {
+		this.compact = compact;
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.common.buffer.FileStore;
@@ -50,6 +51,7 @@
 	
 	private long maxBufferSpace = DEFAULT_MAX_BUFFERSPACE;
 	private AtomicLong usedBufferSpace = new AtomicLong();
+	private AtomicInteger fileCounter = new AtomicInteger();
 	
 	private class FileInfo {
     	private File file;
@@ -175,6 +177,9 @@
     private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
     private String directory;
     private File dirFile;
+    //use subdirectories to hold the files since we may create a relatively unbounded amount of lob files and 
+    //fs performance will typically degrade if a single directory is too large
+    private File[] subDirectories = new File[256];
 
     // State
     private Map<File, RandomAccessFile> fileCache = Collections.synchronizedMap(new LinkedHashMap<File, RandomAccessFile>() {
@@ -201,13 +206,20 @@
         }
 
         dirFile = new File(this.directory);
-        if(dirFile.exists()) {
-            if(! dirFile.isDirectory()) {
-            	throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.not_a_directory", dirFile.getAbsoluteFile())); //$NON-NLS-1$
-
+    	makeDir(dirFile);
+        for (int i = 0; i < subDirectories.length; i++) {
+        	subDirectories[i] = new File(this.directory, "b" +i); //$NON-NLS-1$
+        	makeDir(subDirectories[i]);
+        }
+    }
+    
+    private static void makeDir(File file) throws TeiidComponentException {
+    	if(file.exists()) {
+            if(! file.isDirectory()) {
+            	throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.not_a_directory", file.getAbsoluteFile())); //$NON-NLS-1$
             }
-        } else if(! dirFile.mkdirs()) {
-        	throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.error_creating", dirFile.getAbsoluteFile())); //$NON-NLS-1$
+        } else if(! file.mkdirs()) {
+        	throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.error_creating", file.getAbsoluteFile())); //$NON-NLS-1$
         }
     }
     
@@ -220,7 +232,8 @@
 	}
     
     File createFile(String name) throws IOException {
-    	File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null, this.dirFile); //$NON-NLS-1$
+    	//spread the files across the subdirectories in a round-robin fashion.
+    	File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null, this.subDirectories[fileCounter.getAndIncrement()&(this.subDirectories.length-1)]); //$NON-NLS-1$
         if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
             LogManager.logDetail(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, "Created temporary storage area file " + storageFile.getAbsoluteFile()); //$NON-NLS-1$
         }

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -1,353 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.teiid.common.buffer.Cache;
-import org.teiid.common.buffer.CacheEntry;
-import org.teiid.common.buffer.FileStore;
-import org.teiid.common.buffer.Serializer;
-import org.teiid.common.buffer.StorageManager;
-import org.teiid.core.TeiidComponentException;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
-import org.teiid.query.QueryPlugin;
-
-/**
- * A minimally blocking Cache using {@link FileStore}s.
- * 
- * Storage files with significant unused space are compacted after reaching a size threshold.
- * Compacting the empty space may be costly as it is currently implemented by blocking all
- * read/write operations against the group.
- * 
- * Since empty is concentrated at the beginning of the store a better approach could
- * be to users smaller file segments and move batches off of the beginning.
- * 
- * There is unfortunately a significant memory footprint per group.
- */
-public class FileStoreCache implements Cache {
-	
-	private static class CacheGroup {
-		private static final int RECLAIM_TAIL_SIZE = IO_BUFFER_SIZE << 5;
-		private static final int MAX_FREE_SPACE = 1 << 11;
-		FileStore store;
-		long tail;
-		long unusedSpace = 0;
-		ReadWriteLock lock = new ReentrantReadWriteLock();
-		Map<Long, long[]> physicalMapping = Collections.synchronizedMap(new HashMap<Long, long[]>());
-		List<Long> freed = Collections.synchronizedList(new LinkedList<Long>()); 
-		
-		CacheGroup(FileStore store) {
-			this.store = store;
-		}
-		
-		void freeBatch(Long batch) throws IOException {
-			long[] info = physicalMapping.remove(batch);
-			if (info != null) { 
-				if (info[0] + info[1] == tail) {
-					tail -= info[1];
-					if (store.getLength() - tail > RECLAIM_TAIL_SIZE) {
-						store.setLength(tail);						
-					}
-				} else {
-					unusedSpace += info[1]; 
-				}
-			}
-		}
-		
-		private long getOffset(Long gid, long compactionThreshold) throws IOException {
-			long currentLength = store.getLength();
-			if (currentLength <= compactionThreshold || unusedSpace * 4 <= currentLength * 3) {
-				return tail;
-			}
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-				LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on", gid); //$NON-NLS-1$
-			}
-			byte[] buffer = new byte[IO_BUFFER_SIZE];
-			TreeSet<long[]> bySize = new TreeSet<long[]>(new Comparator<long[]>() {
-				@Override
-				public int compare(long[] o1, long[] o2) {
-					int signum = Long.signum(o1[1] - o2[1]);
-					if (signum == 0) {
-						//take the upper address first
-						return Long.signum(o2[0] - o1[0]);
-					}
-					return signum;
-				}
-			});
-			TreeSet<long[]> byAddress = new TreeSet<long[]>(new Comparator<long[]>() {
-				
-				@Override
-				public int compare(long[] o1, long[] o2) {
-					return Long.signum(o1[0] - o2[0]);
-				}
-			});
-			synchronized (physicalMapping) {
-				for (long[] value : physicalMapping.values()) {
-					if (value == null) {
-						continue;
-					}
-					bySize.add(value);
-					byAddress.add(value);
-				}
-			}
-			long lastEndAddress = 0;
-			long usedSpace = tail - unusedSpace;
-			while (!byAddress.isEmpty()) {
-				long[] info = byAddress.pollFirst();
-				bySize.remove(info);
-
-				long currentOffset = info[0];
-				long space = currentOffset - lastEndAddress;
-				boolean movedLast = false;
-				while (space > 0 && !bySize.isEmpty()) {
-					long[] last = byAddress.last();
-					if (last[1] > space) {
-						break;
-					}
-					movedLast = true;
-					byAddress.pollLast();
-					bySize.remove(last);
-					move(last, lastEndAddress, buffer);
-					space -= last[1];
-					lastEndAddress += last[1];
-				}
-				if (movedLast && !byAddress.isEmpty()) {
-					long[] last = byAddress.last();
-					long currentLastEndAddress = last[0] + last[1]; 
-					if (currentLastEndAddress < currentLength>>1) {
-						lastEndAddress = currentLastEndAddress;
-						break;
-					}
-				}
-				while (space > 0 && !bySize.isEmpty()) {
-					long[] smallest = bySize.first();
-					if (smallest[1] > space) {
-						break;
-					}
-					bySize.pollFirst();
-					byAddress.remove(smallest);
-					move(smallest, lastEndAddress, buffer);
-					space -= smallest[1];
-					lastEndAddress += smallest[1];
-				}
-				
-				if (space > MAX_FREE_SPACE) {
-					move(info, lastEndAddress, buffer);
-				}
-				lastEndAddress = info[0] + info[1];
-			}
-			store.setLength(lastEndAddress);
-			tail = lastEndAddress;
-			unusedSpace = lastEndAddress - usedSpace;
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", gid, "pre-size", currentLength, "post-size", lastEndAddress); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
-			}
-			return tail;
-		}
-		
-		private void move(long[] toMove, long newOffset, byte[] buffer) throws IOException {
-			long oldOffset = toMove[0];
-			toMove[0] = newOffset;
-			int size = (int)toMove[1];
-			while (size > 0) {
-				int toWrite = Math.min(IO_BUFFER_SIZE, size);
-				store.readFully(oldOffset, buffer, 0, toWrite);
-				store.write(newOffset, buffer, 0, toWrite);
-				size -= toWrite;
-				oldOffset += toWrite;
-				newOffset += toWrite;
-			}
-		}
-	}
-
-	private static final int COMPACTION_THRESHOLD = 1 << 24; //start checking at 16 megs
-	private static final int IO_BUFFER_SIZE = 1<<13;
-	int compactionThreshold = COMPACTION_THRESHOLD;
-	private ConcurrentHashMap<Long, CacheGroup> cacheGroups = new ConcurrentHashMap<Long, CacheGroup>();
-	private StorageManager storageManager;
-	
-	@Override
-	public void add(CacheEntry entry, Serializer s) throws Exception {
-		final CacheGroup group = cacheGroups.get(s.getId());
-		if (group == null) {
-			return;
-		}
-
-		group.lock.writeLock().lock();
-		try {
-			synchronized (group.freed) {
-				while (!group.freed.isEmpty()) {
-					group.freeBatch(group.freed.remove(0));
-				}
-			}
-			final ByteBuffer buffer = ByteBuffer.allocate(IO_BUFFER_SIZE);
-			final long offset = group.getOffset(s.getId(), compactionThreshold);
-			ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream() {
-				@Override
-				protected ByteBuffer newBuffer() {
-					buffer.rewind();
-					return buffer;
-				}
-				
-				@Override
-				protected int flushDirect(int i) throws IOException {
-					group.store.write(offset + bytesWritten, buffer.array(), 0, i);
-					return i;
-				}
-			};
-	        ObjectOutputStream oos = new ObjectOutputStream(fsos);
-	        oos.writeInt(entry.getSizeEstimate());
-	        oos.writeLong(entry.getLastAccess());
-	        oos.writeDouble(entry.getOrderingValue());
-	        s.serialize(entry.getObject(), oos);
-	        oos.close();
-	        long size = fsos.getBytesWritten();
-	        long[] info = new long[] {offset, size};
-	        group.physicalMapping.put(entry.getId(), info);
-	        group.tail = Math.max(group.tail, offset + size);
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, s.getId(), entry.getId(), "batch written starting at:", offset); //$NON-NLS-1$
-			}
-		} finally {
-			group.lock.writeLock().unlock();
-		}
-	}
-
-	@Override
-	public void createCacheGroup(Long gid) {
-		cacheGroups.put(gid, new CacheGroup(storageManager.createFileStore(String.valueOf(gid))));
-	}
-
-	@Override
-	public CacheEntry get(Long id, Serializer<?> serializer)
-			throws TeiidComponentException {
-		CacheGroup group = cacheGroups.get(serializer.getId());
-		if (group == null) {
-			return null;
-		}
-		try {
-			group.lock.readLock().lock();
-			long[] info = group.physicalMapping.get(id);
-			if (info == null) {
-				return null;
-			}
-			ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
-			CacheEntry ce = new CacheEntry(id);
-			ce.setSizeEstimate(ois.readInt());
-			ce.setLastAccess(ois.readLong());
-			ce.setOrderingValue(ois.readDouble());
-			ce.setObject(serializer.deserialize(ois));
-			return ce;
-        } catch(IOException e) {
-        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
-        } catch (ClassNotFoundException e) {
-        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
-        } finally {
-        	group.lock.readLock().unlock();
-        }
-	}
-
-	@Override
-	public void remove(Long gid, Long id) {
-		CacheGroup group = cacheGroups.get(gid);
-		if (group == null) {
-			return;
-		}
-		if (group.lock.writeLock().tryLock()) {
-			try {
-				try {
-					group.freeBatch(id);
-				} catch (IOException e) {
-					LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error removing batch"); //$NON-NLS-1$
-				}
-			} finally {
-				group.lock.writeLock().unlock();
-			}
-		} else {
-			group.freed.add(id);
-		}
-	}
-	
-	@Override
-	public void addToCacheGroup(Long gid, Long oid) {
-		CacheGroup group = cacheGroups.get(gid);
-		if (group == null) {
-			return;
-		}
-		group.physicalMapping.put(oid, null);
-	}
-
-	@Override
-	public Collection<Long> removeCacheGroup(Long gid) {
-		CacheGroup group = cacheGroups.remove(gid);
-		if (group == null) {
-			return Collections.emptyList();
-		}
-		group.store.remove();
-		synchronized (group.physicalMapping) {
-			return new ArrayList<Long>(group.physicalMapping.keySet());
-		}
-	}
-
-	@Override
-	public FileStore createFileStore(String name) {
-		return storageManager.createFileStore(name);
-	}
-
-	@Override
-	public void initialize() throws TeiidComponentException {
-		this.storageManager.initialize();
-	}
-	
-	public void setStorageManager(StorageManager storageManager) {
-		this.storageManager = storageManager;
-	}
-	
-	public StorageManager getStorageManager() {
-		return storageManager;
-	}
-	
-	public void setCompactionThreshold(int compactionThreshold) {
-		this.compactionThreshold = compactionThreshold;
-	}
-
-}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -26,12 +26,35 @@
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-public abstract class OrderedCache<K, V> {
+import org.teiid.common.buffer.BaseCacheEntry;
+
+/**
+ * A Concurrent LRFU cache.  Has assumptions that match buffermanager usage.
+ * Null values are not allowed.
+ * @param <K>
+ * @param <V>
+ */
+public class OrderedCache<K, V extends BaseCacheEntry> {
 	
-	protected Map<K, V> map = new ConcurrentHashMap<K, V>(); 
-	protected NavigableMap<V, K> expirationQueue = new ConcurrentSkipListMap<V, K>();
-	protected Map<K, V> limbo = new ConcurrentHashMap<K, V>();
+	protected Map<K, V> map; 
+	//TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
+	//the level limits the effective map size to ~ 2^16
+	//above which it performs comparably under load to a synchronized LinkedHashMap
+	//just with more CPU overhead vs. wait time.
+	protected NavigableMap<V, K> evictionQueue = new ConcurrentSkipListMap<V, K>();
+	protected Map<K, V> limbo;
+	protected AtomicLong clock;
+    //combined recency/frequency lambda value between 0 and 1; lower -> LFU, higher -> LRU
+    //TODO: adaptively adjust this value.  more hits should move closer to lru
+	protected float crfLamda = .0002f;
+	
+	public OrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel, AtomicLong clock) {
+		map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
+		limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
+		this.clock = clock;
+	}
 		
 	public V get(K key) {
 		V result = map.get(key);
@@ -40,9 +63,9 @@
 		}
 		if (result != null) {
 			synchronized (result) {
-				expirationQueue.remove(result);
-				recordAccess(key, result, false);
-				expirationQueue.put(result, key);
+				evictionQueue.remove(result);
+				recordAccess(result, false);
+				evictionQueue.put(result, key);
 			}
 		}
 		return result;
@@ -52,7 +75,7 @@
 		V result = map.remove(key);
 		if (result != null) {
 			synchronized (result) {
-				expirationQueue.remove(result);
+				evictionQueue.remove(result);
 			}
 		}
 		return result;
@@ -62,18 +85,18 @@
 		V result = map.put(key, value);
 		if (result != null) {
 			synchronized (result) {
-				expirationQueue.remove(result);
+				evictionQueue.remove(result);
 			}
 		}
 		synchronized (value) {
-			recordAccess(key, value, result == null);
-			expirationQueue.put(value, key);
+			recordAccess(value, result == null);
+			evictionQueue.put(value, key);
 		}
 		return result;
 	}
 	
 	public V evict() {
-		Map.Entry<V, K> entry = expirationQueue.pollFirstEntry();
+		Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
 		if (entry == null) {
 			return null;
 		}
@@ -89,6 +112,42 @@
 		return map.size();
 	}
 	
-	protected abstract void recordAccess(K key, V value, boolean initial);
+	public Map<V, K> getEvictionQueue() {
+		return evictionQueue;
+	}
 	
+	public Map.Entry<V, K> firstEntry() {
+		return evictionQueue.firstEntry();
+	}
+	
+	protected void recordAccess(BaseCacheEntry value, boolean initial) {
+		float lastAccess = value.getLastAccess();
+		value.setLastAccess(clock.get());
+		if (initial && lastAccess == 0) {
+			return; //we just want to timestamp this as created and not give it an ordering value
+		}
+		float orderingValue = value.getOrderingValue();
+		orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
+				orderingValue);
+		value.setOrderingValue(orderingValue);
+	}
+
+	float computeNextOrderingValue(float currentTime,
+			float lastAccess, float orderingValue) {
+		orderingValue = 
+			(float) (//Frequency component
+			orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+			//recency component
+			+ Math.pow(currentTime, crfLamda));
+		return orderingValue;
+	}
+	
+	public float getCrfLamda() {
+		return crfLamda;
+	}
+	
+	public void setCrfLamda(float crfLamda) {
+		this.crfLamda = crfLamda;
+	}
+	
 }

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -1,149 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-public abstract class PartiallyOrderedCache<K, V> {
-	
-	private int maxOrderedSize = 1 << 19;
-	
-	protected Map<K, V> map; 
-	//TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
-	//the level limits the effective map size to ~ 2^16
-	//where it performs comparably under load to a synchronized LinkedHashMap
-	//just with more CPU overhead vs. wait time.
-	//TODO: have the concurrent version be pluggable
-	protected NavigableMap<V, K> evictionQueue = new TreeMap<V, K>();
-	//when we get to extreme number of entries we overflow into lru
-	protected Map<V, K> evictionQueueHead = new LinkedHashMap<V, K>();
-	//holds entries that are being evicted, but that might not yet be in a lower caching level
-	protected Map<K, V> limbo;
-	
-	public PartiallyOrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel) {
-		map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
-		limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
-	}
-	
-	public void setMaxOrderedSize(int maxOrderedSize) {
-		this.maxOrderedSize = maxOrderedSize;
-	}
-		
-	public V get(K key) {
-		V result = map.get(key);
-		if (result == null) {
-			result = limbo.get(key);
-		}
-		if (result != null) {
-			maintainQueues(key, result, null);
-		}
-		return result;
-	}
-	
-	public V remove(K key) {
-		V result = map.remove(key);
-		if (result != null) {
-			synchronized (this) {
-				if (evictionQueue.remove(result) != null) {
-					orderedRemoved();
-				} else {
-					evictionQueueHead.remove(result);
-				}
-			}
-		}
-		return result;
-	}
-
-	private void orderedRemoved() {
-		if (evictionQueue.size() < (maxOrderedSize>>1) && evictionQueueHead.size() > 0) {
-			Iterator<Map.Entry<V,K>> i = evictionQueueHead.entrySet().iterator();
-			if (i.hasNext()) {
-				Map.Entry<V, K> entry = i.next();
-				if (map.containsKey(entry.getValue())) {
-					i.remove();
-					evictionQueue.put(entry.getKey(), entry.getValue());
-				}
-			}
-		}
-	}
-	
-	public V put(K key, V value) {
-		V result = map.put(key, value);
-		maintainQueues(key, value, result);
-		return result;
-	}
-
-	private void maintainQueues(K key, V value, V old) {
-		synchronized (this) {
-			if (old != null && evictionQueue.remove(old) == null) {
-				evictionQueueHead.remove(old);
-			}
-			recordAccess(key, value, old == null);
-			evictionQueue.put(value, key);
-			if (evictionQueue.size() > maxOrderedSize) {
-				Map.Entry<V, K> last = evictionQueue.pollLastEntry();
-				if (last != null) {
-					if (map.containsKey(last.getValue()) && !evictionQueue.containsKey(last.getKey())) {
-						evictionQueueHead.put(last.getKey(), last.getValue());
-					}
-				}
-			}
-		}
-	}
-	
-	public V evict() {
-		Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
-		if (entry == null) {
-			return null;
-		}
-		synchronized (this) {
-			orderedRemoved();
-		}
-		limbo.put(entry.getValue(), entry.getKey());
-		return map.remove(entry.getValue());
-	}
-	
-	public Map<V, K> getEvictionQueue() {
-		return evictionQueue;
-	}
-	
-	public Map.Entry<V, K> firstEntry() {
-		return evictionQueue.firstEntry();
-	}
-	
-	public void finishedEviction(K key) {
-		limbo.remove(key);
-	}
-	
-	public int size() {
-		return map.size();
-	}
-	
-	protected abstract void recordAccess(K key, V value, boolean initial);
-	
-}

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -22,8 +22,8 @@
 
 package org.teiid.common.buffer;
 
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
-import org.teiid.common.buffer.impl.FileStoreCache;
 import org.teiid.common.buffer.impl.MemoryStorageManager;
 import org.teiid.common.buffer.impl.SplittableStorageManager;
 import org.teiid.core.TeiidComponentException;
@@ -85,7 +85,11 @@
 			MemoryStorageManager storageManager = new MemoryStorageManager();
 			SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
 			ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
-			FileStoreCache fsc = new FileStoreCache();
+			BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+			//use conservative allocations
+			fsc.setDirect(false); //allow the space to be GCed easily
+			fsc.setMaxStorageObjectSize(1<<20);
+			fsc.setMemoryBufferSpace(1<<21);
 			fsc.setStorageManager(ssm);
 			fsc.initialize();
 		    bufferManager.setCache(fsc);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -29,8 +29,8 @@
 
 import org.junit.Test;
 import org.teiid.common.buffer.STree.InsertMode;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
-import org.teiid.common.buffer.impl.FileStoreCache;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.query.sql.symbol.ElementSymbol;
 
@@ -102,15 +102,14 @@
 				
 	}
 	
-	/**
-	 * Forces the logic through several compaction cycles by using large strings
-	 * @throws TeiidComponentException
-	 */
-	@Test public void testCompaction() throws TeiidComponentException {
+	@Test public void testStorageWrites() throws TeiidComponentException {
 		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
 		bm.setProcessorBatchSize(32);
 		bm.setMaxReserveKB(0);//force all to disk
-		((FileStoreCache)bm.getCache()).setCompactionThreshold(0);
+		BufferFrontedFileStoreCache fsc =(BufferFrontedFileStoreCache)bm.getCache();
+		fsc.setMaxStorageObjectSize(1 << 19);
+		fsc.setMemoryBufferSpace(1 << 19);
+		fsc.initialize();
 		bm.initialize();
 		
 		ElementSymbol e1 = new ElementSymbol("x");

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -62,4 +62,16 @@
 		}
 	}
 	
+	@Test public void testCompactBitSet() {
+		ConcurrentBitSet bst = new ConcurrentBitSet(100000, 1);
+		bst.setCompact(true);
+		for (int i = 0; i < 100000; i++) {
+			assertEquals(i, bst.getAndSetNextClearBit());
+		}
+		bst.clear(50);
+		bst.clear(500);
+		bst.clear(5000);
+		assertEquals(50, bst.getAndSetNextClearBit());
+	}
+	
 }

Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -1,61 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-
-public class TestPartiallyOrderedCache {
-
-	@Test public void testQueueMaintenance() {
-		PartiallyOrderedCache<Integer, Integer> cache = new PartiallyOrderedCache<Integer, Integer>(16, .75f, 16) {
-			
-			@Override
-			protected void recordAccess(Integer key, Integer value, boolean initial) {
-				
-			}
-		};
-		
-		cache.setMaxOrderedSize(5);
-		
-		for (int i = 0; i < 10; i++) {
-			cache.put(i, i);
-		}
-		
-		cache.get(8);
-		cache.get(1);
-		
-		List<Integer> evictions = new ArrayList<Integer>();
-		for (int i = 0; i < 10; i++) {
-			evictions.add(i);
-		}
-		//we expect natural order because the lru is converted into the sorted on natural key
-		assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), evictions);
-	}
-	
-}

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -31,9 +31,9 @@
 import org.jboss.managed.api.annotation.ManagementProperties;
 import org.jboss.managed.api.annotation.ManagementProperty;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.common.buffer.impl.FileStorageManager;
-import org.teiid.common.buffer.impl.FileStoreCache;
 import org.teiid.common.buffer.impl.MemoryStorageManager;
 import org.teiid.common.buffer.impl.SplittableStorageManager;
 import org.teiid.core.TeiidComponentException;
@@ -68,6 +68,8 @@
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
     private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
     private boolean inlineLobs = true;
+    private int maxMemoryBufferSpace = -1;
+    private int maxStorageObjectSize = BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
 	private FileStorageManager fsm;
 	
     /**
@@ -106,7 +108,14 @@
                 fsm.setMaxBufferSpace(maxBufferSpace*MB);
                 SplittableStorageManager ssm = new SplittableStorageManager(fsm);
                 ssm.setMaxFileSize(maxFileSize);
-                FileStoreCache fsc = new FileStoreCache();
+                BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+                fsc.setMaxStorageObjectSize(maxStorageObjectSize);
+                if (maxMemoryBufferSpace <= 0) {
+                	//default to approximately 20% of the space set aside for the reserve buffers (maxReserveKB)
+                	fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() * 200);
+                } else {
+                	fsc.setMemoryBufferSpace(maxMemoryBufferSpace);
+                }
                 fsc.setStorageManager(ssm);
                 fsc.initialize();
                 this.bufferMgr.setCache(fsc);
@@ -245,4 +254,22 @@
 	public long getReadAttempts() {
 		return bufferMgr.getReadAttempts();
 	}
+    
+    public int getMaxMemoryBufferSpace() {
+		return maxMemoryBufferSpace;
+	}
+    
+    public int getMaxStorageObjectSize() {
+		return maxStorageObjectSize;
+	}
+
+    @ManagementProperty(description="Max direct memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  This value cannot be smaller than maxStorageObjectSize.")
+    public void setMaxMemoryBufferSpace(int maxMemoryBufferSpace) {
+		this.maxMemoryBufferSpace = maxMemoryBufferSpace;
+	}
+
+    @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
+    public void setMaxStorageObjectSize(int maxStorageObjectSize) {
+		this.maxStorageObjectSize = maxStorageObjectSize;
+	}
 }

Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-12 23:23:02 UTC (rev 3549)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-14 01:46:31 UTC (rev 3550)
@@ -29,9 +29,9 @@
 
 import org.junit.Test;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.common.buffer.impl.FileStorageManager;
-import org.teiid.common.buffer.impl.FileStoreCache;
 import org.teiid.common.buffer.impl.SplittableStorageManager;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.UnitTestUtil;
@@ -55,7 +55,7 @@
         assertTrue(svc.isUseDisk());
         
         BufferManagerImpl mgr = svc.getBufferManager();
-        SplittableStorageManager ssm = (SplittableStorageManager)((FileStoreCache)mgr.getCache()).getStorageManager();
+        SplittableStorageManager ssm = (SplittableStorageManager)((BufferFrontedFileStoreCache)mgr.getCache()).getStorageManager();
         assertTrue(((FileStorageManager)ssm.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
     }
 



More information about the teiid-commits mailing list