[teiid-commits] teiid SVN: r3508 - in trunk: documentation/admin-guide/src/main/docbook/en-US/content and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Sep 29 22:00:29 EDT 2011


Author: shawkins
Date: 2011-09-29 22:00:28 -0400 (Thu, 29 Sep 2011)
New Revision: 3508

Removed:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
Modified:
   trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
   trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 converting storage back over to the old file algorithm (1 filestore per table/tuplebuffer) with an updated compaction algorithm

Modified: trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -50,6 +50,45 @@
  */
 public class UnitTestUtil {
 	
+	public static final class LogFormatter extends Formatter {
+		@Override
+		public String format(LogRecord record) {
+			final StringBuilder result = new StringBuilder();
+			result.append(new Timestamp(record.getMillis()));
+			result.append(" "); //$NON-NLS-1$
+			result.append(record.getLoggerName());
+			result.append(" "); //$NON-NLS-1$
+			result.append(record.getLevel());
+			result.append(" "); //$NON-NLS-1$
+			result.append(Thread.currentThread().getName());
+			result.append(" "); //$NON-NLS-1$
+			result.append(record.getMessage());
+			result.append('\n');
+			if (record.getThrown() != null) {
+				record.getThrown().printStackTrace(new PrintWriter(new Writer() {
+
+					@Override
+					public void close() throws IOException {
+						
+					}
+
+					@Override
+					public void flush() throws IOException {
+						
+					}
+
+					@Override
+					public void write(char[] cbuf, int off, int len)
+							throws IOException {
+						result.append(new String(cbuf, off, len));
+					}
+				}));
+				result.append('\n');
+			}
+			return result.toString();
+		}
+	}
+
 	public static final String PATH_SEPARATOR = "/"; //$NON-NLS-1$
 
 	private static final String DEFAULT_TESTDATA_PATH = "src/test/resources"; //$NON-NLS-1$
@@ -419,45 +458,7 @@
     	} else {
     		logger.setUseParentHandlers(false);
     		ConsoleHandler ch = new ConsoleHandler();
-    		ch.setFormatter(new Formatter() {
-				
-				@Override
-				public String format(LogRecord record) {
-					final StringBuilder result = new StringBuilder();
-					result.append(new Timestamp(record.getMillis()));
-					result.append(" "); //$NON-NLS-1$
-					result.append(record.getLoggerName());
-					result.append(" "); //$NON-NLS-1$
-					result.append(record.getLevel());
-					result.append(" "); //$NON-NLS-1$
-					result.append(record.getThreadID());
-					result.append(" "); //$NON-NLS-1$
-					result.append(record.getMessage());
-					result.append('\n');
-					if (record.getThrown() != null) {
-						record.getThrown().printStackTrace(new PrintWriter(new Writer() {
-
-							@Override
-							public void close() throws IOException {
-								
-							}
-
-							@Override
-							public void flush() throws IOException {
-								
-							}
-
-							@Override
-							public void write(char[] cbuf, int off, int len)
-									throws IOException {
-								result.append(new String(cbuf, off, len));
-							}
-						}));
-						result.append('\n');
-					}
-					return result.toString();
-				}
-			});
+    		ch.setFormatter(new LogFormatter());
     		ch.setLevel(Level.FINEST);
     		logger.addHandler(ch);
     	}

Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml	2011-09-30 02:00:28 UTC (rev 3508)
@@ -98,7 +98,8 @@
 	/deployers
 	   /teiid.deployer
 	/lib
-	/teiid-examples]]></programlisting>     
+	/teiid-examples
+	/tmp/teiid]]></programlisting>     
 	</example>
     <section>
         <title>/deploy/teiid/teiid-jboss-beans.xml</title>
@@ -163,6 +164,14 @@
         <title>teiid-docs</title>
         <para>This directory contains the PDF documents related Teiid and Teiid development. </para>
     </section>
+    
+    <section>
+        <title>tmp/teiid</title>
+        <para>This directory contains temporary files created by Teiid.  These are mostly created by the buffer manager.  
+        These files are not needed across a VM restart.  Heavy usage of large data sets will create lots of temporary files.
+        In heavy usage scenarios, consider pointing the buffer directory at a partition that is routinely defragmented.
+        </para>
+    </section>
          
    </section>   
    

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -31,9 +31,11 @@
  */
 public interface Cache extends StorageManager {
 	void createCacheGroup(Long gid); //called prior to adding an entry
+	//TODO: this should use a callback on the buffer manager to remove memory entries
+	//without materializing all group keys
 	Collection<Long> removeCacheGroup(Long gid);
 	void addToCacheGroup(Long gid, Long oid); 
 	CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
-	void add(CacheEntry entry, Serializer<?> s);
+	void add(CacheEntry entry, Serializer<?> s) throws Exception;
 	void remove(Long gid, Long id);
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -224,28 +223,6 @@
 		};
 	}
 	
-    public ByteBuffer getBuffer(long start, int length, boolean allocate) throws IOException {
-    	byte[] b = new byte[length];
-    	if (!allocate) {
-    		readFully(start, b, 0, length);
-    	}
-    	return ByteBuffer.wrap(b);
-    }
-    
-    public void updateFromBuffer(ByteBuffer bb, long start) throws IOException {
-    	byte[] b = null;
-    	int offset = 0;
-    	bb.rewind();
-    	if (bb.hasArray()) {
-    		b = bb.array();
-    		offset = bb.arrayOffset();
-    	} else {
-    		b = new byte[bb.limit()];
-    		bb.get(b);
-    	}
-    	write(start, b, offset, bb.limit());
-    }
-	
 	public InputStream createInputStream(final long start) {
 		return createInputStream(start, -1);
 	}

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,191 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
- 
- package org.teiid.common.buffer.impl;
-
-import java.util.BitSet;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Extends a {@link BitSet} by adding a cumulative total and a 
- * first level index to speed queries against large bitsets.
- */
-public class BlockBitSetTree {
-	
-	private static final int LOG_BITS_PER_BLOCK = FileStoreCache.LOG_BLOCK_SIZE + 3;
-	private static final int MAX_TOP_VALUE = 1 << LOG_BITS_PER_BLOCK;
-	private int maxIndex;
-	private int bitsSet;
-	private int[] topVals;
-	private BlockManager blockManager;
-	private int blockCount = 0;
-	
-	public BlockBitSetTree(int maxIndex, BlockManager blockManager) {
-		this.maxIndex = maxIndex;
-		this.blockManager = blockManager;
-		this.topVals = new int[(maxIndex >> (FileStoreCache.LOG_BLOCK_SIZE + 3)) + 1];
-	}
-	
-	public int getMaxIndex() {
-		return maxIndex;
-	}
-	
-	/**
-	 * Set the given bit at the index.
-	 * @param bitIndex
-	 * @param value
-	 */
-	public synchronized void set(int bitIndex, boolean value) {
-		getOrSet(bitIndex, value, true);
-	}
-	
-	public synchronized boolean get(int bitIndex) {
-		return getOrSet(bitIndex, false, false);
-	}
-	
-	private boolean getOrSet(int bitIndex, boolean value, boolean update) {
-		if (bitIndex > maxIndex) {
-			throw new ArrayIndexOutOfBoundsException(bitIndex);
-		}
-		int blockIndex = bitIndex>>LOG_BITS_PER_BLOCK;
-		BlockInfo bb = null;
-		if (blockIndex >= blockCount) {
-			if (!update) {
-				return false;
-			}
-			for (; blockCount < blockIndex+1; blockCount++) {
-				bb = blockManager.allocateBlock(blockCount);
-				bb.buf.position(0);
-				int longsPerBlock = FileStoreCache.BLOCK_SIZE >> 6;
-				for (int j = 0; j < longsPerBlock; j++) {
-					bb.buf.putLong(0);
-				}
-			}
-		} else {
-			bb = blockManager.getBlock(blockIndex);
-		}
-		int relativeIndex = bitIndex&(MAX_TOP_VALUE-1);
-		int longByteIndex = (relativeIndex>>6)<<3;
-		long word = bb.buf.getLong(longByteIndex);
-		long mask = 1L << bitIndex;
-		boolean currentValue = ((word & mask) != 0);
-		if (!update) {
-			return currentValue;
-		}
-		if (currentValue == value) {
-			return currentValue;
-		}
-		if (value) {
-			word |= mask;
-		} else {
-			word &= ~mask;
-		}
-		bb.buf.putLong(longByteIndex, word);
-		blockManager.updateBlock(bb);
-		int topIndex = bitIndex >> LOG_BITS_PER_BLOCK;
-		int increment = value?1:-1;
-		bitsSet+=increment;
-		topVals[topIndex]+=increment;
-		return currentValue;
-	}
-	
-	public synchronized int getBitsSet() {
-		return bitsSet;
-	}
-	
-	public synchronized int nextClearBit(int fromIndex) {
-		int start = fromIndex >> LOG_BITS_PER_BLOCK;
-		for (int i = start; i < topVals.length; i++) {
-			if (topVals[i] == MAX_TOP_VALUE) {
-				continue;
-			}
-			if (topVals[i] == 0) {
-				if (i == start) {
-					return fromIndex;
-				}
-				return i * MAX_TOP_VALUE;
-			}
-			int relativeIndex = 0;
-			if (i == start) {
-				relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
-			}
-			BlockInfo bb = blockManager.getBlock(i);
-			
-			int longByteIndex = (relativeIndex>>6)<<3;
-			
-			long word = ~bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
-
-			while (true) {
-			    if (word != 0) {
-			    	return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
-			    }
-			    longByteIndex+=8;
-			    if (longByteIndex > FileStoreCache.BLOCK_MASK) {
-			    	break;
-			    }
-			    word = ~bb.buf.getLong(longByteIndex);
-			}
-		}
-		return -1;
-	}
-	
-	public synchronized int nextSetBit(int fromIndex) {
-		if (bitsSet == 0) {
-			return -1;
-		}
-		int start = fromIndex >> LOG_BITS_PER_BLOCK;
-		for (int i = start; i < topVals.length; i++) {
-			if (topVals[i] == 0) {
-				continue;
-			}
-			if (topVals[i] == MAX_TOP_VALUE) {
-				if (i == start) {
-					return fromIndex;
-				}
-				return i * MAX_TOP_VALUE;
-			}
-			int relativeIndex = 0;
-			if (i == start) {
-				relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
-			}
-			BlockInfo bb = blockManager.getBlock(i);
-			
-			int longByteIndex = (relativeIndex>>6)<<3;
-			
-			long word = bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
-
-			while (true) {
-			    if (word != 0) {
-			    	return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
-			    }
-			    longByteIndex+=8;
-			    if (longByteIndex > FileStoreCache.BLOCK_MASK) {
-			    	break;
-			    }
-			    word = bb.buf.getLong(longByteIndex);
-			}
-		}
-		return -1;
-	}
-	
-}

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,238 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Represents the logical structure of a cache group / directory
- * 
- * Implemented by a closed hash table using a single step linear probe with delayed removal.
- * Uses power of 2 hashing.
- * 
- * Provides an extremely simple hash structure that rivals {@link HashMap} performance and
- * is directly mapped to {@link ByteBuffer}s to avoid serialization overhead.
- * 
- * Does not expect keys or values to be negative.
- */
-public class BlockClosedLongIntHashTable {
-	
-	private enum Mode {
-		GET,
-		UPDATE,
-		REMOVE
-	}
-	
-	private static final int BYTES_PER_ROW = 12; //8+4
-	private static final int BLOCK_SIZE = FileStoreCache.BLOCK_DATA_BYTES/BYTES_PER_ROW;
-	private static final float LOAD_FACTOR = .7f;
-	private static final int MIN_SIZE = 1 << (31 - Integer.numberOfLeadingZeros(BLOCK_SIZE)); //should fit in a single block
-	
-	static final int EMPTY = -1;
-	private static final int REMOVED = -2;
-
-	protected int size;
-	protected int capacityMask = EMPTY;
-	protected BlockManager blockManager;
-	
-	public BlockClosedLongIntHashTable(BlockManager blockManager) {
-		this.blockManager = blockManager;
-	}
-	
-	private void init(int capacity) {
-		int currentBlockCount = blockCount(capacityMask + 1);
-		int desiredBlockCount = blockCount(capacity);
-		if (capacity > capacityMask) {
-			for (int i = currentBlockCount; i < desiredBlockCount; i++) {
-				BlockInfo bb = blockManager.allocateBlock(i);
-				empty(bb.buf, BLOCK_SIZE);
-				blockManager.updateBlock(bb);
-			}
-		}
-		capacityMask = capacity - 1;
-	}
-
-	private void empty(ByteBuffer bb, int toIndex) {
-		bb.position(0);
-		for (int j = 0; j < toIndex; j++) {
-			bb.putLong(j * BYTES_PER_ROW, EMPTY);
-		}
-	}
-
-	private static int blockCount(int size) {
-		int currentBlockCount = (size) / BLOCK_SIZE;
-		if (size%BLOCK_SIZE > 0) {
-			currentBlockCount++;
-		}
-		return currentBlockCount;
-	}
-	
-	public synchronized int put(long key, int value) {
-		int result = getOrUpdate(key, value, Mode.UPDATE);
-		if ((result == EMPTY || result == REMOVED) && ++size > LOAD_FACTOR*capacityMask) {
-			int newCapacity = (capacityMask+1)<<1;
-			int oldLength = capacityMask + 1;
-			init(newCapacity);
-			rehash(oldLength, newCapacity);
-		}
-		return result;
-	}
-
-	public synchronized int get(long key) {
-		return getOrUpdate(key, EMPTY, Mode.GET);
-	}
-	
-	public synchronized int remove(long key) {
-		int result = getOrUpdate(key, EMPTY, Mode.REMOVE);
-		if (result != EMPTY && --size*LOAD_FACTOR < capacityMask>>3 && (capacityMask+1)>>1 >= MIN_SIZE) {
-			//reduce the size of the table by half
-			int oldLength = capacityMask + 1;
-			capacityMask >>= 1;
-			rehash(oldLength, oldLength>>1);
-			int oldBlocks = blockCount(oldLength);
-			int newBlocks = blockCount(capacityMask +1);
-			for (int i = oldBlocks-1; i >= newBlocks; i--) {
-				blockManager.freeBlock(i);
-			}
-		}
-		return result;
-	}
-	
-	private void rehash(int oldLength, int newLength) {
-		BlockInfo lastBlockInfo = null;
-		ByteBuffer lastBlock = null;
-		for (int i = 0; i < oldLength; i++) {
-			int relativeIndex = i%BLOCK_SIZE;
-			if (lastBlock == null || relativeIndex == 0) {
-				int lastIndex = i/BLOCK_SIZE;
-				lastBlockInfo = blockManager.getBlock(lastIndex);
-				lastBlock = lastBlockInfo.buf;
-				if (i < newLength) {
-					byte[] buf = new byte[FileStoreCache.BLOCK_DATA_BYTES];
-					lastBlock.position(0);
-					lastBlock.get(buf);
-					ByteBuffer copyBlock = ByteBuffer.wrap(buf);
-					empty(lastBlock, Math.min(BLOCK_SIZE, oldLength - lastIndex*BLOCK_SIZE));
-					blockManager.updateBlock(lastBlockInfo);
-					lastBlock = copyBlock;
-				}
-			}
-			lastBlock.position(relativeIndex*BYTES_PER_ROW);
-			long oldKey = lastBlock.getLong();
-			int oldValue = lastBlock.getInt();
-			if (oldKey != REMOVED && oldKey != EMPTY) {
-				getOrUpdate(oldKey, oldValue, Mode.UPDATE);
-			}
-		}
-	}
-
-	@SuppressWarnings("null")
-	protected int getOrUpdate(long key, int value, Mode mode) {
-		if (capacityMask == EMPTY) {
-			if (mode == Mode.GET || mode == Mode.REMOVE) {
-				return EMPTY;
-			}
-			init(MIN_SIZE);
-		}
-		int i = hashIndex(key);
-		BlockInfo lastBlockInfo = null;
-		long old = EMPTY;
-		int position = 0;
-		while (true) {
-			int relativeIndex = i%BLOCK_SIZE;
-			if (lastBlockInfo == null || relativeIndex == 0) {
-				int index = i/BLOCK_SIZE;
-				lastBlockInfo = blockManager.getBlock(index);
-			}
-			position = relativeIndex*BYTES_PER_ROW;
-			old = lastBlockInfo.buf.getLong(position);
-			if (old == EMPTY || old == key || (mode == Mode.UPDATE && old == REMOVED)) {
-				break;
-			}
-	        i = (i + 1) & capacityMask;
-		}
-		int result = EMPTY;
-		if (old != EMPTY && old != REMOVED) {
-			result = lastBlockInfo.buf.getInt(position + 8);
-		}
-		switch (mode) {
-		case GET:
-			return result;
-		case UPDATE:
-			lastBlockInfo.buf.putLong(position, key);
-			lastBlockInfo.buf.putInt(position + 8, value);
-			blockManager.updateBlock(lastBlockInfo);
-			return result;
-		case REMOVE:
-			if (old == EMPTY || old == REMOVED) {
-				return EMPTY;
-			}
-			lastBlockInfo.buf.putLong(position, REMOVED);
-			blockManager.updateBlock(lastBlockInfo);
-			return result;
-		default:
-			throw new AssertionError();
-		}
-	}
-	
-	private int hashIndex(long key) {
-		//start with the usual long hash
-		int primaryHash = (int)(key ^ (key >>> 32));
-		//allow the lower bits to spread the entries
-		primaryHash += primaryHash <<= 2;
-		primaryHash += primaryHash <<= 3;
-		return primaryHash & capacityMask;
-	}
-	
-	public synchronized int size() {
-		return size;
-	}
-
-	public synchronized Map<Long, Integer> remove() {
-		Map<Long, Integer> result = new HashMap<Long, Integer>();
-		BlockInfo lastBlockInfo = null;
-		int blockIndex = 0;
-		for (int i = capacityMask; i >= 0; i--) {
-			int relativeIndex = i%BLOCK_SIZE;
-			if (lastBlockInfo == null || relativeIndex == BLOCK_SIZE - 1) {
-				if (lastBlockInfo != null) {
-					blockManager.freeBlock(blockIndex);
-				}
-				blockIndex = i/BLOCK_SIZE;
-				lastBlockInfo = blockManager.getBlock(blockIndex);
-			}
-			lastBlockInfo.buf.position(relativeIndex*BYTES_PER_ROW);
-			long key = lastBlockInfo.buf.getLong();
-			if (key != EMPTY && key != REMOVED) {
-				result.put(key, lastBlockInfo.buf.getInt());
-			}
-		}
-		blockManager.free();
-		return result;
-	}
-
-}

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,51 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Represents an INode
- * 
- * Returned BlockInfo may be shared.  If shared there and no guarantees about position and mark.
- * in particular system/index blocks can be used by multiple threads relative methods should be
- * avoided, but may be used for exclusive write operations.
- * Otherwise the position will be 0.
- * 
- * Due to buffermanager locking, non-index data blocks can be assumed to be thread-safe. 
- */
-public interface BlockManager {
-	
-	int getInode();
-	
-	BlockInfo allocateBlock(int index);
-	
-	BlockInfo getBlock(int index);
-	
-	void updateBlock(BlockInfo block);
-	
-	void freeBlock(int index);
-	
-	void free();
-
-}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -199,6 +199,7 @@
 					cache.remove(this.id, batch);
 				}
 				ce.setSerializer(this.ref);
+				ce.setPersistent(true);
 				if (retain) {
 					addMemoryEntry(ce);
 				}
@@ -556,7 +557,11 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
 			}
-			cache.add(ce, s);
+			try {
+				cache.add(ce, s);
+			} catch (Throwable e) {
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+			}
 			ce.setPersistent(true);
 		}
 		if (s.useSoftCache()) {
@@ -668,6 +673,7 @@
 		cleanSoftReferences();
 		Collection<Long> vals = cache.removeCacheGroup(id);
 		for (Long val : vals) {
+			//TODO: we will unnecessarily call remove on the cache, but that should be low cost
 			fastGet(val, prefersMemory, false);
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -29,6 +29,7 @@
 public abstract class ExtensibleBufferedOutputStream extends OutputStream {
 	
     protected ByteBuffer buf;
+    protected int bytesWritten;
     
     public ExtensibleBufferedOutputStream() {
 	}
@@ -63,18 +64,22 @@
 
 	public void flush() throws IOException {
 		if (buf != null && buf.position() > 0) {
-			flushDirect();
+			bytesWritten += flushDirect();
 		}
 		buf = null;
 	}
 
 	protected abstract ByteBuffer newBuffer();
 	
-	protected abstract void flushDirect() throws IOException;
+	protected abstract int flushDirect() throws IOException;
     
     @Override
     public void close() throws IOException {
 		flush();
     }
     
+    public int getBytesWritten() {
+		return bytesWritten;
+	}
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -22,17 +22,23 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
@@ -40,688 +46,292 @@
 import org.teiid.common.buffer.Serializer;
 import org.teiid.common.buffer.StorageManager;
 import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidRuntimeException;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.QueryPlugin;
 
 /**
- * Implements storage against a {@link FileStore} abstraction using a filesystem paradigm.
- * The filesystem uses a 31bit address space on top of 2^14 byte blocks.
+ * A minimally blocking Cache using {@link FileStore}s.
  * 
- * Therefore there is 2^31*2^14 = 2^45 or 32 terabytes max of addressable space.
+ * Storage files with significant unused space are compacted after reaching a size threshold.
+ * Compacting the empty space may be costly as it is currently implemented by blocking all
+ * read/write operations against the group.
  * 
- * Some amount of the space is taken up by system information (inodes and use flags). 
- * This is held in a separate file.
+ * Since empty space is concentrated at the beginning of the store, a better approach could
+ * be to use smaller file segments and move batches off of the beginning.
  * 
- * The 64 byte inode format is:
- * 14 32 bit direct block pointers
- * 1  32 bit block indirect pointer
- * 1  32 bit block doubly indirect pointer
- * 
- * The data block format is:
- * data bytes | long gid | long oid | three byte int block num
- * 
- * The gid/oid are stored with the block so that defrag/compaction can be performed
- * with minimal blocking/lookups.
- * 
- * This means that the maximum number of blocks available to a "file" is
- * 14 + (2^14-18)/4 + ((2^14-18)/4)^2 ~= 2^24
- * 
- * Thus the max serialized object size is:     2^24*(2^14-18)  ~= 256GB.
- * 
- * The root directory "physicalMapping" is held in memory for performance,
- * but could itself be switched to using a block map.  It will grow in
- * proportion to the number of tables/tuplebuffers in use.
- * 
- * TODO: defragment
- * TODO: lobs could also be supported in this structure.
+ * There is unfortunately a significant memory footprint per group.
  */
-public class FileStoreCache implements Cache, StorageManager {
+public class FileStoreCache implements Cache {
 	
-	//TODO allow the block size to be configurable
-	static final int LOG_BLOCK_SIZE = 14;
-	static final int ADDRESS_BITS = 31;
-	static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
-	static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
-	static final int BLOCK_MASK = BLOCK_SIZE - 1;
-	static final int BLOCK_OVERHEAD = 8+8+3;
-	static final int BLOCK_DATA_BYTES = BLOCK_SIZE - BLOCK_OVERHEAD;
-	static final int BYTES_PER_BLOCK_ADDRESS = 4;
-	static final int ADDRESSES_PER_BLOCK = BLOCK_DATA_BYTES/BYTES_PER_BLOCK_ADDRESS;
-	static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
-	static final int DIRECT_POINTERS = 14;
-	static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
-	static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK * ADDRESSES_PER_BLOCK;
-	static final int EMPTY_ADDRESS = -1;
-
-	private final class BlockOutputStream extends
-			ExtensibleBufferedOutputStream {
-		private final BlockManager blockManager;
-		int blockNum = -1;
-		BlockInfo bi;
-
-		private BlockOutputStream(BlockManager blockManager) {
-			this.blockManager = blockManager;
-		}
-
-		@Override
-		protected ByteBuffer newBuffer() {
-			bi = blockManager.allocateBlock(++blockNum);
-			return bi.buf;
-		}
-
-		@Override
-		protected void flushDirect() throws IOException {
-			blockManager.updateBlock(bi);
-			bi = null;
-		}
-	}
-
-	private final class BitSetBlockManager implements BlockManager {
-		final int offset;
+	private static class CacheGroup {
+		private static final int MAX_FREE_SPACE = 1 << 11;
+		FileStore store;
+		long tail;
+		long unusedSpace = 0;
+		ReadWriteLock lock = new ReentrantReadWriteLock();
+		Map<Long, long[]> physicalMapping = Collections.synchronizedMap(new HashMap<Long, long[]>());
+		List<Long> freed = Collections.synchronizedList(new LinkedList<Long>()); 
 		
-		BitSetBlockManager(int offset) {
-			this.offset = offset;
+		CacheGroup(FileStore store) {
+			this.store = store;
 		}
 		
-		@Override
-		public void updateBlock(BlockInfo info) {
-			updatePhysicalBlock(info);
-		}
-
-		@Override
-		public BlockInfo getBlock(int index) {
-			return getPhysicalBlock(index + offset, true, false);
-		}
-		
-		@Override
-		public BlockInfo allocateBlock(int index) {
-			return getPhysicalBlock(index + offset, true, true);
-		}
-		
-		@Override
-		public int getInode() {
-			throw new AssertionError();
-		}
-
-		@Override
-		public void freeBlock(int index) {
-			throw new AssertionError();
-		}
-
-		@Override
-		public void free() {
-			throw new AssertionError();
-		}
-
-	}
-	
-	private enum Mode {
-		GET,
-		UPDATE,
-		ALLOCATE
-	}
-	
-	static final class BlockInfo {
-		final boolean system;
-		final int inodeOffset;
-		final ByteBuffer buf;
-		final int physicalAddress;
-		boolean dirty;
-		
-		BlockInfo(boolean system, ByteBuffer ib, int index, int inodeOffset) {
-			this.system = system;
-			this.buf = ib;
-			this.physicalAddress = index;
-			this.inodeOffset = inodeOffset;
-		}
-	}
-	
-	private final class InodeBlockManager implements BlockManager {
-		private final int inode;
-		private final long gid;
-		private final long oid;
-		private int lastBlock = -1;
-		boolean trackLast;
-
-		InodeBlockManager(long gid, long oid, int inode) {
-			if (inode == EMPTY_ADDRESS) {
-				synchronized (inodesInuse) {
-					inode = inodesInuse.nextClearBit(0);
-					if (inode == -1) {
-						throw new TeiidRuntimeException("no inodes available"); //$NON-NLS-1$
+		void freeBatch(Long batch) throws IOException {
+			long[] info = physicalMapping.remove(batch);
+			if (info != null) { 
+				if (info[0] + info[1] == tail) {
+					tail -= info[1];
+					if (store.getLength() - tail > IO_BUFFER_SIZE << 5) {
+						store.setLength(tail);						
 					}
-					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-						LogManager.logTrace(LogConstants.CTX_DQP, "Allocating inode", inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
-					}
-					inodesInuse.set(inode, true);
+				} else {
+					unusedSpace += info[1]; 
 				}
-				int inodeBlock = inode/inodesPerBlock;
-				int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
-				BlockInfo bb = getInodeSubBlock(inodeBlock, inodePosition);
-				bb.buf.putInt(EMPTY_ADDRESS);
-				updatePhysicalBlock(bb);
 			}
-			this.inode = inode;
-			this.gid = gid;
-			this.oid = oid;
 		}
 		
-		@Override
-		public int getInode() {
-			return inode;
-		}
-
-		@Override
-		public void updateBlock(BlockInfo info) {
-			updatePhysicalBlock(info);
-		}
-
-		@Override
-		public BlockInfo getBlock(int index) {
-			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
-			BlockInfo bb = getPhysicalBlock(dataBlock, false, false);
-			bb.buf.position(0);
-			bb.buf.limit(BLOCK_DATA_BYTES);
-			return bb;
-		}
-				
-		private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
-			if (index >= MAX_DOUBLE_INDIRECT) {
-				throw new TeiidRuntimeException("Max block number exceeded"); //$NON-NLS-1$
+		private long getOffset(Long gid, long compactionThreshold) throws IOException {
+			long currentLength = store.getLength();
+			if (currentLength <= compactionThreshold || unusedSpace * 4 <= currentLength * 3) {
+				return tail;
 			}
-			int dataBlock = 0;
-			int position = 0;
-			int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
-			BlockInfo info = getInodeSubBlock(inode/inodesPerBlock, inodePosition);
-			if (index >= MAX_INDIRECT) {
-				position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
-				BlockInfo next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, MAX_DOUBLE_INDIRECT+1, value, mode);
-				if (next != info) {
-					info = next;
-					//should have traversed to the secondary
-					int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
-					position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
-					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
-						info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on", gid); //$NON-NLS-1$
+			}
+			byte[] buffer = new byte[IO_BUFFER_SIZE];
+			TreeSet<long[]> bySize = new TreeSet<long[]>(new Comparator<long[]>() {
+				@Override
+				public int compare(long[] o1, long[] o2) {
+					int signum = Long.signum(o1[1] - o2[1]);
+					if (signum == 0) {
+						//take the upper address first
+						return Long.signum(o2[0] - o1[0]);
 					}
-					next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT + indirectAddressBlock * ADDRESSES_PER_BLOCK,  MAX_DOUBLE_INDIRECT + 2 + indirectAddressBlock, value, mode);
-					if (next != info) {
-						info = next;
-						position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
-						if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
-							info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
-						}
+					return signum;
+				}
+			});
+			TreeSet<long[]> byAddress = new TreeSet<long[]>(new Comparator<long[]>() {
+				
+				@Override
+				public int compare(long[] o1, long[] o2) {
+					return Long.signum(o1[0] - o2[0]);
+				}
+			});
+			synchronized (physicalMapping) {
+				for (long[] value : physicalMapping.values()) {
+					if (value == null) {
+						continue;
 					}
+					bySize.add(value);
+					byAddress.add(value);
 				}
-			} else if (index >= DIRECT_POINTERS) {
-				//indirect
-				position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
-				BlockInfo next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS, MAX_DOUBLE_INDIRECT, value, mode);
-				if (next != info) {
-					info = next;
-					position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
-					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
-						info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+			}
+			long lastEndAddress = 0;
+			long usedSpace = tail - unusedSpace;
+			while (!byAddress.isEmpty()) {
+				long[] info = byAddress.pollFirst();
+				bySize.remove(info);
+
+				long currentOffset = info[0];
+				long space = currentOffset - lastEndAddress;
+				boolean movedLast = false;
+				while (space > 0 && !bySize.isEmpty()) {
+					long[] last = byAddress.last();
+					if (last[1] > space) {
+						break;
 					}
+					movedLast = true;
+					byAddress.pollLast();
+					bySize.remove(last);
+					move(last, lastEndAddress, buffer);
+					space -= last[1];
+					lastEndAddress += last[1];
 				}
-			} else {
-				position = BYTES_PER_BLOCK_ADDRESS*index;
-				if (mode == Mode.ALLOCATE) {
-					info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+				if (movedLast && !byAddress.isEmpty()) {
+					long[] last = byAddress.last();
+					long currentLastEndAddress = last[0] + last[1]; 
+					if (currentLastEndAddress < currentLength>>1) {
+						lastEndAddress = currentLastEndAddress;
+						break;
+					}
 				}
-			}
-			if (mode == Mode.ALLOCATE) {
-				dataBlock = nextBlock();
-				info.buf.putInt(position, dataBlock);
-				updatePhysicalBlock(info);
-			} else {
-				dataBlock = info.buf.getInt(position);
-				if (mode == Mode.UPDATE) {
-					info.buf.putInt(position, value);
-					updatePhysicalBlock(info);
+				while (space > 0 && !bySize.isEmpty()) {
+					long[] smallest = bySize.first();
+					if (smallest[1] > space) {
+						break;
+					}
+					bySize.pollFirst();
+					byAddress.remove(smallest);
+					move(smallest, lastEndAddress, buffer);
+					space -= smallest[1];
+					lastEndAddress += smallest[1];
 				}
-			}
-			return dataBlock;
-		}
-		
-		private BlockInfo updateIndirectBlockInfo(BlockInfo info, int index, int position, int cutOff, int blockId, int value, Mode mode) {
-			int sib_index = info.buf.getInt(position);
-			boolean newBlock = false;
-			if (index == cutOff) {
-				if (mode == Mode.ALLOCATE) {
-					sib_index = nextBlock();
-					info.buf.putInt(position, sib_index);
-					updatePhysicalBlock(info);
-					newBlock = true;
-				} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
-					freeDataBlock(sib_index);
-					return info;
+				
+				if (space > MAX_FREE_SPACE) {
+					move(info, lastEndAddress, buffer);
 				}
+				lastEndAddress = info[0] + info[1];
 			}
-			info = getPhysicalBlock(sib_index, false, false);
-			if (newBlock) {
-				putBlockId(blockId, info.buf);
+			store.setLength(lastEndAddress);
+			tail = lastEndAddress;
+			unusedSpace = lastEndAddress - usedSpace;
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", gid, "pre-size", currentLength, "post-size", lastEndAddress); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 			}
-			return info;
+			return tail;
 		}
-
-		private int nextBlock() {
-			int next = -1;
-			synchronized (blocksInuse) {
-				next = blocksInuse.nextClearBit(lastBlock + 1);
-				if (next == -1) {
-					throw new TeiidRuntimeException("no freespace available"); //$NON-NLS-1$
-				}
-				blocksInuse.set(next, true);
-			}
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_DQP, "Allocating block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
-			}
-			if (trackLast) {
-				lastBlock = next;
-			}
-			return next;
-		}
-
-		@Override
-		public void freeBlock(int index) {
-			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
-			freeDataBlock(dataBlock);
-		}
-
-		private void freeDataBlock(int dataBlock) {
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
-			}
-			blockCache.remove(dataBlock);
-			blocksInuse.set(dataBlock, false);
-		}
 		
-		BlockInfo getInodeSubBlock(int inodeBlock, int inodePosition) {
-			BlockInfo bi = getPhysicalBlock(inodeBlock + blocksInUseBlocks + inodesInUseBlocks, true, false);
-			ByteBuffer bb = bi.buf.duplicate();
-			bb.position(inodePosition);
-			bb.limit(inodePosition + INODE_BYTES);
-			bb = bb.slice();
-			return new BlockInfo(true, bb, inodeBlock + blocksInUseBlocks + inodesInUseBlocks, inodePosition);
-		}
-
-		@Override
-		public void free() {
-			int inodeBlock = inode/inodesPerBlock;
-			int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
-			BlockInfo bi = getInodeSubBlock(inodeBlock, inodePosition);
-			ByteBuffer ib = bi.buf;
-			int indirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
-			int doublyIndirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
-			boolean freedAll = freeBlock(ib, DIRECT_POINTERS, true);
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+		private void move(long[] toMove, long newOffset, byte[] buffer) throws IOException {
+			long oldOffset = toMove[0];
+			toMove[0] = newOffset;
+			int size = (int)toMove[1];
+			while (size > 0) {
+				int toWrite = Math.min(IO_BUFFER_SIZE, size);
+				store.readFully(oldOffset, buffer, 0, toWrite);
+				store.write(newOffset, buffer, 0, toWrite);
+				size -= toWrite;
+				oldOffset += toWrite;
+				newOffset += toWrite;
 			}
-			inodesInuse.set(inode, false);
-			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
-				return;
-			}
-			freedAll = freeIndirectBlock(indirectIndexBlock);
-			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
-				return;
-			}
-			BlockInfo bb = getPhysicalBlock(doublyIndirectIndexBlock, false, false);
-			freeBlock(bb.buf, ADDRESSES_PER_BLOCK, false);
-			freeDataBlock(doublyIndirectIndexBlock);
 		}
-
-		private boolean freeIndirectBlock(int indirectIndexBlock) {
-			BlockInfo bb = getPhysicalBlock(indirectIndexBlock, false, false);
-			bb.buf.position(0);
-			boolean freedAll = freeBlock(bb.buf, ADDRESSES_PER_BLOCK, true);
-			freeDataBlock(indirectIndexBlock);
-			return freedAll;
-		}
-
-		private boolean freeBlock(ByteBuffer ib, int numPointers, boolean primary) {
-			for (int i = 0; i < numPointers; i++) {
-				int dataBlock = ib.getInt();
-				if (dataBlock == EMPTY_ADDRESS) {
-					return false;
-				}
-				if (primary) {
-					freeDataBlock(dataBlock);
-				} else {
-					freeIndirectBlock(dataBlock);
-				}
-			}
-			return true;
-		}
-
-		@Override
-		public BlockInfo allocateBlock(int blockNum) {
-			int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
-			BlockInfo bb = getPhysicalBlock(dataBlock, false, true);
-			putBlockId(blockNum, bb.buf);
-			bb.buf.position(0);
-			bb.buf.limit(BLOCK_DATA_BYTES);
-			return bb;
-		}
-
-		private void putBlockId(int blockNum, ByteBuffer bb) {
-			bb.position(BLOCK_DATA_BYTES);
-			bb.putLong(gid);
-			bb.putLong(oid);
-			bb.put((byte)(blockNum >> 16));
-			bb.putShort((short)blockNum);
-		}
-
 	}
 
+	private static final int COMPACTION_THRESHOLD = 1 << 24; //start checking at 16 megs
+	private static final int IO_BUFFER_SIZE = 1<<13;
+	int compactionThreshold = COMPACTION_THRESHOLD;
+	private ConcurrentHashMap<Long, CacheGroup> cacheGroups = new ConcurrentHashMap<Long, CacheGroup>();
 	private StorageManager storageManager;
-	private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
-	private int inodes;
-	private int blocks;
-	private int inodesPerBlock;
-	private int inodesInUseBlocks;
-	private int blocksInUseBlocks;
-	private FileStore store;
-	private FileStore systemStore;
-	private BlockBitSetTree blocksInuse;
-	private BlockBitSetTree inodesInuse;
-
-	//root directory
-	ConcurrentHashMap<Long, BlockClosedLongIntHashTable> physicalMapping = new ConcurrentHashMap<Long, BlockClosedLongIntHashTable>();
 	
-	//block caching
-	int blockCacheSize = 128; //2 MB
-	ReentrantLock blockLock = new ReentrantLock();
-	LinkedHashMap<Integer, BlockInfo> blockCache = new LinkedHashMap<Integer, BlockInfo>() {
-		private static final long serialVersionUID = -4240291744435552008L;
-		BlockInfo eldestEntry = null;
+	@Override
+	public void add(CacheEntry entry, Serializer s) throws Exception {
+		final CacheGroup group = cacheGroups.get(s.getId());
+		if (group == null) {
+			return;
+		}
 
-		protected boolean removeEldestEntry(Map.Entry<Integer,BlockInfo> eldest) {
-			if (size() > blockCacheSize) {
-				BlockInfo bi = eldest.getValue();
-				if (bi.dirty) {
-					eldestEntry = bi;
+		group.lock.writeLock().lock();
+		try {
+			synchronized (group.freed) {
+				while (!group.freed.isEmpty()) {
+					group.freeBatch(group.freed.remove(0));
 				}
-				return true;
 			}
-			return false;
-		}
-		
-		public BlockInfo put(Integer key, BlockInfo value) {
-			blockLock.lock();
-			value.dirty = true;
-			try {
-				return super.put(key, value);
-			} finally {
-				BlockInfo toUpdate = eldestEntry;
-				eldestEntry = null;
-				blockLock.unlock();
-				if (toUpdate != null) {
-					updatePhysicalBlockDirect(toUpdate);
+			final ByteBuffer buffer = ByteBuffer.allocate(IO_BUFFER_SIZE);
+			final long offset = group.getOffset(s.getId(), compactionThreshold);
+			ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream() {
+				@Override
+				protected ByteBuffer newBuffer() {
+					buffer.rewind();
+					return buffer;
 				}
-			}
-		}
-		
-		public BlockInfo get(Object key) {
-			blockLock.lock();
-			try {
-				return super.get(key);
-			} finally {
-				blockLock.unlock();
-			}
-		}
-		
-		public BlockInfo remove(Object key) {
-			blockLock.lock();
-			try {
-				return super.remove(key);
-			} finally {
-				blockLock.unlock();
-			}
-		}
-	};
-	
-	BlockInfo getPhysicalBlock(int block, boolean system, boolean allocate) {
-		if (block < 0) {
-			throw new AssertionError("invalid block address " + block); //$NON-NLS-1$
-		}
-		try {
-			int key = block;
-			if (system) {
-				key |= SYSTEM_MASK;
-			}
-			BlockInfo result = blockCache.get(key);
-			assert result == null || !allocate; 
-			if (result == null) {
-				ByteBuffer bb = null;
-				if (system) {
-					bb = systemStore.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
-				} else {
-					bb = store.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
+				
+				@Override
+				protected int flushDirect() throws IOException {
+					group.store.write(offset + bytesWritten, buffer.array(), 0, buf.position());
+					return buf.position();
 				}
-				result = new BlockInfo(system, bb, block, -1);
-				blockLock.lock();
-				try {
-					BlockInfo existing = blockCache.get(key);
-					if (existing != null) {
-						return existing;
-					}
-					blockCache.put(key, result);
-				} finally {
-					blockLock.unlock();
-				}
-				return result;
+			};
+	        ObjectOutputStream oos = new ObjectOutputStream(fsos);
+	        oos.writeInt(entry.getSizeEstimate());
+	        s.serialize(entry.getObject(), oos);
+	        oos.close();
+	        long size = fsos.getBytesWritten();
+	        long[] info = new long[] {offset, size};
+	        group.physicalMapping.put(entry.getId(), info);
+	        group.tail = Math.max(group.tail, offset + size);
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, s.getId(), entry.getId(), "batch written starting at:", offset); //$NON-NLS-1$
 			}
-			return result;
-		} catch (IOException e) {
-			throw new TeiidRuntimeException(e);
+		} finally {
+			group.lock.writeLock().unlock();
 		}
 	}
-	
-	void updatePhysicalBlock(BlockInfo bi) {
-		int key = bi.physicalAddress;
-		if (bi.system) {
-			key |= SYSTEM_MASK;
-		}
-		if (bi.inodeOffset >= 0) {
-			blockLock.lock();
-			try {
-				BlockInfo actual = blockCache.get(key);
-				if (actual == null) {
-					//we're not in the cache, so just update storage
-					updatePhysicalBlockDirect(bi);
-				} else {
-					//TODO: check to see we're sharing the same buffer
-					for (int i = 0; i < INODE_BYTES; i++) {
-						actual.buf.put(bi.inodeOffset + i, bi.buf.get(i));
-					}
-				}
-			} finally {
-				blockLock.unlock();
-			}
-			return;
-		}
-		blockCache.put(key, bi);
-	}
 
-	private void updatePhysicalBlockDirect(BlockInfo bi) {
-		try {
-			bi.buf.rewind();
-			if (!bi.system) {
-				store.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);
-			} else {
-				systemStore.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);				
-			}
-		} catch (IOException e) {
-			throw new TeiidRuntimeException(e);
-		}
-	}
-	
-	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
-		return new InodeBlockManager(gid, oid, inode);
-	}
-
-	@SuppressWarnings("unchecked")
 	@Override
-	public void add(CacheEntry entry, Serializer s) {
-		boolean success = false;
-		InodeBlockManager blockManager = null;
-		try {
-			BlockClosedLongIntHashTable map = physicalMapping.get(s.getId());
-			if (map == null) {
-				return;
-			}
-			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
-			blockManager.trackLast = true;
-			ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
-            ObjectOutputStream oos = new ObjectOutputStream(fsos);
-            oos.writeInt(entry.getSizeEstimate());
-            s.serialize(entry.getObject(), oos);
-            oos.close();
-            map.put(entry.getId(), blockManager.getInode());
-            success = true;
-		} catch (Throwable e) {
-			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ entry.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
-		} finally {
-			if (!success && blockManager != null) {
-				blockManager.free();
-			}
-		}
+	public void createCacheGroup(Long gid) {
+		cacheGroups.put(gid, new CacheGroup(storageManager.createFileStore(String.valueOf(gid))));
 	}
 
 	@Override
-	public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
+	public CacheEntry get(Long id, Serializer<?> serializer)
+			throws TeiidComponentException {
+		CacheGroup group = cacheGroups.get(serializer.getId());
+		if (group == null) {
+			return null;
+		}
 		try {
-			BlockClosedLongIntHashTable map = physicalMapping.get(serializer.getId());
-			if (map == null) {
+			group.lock.readLock().lock();
+			long[] info = group.physicalMapping.get(id);
+			if (info == null) {
 				return null;
 			}
-			final int inode = map.get(oid);
-			if (inode == EMPTY_ADDRESS) {
-				return null;
-			}
-			final BlockManager manager = getBlockManager(serializer.getId(), oid, inode);
-			ObjectInputStream ois = new ObjectInputStream(new InputStream() {
-				
-				int blockIndex;
-				BlockInfo buf;
-				
-				@Override
-				public int read() throws IOException {
-					ensureBytes();
-					return buf.buf.get() & 0xff;
-				}
-
-				private void ensureBytes() {
-					if (buf == null || buf.buf.remaining() == 0) {
-						if (buf != null) {
-							buf = null;
-						}
-						buf = manager.getBlock(blockIndex++);
-					}
-				}
-				
-				@Override
-				public int read(byte[] b, int off, int len)
-						throws IOException {
-					ensureBytes();
-					len = Math.min(len, buf.buf.remaining());
-					buf.buf.get(b, off, len);
-					return len;
-				}
-			});
-			CacheEntry ce = new CacheEntry(oid);
+			ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
+			CacheEntry ce = new CacheEntry(id);
 			ce.setSizeEstimate(ois.readInt());
 			ce.setObject(serializer.deserialize(ois));
-			ce.setPersistent(true);
 			return ce;
         } catch(IOException e) {
-        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
         } catch (ClassNotFoundException e) {
-        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
+        } finally {
+        	group.lock.readLock().unlock();
         }
 	}
-	
-	@Override
-	public FileStore createFileStore(String name) {
-		return storageManager.createFileStore(name);
-	}
-	
-	@Override
-	public void initialize() throws TeiidComponentException {
-		storageManager.initialize();
-		int logSpace = Math.min(45, 63 - Long.numberOfLeadingZeros(this.maxBufferSpace));
-		
-		blocks = 1 << Math.min(Math.max(12, logSpace -LOG_BLOCK_SIZE +1), ADDRESS_BITS); //blocks per segment
 
-		inodes = blocks>>1;
-		inodesPerBlock = BLOCK_SIZE/INODE_BYTES;
-		
-		inodesInUseBlocks = computeInuseBlocks(inodes);
-		blocksInUseBlocks = computeInuseBlocks(blocks);
-		
-		inodesInuse = new BlockBitSetTree(inodes - 1, new BitSetBlockManager(0));
-		blocksInuse = new BlockBitSetTree(blocks - 1, new BitSetBlockManager(inodesInUseBlocks));
-		
-		store = storageManager.createFileStore("data_store"); //$NON-NLS-1$
-		systemStore = storageManager.createFileStore("system_store"); //$NON-NLS-1$
-	}
-	
-	static int computeInuseBlocks(int number) {
-		int blockCount = (number>>LOG_BLOCK_SIZE) + ((number&BLOCK_MASK)>0?1:0);
-		return (blockCount>>3) + ((blockCount&7)>0?1:0);	
-	}
-		
 	@Override
-	public void addToCacheGroup(Long gid, Long oid) {
-		BlockClosedLongIntHashTable map = physicalMapping.get(gid);
-		if (map == null) {
+	public void remove(Long gid, Long id) {
+		CacheGroup group = cacheGroups.get(gid);
+		if (group == null) {
 			return;
 		}
-		map.put(oid, EMPTY_ADDRESS);
+		if (group.lock.writeLock().tryLock()) {
+			try {
+				try {
+					group.freeBatch(id);
+				} catch (IOException e) {
+					LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error removing batch"); //$NON-NLS-1$
+				}
+			} finally {
+				group.lock.writeLock().unlock();
+			}
+		} else {
+			group.freed.add(id);
+		}
 	}
 	
 	@Override
-	public void createCacheGroup(Long gid) {
-		BlockClosedLongIntHashTable map = new BlockClosedLongIntHashTable(getBlockManager(gid, -1, EMPTY_ADDRESS));
-		physicalMapping.put(gid, map);
-	}
-	
-	@Override
-	public void remove(Long gid, Long id) {
-		BlockClosedLongIntHashTable map = physicalMapping.get(gid);
-		if (map == null) {
+	public void addToCacheGroup(Long gid, Long oid) {
+		CacheGroup group = cacheGroups.get(gid);
+		if (group == null) {
 			return;
 		}
-		int inode = map.remove(id);
-		free(gid, id, inode);
+		group.physicalMapping.put(oid, null);
 	}
 
 	@Override
 	public Collection<Long> removeCacheGroup(Long gid) {
-		BlockClosedLongIntHashTable map = physicalMapping.remove(gid);
-		if (map == null) {
-			return Collections.emptySet();
+		CacheGroup group = cacheGroups.remove(gid);
+		if (group == null) {
+			return Collections.emptyList();
 		}
-		Map<Long, Integer> values = map.remove();
-		for (Map.Entry<Long, Integer> entry : values.entrySet()) {
-			if (entry.getValue() != null) {
-				free(gid, entry.getKey(), entry.getValue());
-			}
+		group.store.remove();
+		synchronized (group.physicalMapping) {
+			return new ArrayList<Long>(group.physicalMapping.keySet());
 		}
-		return values.keySet();
 	}
-	
-	void free(Long gid, Long oid, int inode) {
-		if (inode == EMPTY_ADDRESS) {
-			return;
-		}
-		BlockManager bm = getBlockManager(gid, oid, inode);
-		bm.free();
+
+	@Override
+	public FileStore createFileStore(String name) {
+		return storageManager.createFileStore(name);
 	}
+
+	@Override
+	public void initialize() throws TeiidComponentException {
+		this.storageManager.initialize();
+	}
 	
 	public void setStorageManager(StorageManager storageManager) {
 		this.storageManager = storageManager;
@@ -731,16 +341,8 @@
 		return storageManager;
 	}
 	
-	public void setMaxBufferSpace(long maxBufferSpace) {
-		this.maxBufferSpace = maxBufferSpace;
+	public void setCompactionThreshold(int compactionThreshold) {
+		this.compactionThreshold = compactionThreshold;
 	}
-	
-	public int getInodesInUse() {
-		return this.inodesInuse.getBitsSet();
-	}
-	
-	public int getDataBlocksInUse() {
-		return this.blocksInuse.getBitsSet();
-	}
-	
+
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -22,7 +22,6 @@
 
 package org.teiid.common.buffer.impl;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -87,20 +86,6 @@
 		public synchronized long getLength() {
 			return buffer.limit();
 		}
-		
-		@Override
-		public synchronized ByteBuffer getBuffer(long start, int length, boolean allocate) {
-			int position = (int)start;
-			buffer.limit(position + length);
-			buffer.position(position);
-			return buffer.slice();
-		}
-		
-		@Override
-		public void updateFromBuffer(ByteBuffer bb, long start)
-				throws IOException {
-			//do nothing we are sharing the bytes
-		}
 
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -23,7 +23,6 @@
 package org.teiid.common.buffer.impl;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -79,7 +78,7 @@
 	    	FileStore store = null;
 	    	if (!write) {
 	    		synchronized (this) {
-		    		if (fileOffset + length > len) {
+		    		if (fileOffset > len) {
 		    			throw new IOException("Invalid file position " + fileOffset + " length " + length); //$NON-NLS-1$ //$NON-NLS-2$
 		    		}
 		    		store = storageFiles.get((int)(fileOffset/maxFileSize));
@@ -118,28 +117,6 @@
 			len = length;
 		}
 		
-		@Override
-		public ByteBuffer getBuffer(long start, int length, boolean allocate) throws IOException {
-			FileStore store = null;
-			synchronized (this) {
-				ensureLength(start + length);
-				store = storageFiles.get((int)(start/maxFileSize));
-			}
-	    	long fileBegin = start%maxFileSize;
-			return store.getBuffer(fileBegin, length, allocate);
-		}
-		
-		@Override
-		public void updateFromBuffer(ByteBuffer bb, long start)
-				throws IOException {
-			FileStore store = null;
-			synchronized (this) {
-				store = storageFiles.get((int)(start/maxFileSize));
-			}
-	    	long fileBegin = start%maxFileSize;
-			store.updateFromBuffer(bb, fileBegin);
-		}
-
 	    @Override
 	    public synchronized void setLength(long length) throws IOException {
 			if (length > len) {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -86,7 +86,6 @@
 			SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
 			ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
 			FileStoreCache fsc = new FileStoreCache();
-			fsc.setMaxBufferSpace(Runtime.getRuntime().maxMemory()/4);
 			fsc.setStorageManager(ssm);
 			fsc.initialize();
 		    bufferManager.setCache(fsc);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -30,6 +30,7 @@
 import org.junit.Test;
 import org.teiid.common.buffer.STree.InsertMode;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.common.buffer.impl.FileStoreCache;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.query.sql.symbol.ElementSymbol;
 
@@ -109,6 +110,7 @@
 		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
 		bm.setProcessorBatchSize(32);
 		bm.setMaxReserveKB(0);//force all to disk
+		((FileStoreCache)bm.getCache()).setCompactionThreshold(0);
 		bm.initialize();
 		
 		ElementSymbol e1 = new ElementSymbol("x");

Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,66 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
- 
- package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.BitSet;
-import java.util.Random;
-
-import org.junit.Test;
-
-public class TestBlockBitSetTree {
-	
-	@Test public void testBitsSet() {
-		BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new TestBlockClosedLongIntHashTable.DummyBlockManager());
-		bst.set(1, true);
-		bst.set(100, true);
-		bst.set(10000, true);
-		bst.set(1000000, true);
-		assertEquals(4, bst.getBitsSet());
-		bst.set(1, false);
-		assertEquals(3, bst.getBitsSet());
-		assertFalse(bst.get(1));
-	}
-	
-	@Test public void testNextClearSet() {
-		BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new TestBlockClosedLongIntHashTable.DummyBlockManager());
-		BitSet bst1 = new BitSet();
-		Random r = new Random(1);
-		for (int i = 0; i < 1000; i++) {
-			int rand = r.nextInt() & bst.getMaxIndex();
-			bst.set(rand, true);
-			bst1.set(rand, true);
-			assertTrue(bst.get(rand));
-			assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
-			assertEquals(String.valueOf(i), bst1.nextSetBit(rand), bst.nextSetBit(rand));
-		}
-		
-		for (int i = 0; i < 10000; i++) {
-			int rand = r.nextInt() & bst.getMaxIndex();
-			assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
-			assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
-		}
-	}
-
-}

Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,92 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Test;
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-public class TestBlockClosedLongIntHashTable {
-	
-	public static final class DummyBlockManager implements BlockManager {
-		List<BlockInfo> blocks = new ArrayList<BlockInfo>();
-
-		@Override
-		public int getInode() {
-			return 0;
-		}
-		
-		@Override
-		public void updateBlock(BlockInfo block) {
-			
-		}
-
-		@Override
-		public void free() {
-			blocks.clear();
-		}
-		
-		@Override
-		public BlockInfo getBlock(int index) {
-			BlockInfo block = blocks.get(index);
-			block.buf.rewind();
-			return block;
-		}
-
-		@Override
-		public void freeBlock(int index) {
-			blocks.remove(index);
-		}
-
-		@Override
-		public BlockInfo allocateBlock(int index) {
-			assertEquals(index, blocks.size());
-			ByteBuffer result = ByteBuffer.wrap(new byte[FileStoreCache.BLOCK_SIZE]);
-			blocks.add(new BlockInfo(false, result, index, -1));
-			return blocks.get(blocks.size() - 1);
-		}
-	}
-
-	@Test public void testAgainstHashMap() {
-		BlockClosedLongIntHashTable table = new BlockClosedLongIntHashTable(new DummyBlockManager());
-		HashMap<Long, Integer> table1 = new HashMap<Long, Integer>(16);
-		for (long i = 1; i < 200000; i++) {
-			table.put(i, (int)i);
-			table1.put(i, (int)i);
-		}
-		Random r = new Random(0);
-		for (int i = 1; i < 2000000; i++) {
-			long toRemove = r.nextInt(i); 
-			boolean removed = table.remove(toRemove) != BlockClosedLongIntHashTable.EMPTY;
-			assertEquals(table1.remove(toRemove) != null, removed);
-		}
-	}
-
-}

Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -1,119 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.junit.Test;
-import org.teiid.common.buffer.CacheEntry;
-import org.teiid.common.buffer.Serializer;
-import org.teiid.core.util.UnitTestUtil;
-
-public class TestFileStoreCache {
-	
-	private final class SimpleSerializer implements Serializer<Integer> {
-		@Override
-		public Integer deserialize(ObjectInputStream ois)
-				throws IOException, ClassNotFoundException {
-			Integer result = ois.readInt();
-			for (int i = 0; i < result; i++) {
-				assertEquals(i, ois.readInt());
-			}
-			return result;
-		}
-
-		@Override
-		public Long getId() {
-			return 1l;
-		}
-
-		@Override
-		public void serialize(Integer obj, ObjectOutputStream oos)
-				throws IOException {
-			oos.writeInt(obj);
-			for (int i = 0; i < obj; i++) {
-				oos.writeInt(i);
-			}
-		}
-
-		@Override
-		public boolean useSoftCache() {
-			return false;
-		}
-	}
-
-	@Test public void testAddGetMultiBlock() throws Exception {
-		FileStoreCache fsc = new FileStoreCache();
-		fsc.setMaxBufferSpace(1 << 28);
-		SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
-		ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
-		fsc.setStorageManager(ssm);
-		fsc.initialize();
-		
-		UnitTestUtil.enableTraceLogging("org.teiid");  //$NON-NLS-1$
-		
-		CacheEntry ce = new CacheEntry(2l);
-		Serializer<Integer> s = new SimpleSerializer();
-		fsc.createCacheGroup(s.getId());
-		Integer cacheObject = Integer.valueOf(2);
-		ce.setObject(cacheObject);
-		fsc.add(ce, s);
-		
-		ce = fsc.get(2l, s);
-		assertEquals(cacheObject, ce.getObject());
-		
-		//test something that exceeds the direct inode data blocks
-		ce = new CacheEntry(3l);
-		cacheObject = Integer.valueOf(80000);
-		ce.setObject(cacheObject);
-		fsc.add(ce, s);
-		
-		ce = fsc.get(3l, s);
-		assertEquals(cacheObject, ce.getObject());
-		
-		fsc.removeCacheGroup(1l);
-		
-		assertEquals(0, fsc.getDataBlocksInUse());
-		assertEquals(0, fsc.getInodesInUse());
-		
-		//test something that exceeds the indirect inode data blocks
-		ce = new CacheEntry(3l);
-		fsc.createCacheGroup(s.getId());
-		cacheObject = Integer.valueOf(5000000);
-		ce.setObject(cacheObject);
-		fsc.add(ce, s);
-		
-		ce = fsc.get(3l, s);
-		assertEquals(cacheObject, ce.getObject());
-		
-		fsc.removeCacheGroup(1l);
-		
-		assertEquals(0, fsc.getDataBlocksInUse());
-		assertEquals(0, fsc.getInodesInUse());
-	}
-	
-}

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -378,17 +378,17 @@
         Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
         ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(5, rm.getResults().length);
+        assertEquals(5, rm.getResultsList().size());
         
         message = core.processCursorRequest(reqMsg.getExecutionId(), 6, 5);
         rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(5, rm.getResults().length);
+        assertEquals(5, rm.getResultsList().size());
         
         message = core.processCursorRequest(reqMsg.getExecutionId(), 11, 5);
         rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(5, rm.getResults().length);
+        assertEquals(5, rm.getResultsList().size());
     }
     
     @Test public void testSourceConcurrency() throws Exception {

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-30 02:00:28 UTC (rev 3508)
@@ -108,7 +108,6 @@
                 ssm.setMaxFileSize(maxFileSize);
                 FileStoreCache fsc = new FileStoreCache();
                 fsc.setStorageManager(ssm);
-                fsc.setMaxBufferSpace(maxBufferSpace*MB);
                 fsc.initialize();
                 this.bufferMgr.setCache(fsc);
             } else {



More information about the teiid-commits mailing list