[teiid-commits] teiid SVN: r3504 - in trunk: engine/src/main/java/org/teiid/common/buffer and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Sep 27 23:15:09 EDT 2011


Author: shawkins
Date: 2011-09-27 23:15:08 -0400 (Tue, 27 Sep 2011)
New Revision: 3504

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
Removed:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java
Modified:
   trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
   trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
   trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Log:
TEIID-1750 converting storage over to a more flexible inode approach

Modified: trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -28,7 +28,16 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
 import java.io.Serializable;
+import java.io.Writer;
+import java.sql.Timestamp;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
 
 import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
@@ -388,6 +397,7 @@
 	    return filePath;
 	}
 	
+	@SuppressWarnings("unchecked")
 	public static final <T extends Serializable> T helpSerialize(T object) throws IOException, ClassNotFoundException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
@@ -398,5 +408,59 @@
         
         return (T)ois.readObject();
 	}
+	
+	public static void enableTraceLogging(String loggerName) {
+		Logger logger = Logger.getLogger(loggerName);
+    	logger.setLevel(Level.FINEST);
+		if (logger.getHandlers().length > 0) {
+	    	for (Handler h : logger.getHandlers()) {
+				h.setLevel(Level.FINEST);
+			}
+    	} else {
+    		logger.setUseParentHandlers(false);
+    		ConsoleHandler ch = new ConsoleHandler();
+    		ch.setFormatter(new Formatter() {
+				
+				@Override
+				public String format(LogRecord record) {
+					final StringBuilder result = new StringBuilder();
+					result.append(new Timestamp(record.getMillis()));
+					result.append(" "); //$NON-NLS-1$
+					result.append(record.getLoggerName());
+					result.append(" "); //$NON-NLS-1$
+					result.append(record.getLevel());
+					result.append(" "); //$NON-NLS-1$
+					result.append(record.getThreadID());
+					result.append(" "); //$NON-NLS-1$
+					result.append(record.getMessage());
+					result.append('\n');
+					if (record.getThrown() != null) {
+						record.getThrown().printStackTrace(new PrintWriter(new Writer() {
 
+							@Override
+							public void close() throws IOException {
+								
+							}
+
+							@Override
+							public void flush() throws IOException {
+								
+							}
+
+							@Override
+							public void write(char[] cbuf, int off, int len)
+									throws IOException {
+								result.append(new String(cbuf, off, len));
+							}
+						}));
+						result.append('\n');
+					}
+					return result.toString();
+				}
+			});
+    		ch.setLevel(Level.FINEST);
+    		logger.addHandler(ch);
+    	}
+	}
+
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -30,9 +30,9 @@
  * Represents the storage strategy for the {@link BufferManager}
  */
 public interface Cache extends StorageManager {
-	void createCacheGroup(Long gid);
+	void createCacheGroup(Long gid); //called prior to adding an entry
 	Collection<Long> removeCacheGroup(Long gid);
-	void addToCacheGroup(Long gid, Long oid); //called prior to adding an entry
+	void addToCacheGroup(Long gid, Long oid); 
 	CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
 	void add(CacheEntry entry, Serializer<?> s);
 	void remove(Long gid, Long id);

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -223,6 +224,28 @@
 		};
 	}
 	
+    public ByteBuffer getBuffer(long start, int length, boolean allocate) throws IOException {
+    	byte[] b = new byte[length];
+    	if (!allocate) {
+    		readFully(start, b, 0, length);
+    	}
+    	return ByteBuffer.wrap(b);
+    }
+    
+    public void updateFromBuffer(ByteBuffer bb, long start) throws IOException {
+    	byte[] b = null;
+    	int offset = 0;
+    	bb.rewind();
+    	if (bb.hasArray()) {
+    		b = bb.array();
+    		offset = bb.arrayOffset();
+    	} else {
+    		b = new byte[bb.limit()];
+    		bb.get(b);
+    	}
+    	write(start, b, offset, bb.limit());
+    }
+	
 	public InputStream createInputStream(final long start) {
 		return createInputStream(start, -1);
 	}
@@ -242,7 +265,7 @@
 		};
 	}
 	
-	public  FileStoreOutputStream createOutputStream(int maxMemorySize) {
+	public FileStoreOutputStream createOutputStream(int maxMemorySize) {
 		return new FileStoreOutputStream(maxMemorySize);
 	}
 	

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -1,106 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
- 
- package org.teiid.common.buffer.impl;
-
-import java.util.BitSet;
-
-/**
- * Extends a {@link BitSet} by adding a cumulative total and a 
- * first level index to speed queries against large bitsets.
- */
-public class BitSetTree {
-	
-	public static final int MAX_INDEX = (1 << 24) - 1;
-	
-	private int bitsSet;
-	private int totalBits;
-	private short[] topVals = new short[1 << 9];
-	private BitSet bitSet = new BitSet();
-	
-	public boolean get(int index) {
-		if (index > MAX_INDEX) {
-			throw new ArrayIndexOutOfBoundsException(index);
-		}
-		return bitSet.get(index);
-	}
-	
-	/**
-	 * Set the given bit at the index.  It's expected that the 
-	 * bit currently has the opposite value.
-	 * @param bitIndex
-	 * @param value
-	 */
-	public void set(int bitIndex, boolean value) {
-		if (bitIndex > MAX_INDEX) {
-			throw new ArrayIndexOutOfBoundsException(bitIndex);
-		}
-		bitSet.set(bitIndex, value);
-		if (bitIndex >= totalBits) {
-			totalBits = bitIndex + 1;
-		}
-		int topIndex = bitIndex >>> 15;
-		int increment = value?1:-1;
-		bitsSet+=increment;
-		topVals[topIndex]+=increment;
-	}
-	
-	public int getTotalBits() {
-		return totalBits;
-	}
-	
-	public int getBitsSet() {
-		return bitsSet;
-	}
-	
-	public int nextClearBit(int fromIndex) {
-		int start = fromIndex >> 15;
-		for (int i = start; i < topVals.length; i++) {
-			if (topVals[i] < Short.MAX_VALUE) {
-				int searchFrom = fromIndex;
-				if (i > start) {
-					searchFrom = i << 15;
-				}
-				return bitSet.nextClearBit(searchFrom);
-			}
-		}
-		return -1;
-	}
-	
-	public int nextSetBit(int fromIndex) {
-		if (bitsSet == 0) {
-			return -1;
-		}
-		int start = fromIndex >> 15;
-		for (int i = fromIndex >> 15; i < topVals.length; i++) {
-			if (topVals[i] > 0) {
-				int searchFrom = fromIndex;
-				if (i > start) {
-					searchFrom = i << 15;
-				}
-				return bitSet.nextSetBit(searchFrom); 
-			}
-		}
-		return -1;
-	}
-	
-}

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+ package org.teiid.common.buffer.impl;
+
+import java.util.BitSet;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * Provides {@link BitSet}-like operations over bits stored in {@link BlockManager} blocks,
+ * adding a cumulative total and a first level index to speed queries against large bitsets.
+ */
+public class BlockBitSetTree {
+	
+	private static final int LOG_BITS_PER_BLOCK = FileStoreCache.LOG_BLOCK_SIZE + 3;
+	private static final int MAX_TOP_VALUE = 1 << LOG_BITS_PER_BLOCK;
+	private int maxIndex;
+	private int bitsSet;
+	private int[] topVals;
+	private BlockManager blockManager;
+	private int blockCount = 0;
+	
+	public BlockBitSetTree(int maxIndex, BlockManager blockManager) {
+		this.maxIndex = maxIndex;
+		this.blockManager = blockManager;
+		this.topVals = new int[(maxIndex >> (FileStoreCache.LOG_BLOCK_SIZE + 3)) + 1];
+	}
+	
+	public int getMaxIndex() {
+		return maxIndex;
+	}
+	
+	/**
+	 * Set the given bit at the index.
+	 * @param bitIndex
+	 * @param value
+	 */
+	public synchronized void set(int bitIndex, boolean value) {
+		getOrSet(bitIndex, value, true);
+	}
+	
+	public synchronized boolean get(int bitIndex) {
+		return getOrSet(bitIndex, false, false);
+	}
+	
+	private boolean getOrSet(int bitIndex, boolean value, boolean update) {
+		if (bitIndex > maxIndex) {
+			throw new ArrayIndexOutOfBoundsException(bitIndex);
+		}
+		int blockIndex = bitIndex>>LOG_BITS_PER_BLOCK;
+		BlockInfo bb = null;
+		if (blockIndex >= blockCount) {
+			if (!update) {
+				return false;
+			}
+			for (; blockCount < blockIndex+1; blockCount++) {
+				bb = blockManager.allocateBlock(blockCount);
+				bb.buf.position(0);
+				int longsPerBlock = FileStoreCache.BLOCK_SIZE >> 6;
+				for (int j = 0; j < longsPerBlock; j++) {
+					bb.buf.putLong(0);
+				}
+			}
+		} else {
+			bb = blockManager.getBlock(blockIndex);
+		}
+		int relativeIndex = bitIndex&(MAX_TOP_VALUE-1);
+		int longByteIndex = (relativeIndex>>6)<<3;
+		long word = bb.buf.getLong(longByteIndex);
+		long mask = 1L << bitIndex;
+		boolean currentValue = ((word & mask) != 0);
+		if (!update) {
+			return currentValue;
+		}
+		if (currentValue == value) {
+			return currentValue;
+		}
+		if (value) {
+			word |= mask;
+		} else {
+			word &= ~mask;
+		}
+		bb.buf.putLong(longByteIndex, word);
+		blockManager.updateBlock(bb);
+		int topIndex = bitIndex >> LOG_BITS_PER_BLOCK;
+		int increment = value?1:-1;
+		bitsSet+=increment;
+		topVals[topIndex]+=increment;
+		return currentValue;
+	}
+	
+	public synchronized int getBitsSet() {
+		return bitsSet;
+	}
+	
+	public synchronized int nextClearBit(int fromIndex) {
+		int start = fromIndex >> LOG_BITS_PER_BLOCK;
+		for (int i = start; i < topVals.length; i++) {
+			if (topVals[i] == MAX_TOP_VALUE) {
+				continue;
+			}
+			if (topVals[i] == 0) {
+				if (i == start) {
+					return fromIndex;
+				}
+				return i * MAX_TOP_VALUE;
+			}
+			int relativeIndex = 0;
+			if (i == start) {
+				relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
+			}
+			BlockInfo bb = blockManager.getBlock(i);
+			
+			int longByteIndex = (relativeIndex>>6)<<3;
+			
+			long word = ~bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
+
+			while (true) {
+			    if (word != 0) {
+			    	return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
+			    }
+			    longByteIndex+=8;
+			    if (longByteIndex > FileStoreCache.BLOCK_MASK) {
+			    	break;
+			    }
+			    word = ~bb.buf.getLong(longByteIndex);
+			}
+		}
+		return -1;
+	}
+	
+	public synchronized int nextSetBit(int fromIndex) {
+		if (bitsSet == 0) {
+			return -1;
+		}
+		int start = fromIndex >> LOG_BITS_PER_BLOCK;
+		for (int i = start; i < topVals.length; i++) {
+			if (topVals[i] == 0) {
+				continue;
+			}
+			if (topVals[i] == MAX_TOP_VALUE) {
+				if (i == start) {
+					return fromIndex;
+				}
+				return i * MAX_TOP_VALUE;
+			}
+			int relativeIndex = 0;
+			if (i == start) {
+				relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
+			}
+			BlockInfo bb = blockManager.getBlock(i);
+			
+			int longByteIndex = (relativeIndex>>6)<<3;
+			
+			long word = bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
+
+			while (true) {
+			    if (word != 0) {
+			    	return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
+			    }
+			    longByteIndex+=8;
+			    if (longByteIndex > FileStoreCache.BLOCK_MASK) {
+			    	break;
+			    }
+			    word = bb.buf.getLong(longByteIndex);
+			}
+		}
+		return -1;
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * Represents the logical structure of a cache group / directory
+ * 
+ * Implemented by a closed hash table using a single step linear probe with delayed removal.
+ * Uses power of 2 hashing.
+ * 
+ * Provides an extremely simple hash structure that rivals {@link HashMap} performance and
+ * is directly mapped to {@link ByteBuffer}s to avoid serialization overhead.
+ * 
+ * Does not expect keys or values to be negative.
+ */
+public class BlockClosedLongIntHashTable {
+	
+	private enum Mode {
+		GET,
+		UPDATE,
+		REMOVE
+	}
+	
+	private static final int BYTES_PER_ROW = 12; //8+4
+	private static final int BLOCK_SIZE = FileStoreCache.BLOCK_DATA_BYTES/BYTES_PER_ROW;
+	private static final float LOAD_FACTOR = .7f;
+	private static final int MIN_SIZE = 1 << (31 - Integer.numberOfLeadingZeros(BLOCK_SIZE)); //should fit in a single block
+	
+	static final int EMPTY = -1;
+	private static final int REMOVED = -2;
+
+	protected int size;
+	protected int capacityMask = EMPTY;
+	protected BlockManager blockManager;
+	
+	public BlockClosedLongIntHashTable(BlockManager blockManager) {
+		this.blockManager = blockManager;
+	}
+	
+	private void init(int capacity) {
+		int currentBlockCount = blockCount(capacityMask + 1);
+		int desiredBlockCount = blockCount(capacity);
+		if (capacity > capacityMask) {
+			for (int i = currentBlockCount; i < desiredBlockCount; i++) {
+				BlockInfo bb = blockManager.allocateBlock(i);
+				empty(bb.buf, BLOCK_SIZE);
+				blockManager.updateBlock(bb);
+			}
+		}
+		capacityMask = capacity - 1;
+	}
+
+	private void empty(ByteBuffer bb, int toIndex) {
+		bb.position(0);
+		for (int j = 0; j < toIndex; j++) {
+			bb.putLong(j * BYTES_PER_ROW, EMPTY);
+		}
+	}
+
+	private static int blockCount(int size) {
+		int currentBlockCount = (size) / BLOCK_SIZE;
+		if (size%BLOCK_SIZE > 0) {
+			currentBlockCount++;
+		}
+		return currentBlockCount;
+	}
+	
+	public synchronized int put(long key, int value) {
+		int result = getOrUpdate(key, value, Mode.UPDATE);
+		if ((result == EMPTY || result == REMOVED) && ++size > LOAD_FACTOR*capacityMask) {
+			int newCapacity = (capacityMask+1)<<1;
+			int oldLength = capacityMask + 1;
+			init(newCapacity);
+			rehash(oldLength, newCapacity);
+		}
+		return result;
+	}
+
+	public synchronized int get(long key) {
+		return getOrUpdate(key, EMPTY, Mode.GET);
+	}
+	
+	public synchronized int remove(long key) {
+		int result = getOrUpdate(key, EMPTY, Mode.REMOVE);
+		if (result != EMPTY && --size*LOAD_FACTOR < capacityMask>>3 && (capacityMask+1)>>1 >= MIN_SIZE) {
+			//reduce the size of the table by half
+			int oldLength = capacityMask + 1;
+			capacityMask >>= 1;
+			rehash(oldLength, oldLength>>1);
+			int oldBlocks = blockCount(oldLength);
+			int newBlocks = blockCount(capacityMask +1);
+			for (int i = oldBlocks-1; i >= newBlocks; i--) {
+				blockManager.freeBlock(i);
+			}
+		}
+		return result;
+	}
+	
+	private void rehash(int oldLength, int newLength) {
+		BlockInfo lastBlockInfo = null;
+		ByteBuffer lastBlock = null;
+		for (int i = 0; i < oldLength; i++) {
+			int relativeIndex = i%BLOCK_SIZE;
+			if (lastBlock == null || relativeIndex == 0) {
+				int lastIndex = i/BLOCK_SIZE;
+				lastBlockInfo = blockManager.getBlock(lastIndex);
+				lastBlock = lastBlockInfo.buf;
+				if (i < newLength) {
+					byte[] buf = new byte[FileStoreCache.BLOCK_DATA_BYTES];
+					lastBlock.position(0);
+					lastBlock.get(buf);
+					ByteBuffer copyBlock = ByteBuffer.wrap(buf);
+					empty(lastBlock, Math.min(BLOCK_SIZE, oldLength - lastIndex*BLOCK_SIZE));
+					blockManager.updateBlock(lastBlockInfo);
+					lastBlock = copyBlock;
+				}
+			}
+			lastBlock.position(relativeIndex*BYTES_PER_ROW);
+			long oldKey = lastBlock.getLong();
+			int oldValue = lastBlock.getInt();
+			if (oldKey != REMOVED && oldKey != EMPTY) {
+				getOrUpdate(oldKey, oldValue, Mode.UPDATE);
+			}
+		}
+	}
+
+	@SuppressWarnings("null")
+	protected int getOrUpdate(long key, int value, Mode mode) {
+		if (capacityMask == EMPTY) {
+			if (mode == Mode.GET || mode == Mode.REMOVE) {
+				return EMPTY;
+			}
+			init(MIN_SIZE);
+		}
+		int i = hashIndex(key);
+		BlockInfo lastBlockInfo = null;
+		long old = EMPTY;
+		int position = 0;
+		while (true) {
+			int relativeIndex = i%BLOCK_SIZE;
+			if (lastBlockInfo == null || relativeIndex == 0) {
+				int index = i/BLOCK_SIZE;
+				lastBlockInfo = blockManager.getBlock(index);
+			}
+			position = relativeIndex*BYTES_PER_ROW;
+			old = lastBlockInfo.buf.getLong(position);
+			if (old == EMPTY || old == key || (mode == Mode.UPDATE && old == REMOVED)) {
+				break;
+			}
+	        i = (i + 1) & capacityMask;
+		}
+		int result = EMPTY;
+		if (old != EMPTY && old != REMOVED) {
+			result = lastBlockInfo.buf.getInt(position + 8);
+		}
+		switch (mode) {
+		case GET:
+			return result;
+		case UPDATE:
+			lastBlockInfo.buf.putLong(position, key);
+			lastBlockInfo.buf.putInt(position + 8, value);
+			blockManager.updateBlock(lastBlockInfo);
+			return result;
+		case REMOVE:
+			if (old == EMPTY || old == REMOVED) {
+				return EMPTY;
+			}
+			lastBlockInfo.buf.putLong(position, REMOVED);
+			blockManager.updateBlock(lastBlockInfo);
+			return result;
+		default:
+			throw new AssertionError();
+		}
+	}
+	
+	private int hashIndex(long key) {
+		//start with the usual long hash
+		int primaryHash = (int)(key ^ (key >>> 32));
+		//allow the lower bits to spread the entries
+		primaryHash += primaryHash <<= 2;
+		primaryHash += primaryHash <<= 3;
+		return primaryHash & capacityMask;
+	}
+	
+	public synchronized int size() {
+		return size;
+	}
+
+	public synchronized Map<Long, Integer> remove() {
+		Map<Long, Integer> result = new HashMap<Long, Integer>();
+		BlockInfo lastBlockInfo = null;
+		int blockIndex = 0;
+		for (int i = capacityMask; i >= 0; i--) {
+			int relativeIndex = i%BLOCK_SIZE;
+			if (lastBlockInfo == null || relativeIndex == BLOCK_SIZE - 1) {
+				if (lastBlockInfo != null) {
+					blockManager.freeBlock(blockIndex);
+				}
+				blockIndex = i/BLOCK_SIZE;
+				lastBlockInfo = blockManager.getBlock(blockIndex);
+			}
+			lastBlockInfo.buf.position(relativeIndex*BYTES_PER_ROW);
+			long key = lastBlockInfo.buf.getLong();
+			if (key != EMPTY && key != REMOVED) {
+				result.put(key, lastBlockInfo.buf.getInt());
+			}
+		}
+		blockManager.free();
+		return result;
+	}
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * Represents an INode
+ * 
+ * Returned BlockInfo may be shared.  If shared, there are no guarantees about position and mark.
+ * In particular, system/index blocks can be used by multiple threads; relative methods should be
+ * avoided, but may be used for exclusive write operations.
+ * Otherwise the position will be 0.
+ * 
+ * Due to buffermanager locking, non-index data blocks can be assumed to be thread-safe. 
+ */
+public interface BlockManager {
+	
+	int getInode();
+	
+	BlockInfo allocateBlock(int index);
+	
+	BlockInfo getBlock(int index);
+	
+	void updateBlock(BlockInfo block);
+	
+	void freeBlock(int index);
+	
+	void free();
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -31,6 +31,7 @@
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -250,6 +251,7 @@
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
     private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
+    private int maxSoftReferences;
 
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
@@ -260,10 +262,23 @@
     private LinkedHashMap<Long, CacheEntry> memoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, false);
     private LinkedHashMap<Long, CacheEntry> tenuredMemoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, true);
     
-    private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; //limited size based upon the memory settings
-    private ConcurrentHashMap<Long, BatchSoftReference> softCache = new ConcurrentHashMap<Long, BatchSoftReference>(); //"unlimitted" size maintained by reference queue
+    //limited size reference caches based upon the memory settings
+    private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
+    private Map<Long, BatchSoftReference> softCache = Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f, false) {
+		private static final long serialVersionUID = 1L;
+
+		protected boolean removeEldestEntry(Map.Entry<Long,BatchSoftReference> eldest) {
+    		if (size() > maxSoftReferences) {
+    			BatchSoftReference bsr = eldest.getValue();
+    			maxReserveKB.addAndGet(bsr.sizeEstimate);
+    			bsr.clear();
+    			return true;
+    		}
+    		return false;
+    	};
+    });
     
-    private Cache cache;
+    Cache cache;
     
 	private Map<String, TupleReference> tupleBufferMap = new ConcurrentHashMap<String, TupleReference>();
 	private ReferenceQueue<TupleBuffer> tupleBufferQueue = new ReferenceQueue<TupleBuffer>();
@@ -430,11 +445,12 @@
 		if (this.maxProcessingKBOrig < 0) {
 			this.maxProcessingKB = Math.max(Math.min(8 * processorBatchSize, Integer.MAX_VALUE), (int)(.1 * maxMemory)/maxActivePlans);
 		} 
+		int memoryBatches = (this.maxProcessingKB * maxActivePlans + this.getMaxReserveKB()) / (processorBatchSize * targetBytesPerRow / 1024);
+		int logSize = 39 - Integer.numberOfLeadingZeros(memoryBatches);
 		if (useWeakReferences) {
-			int memoryBatches = (this.maxProcessingKB * maxActivePlans + this.getMaxReserveKB()) / (processorBatchSize * targetBytesPerRow / 1024);
-			int logSize = 39 - Integer.numberOfLeadingZeros(memoryBatches);
 			weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(20, logSize));
 		}
+		this.maxSoftReferences = 1 << Math.max(28, logSize+2);
 	}
 	
     @Override
@@ -551,7 +567,7 @@
 	}
 
 	private void createSoftReference(CacheEntry ce) {
-		BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, ce.getSizeEstimate());
+		BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, ce.getSizeEstimate()/2);
 		softCache.put(ce.getId(), ref);
 		maxReserveKB.addAndGet(- ce.getSizeEstimate()/2);
 	}
@@ -584,12 +600,11 @@
 			return ce;
 		}
 		if (prefersMemory) {
-			BatchSoftReference bsr = softCache.get(batch);
+			BatchSoftReference bsr = softCache.remove(batch);
 			if (bsr != null) {
 				ce = bsr.get();
 				if (ce != null) {
-					softCache.remove(batch);
-					maxReserveKB.addAndGet(bsr.sizeEstimate/2);
+					maxReserveKB.addAndGet(bsr.sizeEstimate);
 				}
 			}
 		} else if (useWeakReferences) {
@@ -611,6 +626,9 @@
 	}
 	
 	void remove(Long gid, Long batch, boolean prefersMemory) {
+		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", batch); //$NON-NLS-1$
+		}
 		CacheEntry ce = fastGet(batch, prefersMemory, false);
 		if (ce == null) {
 			cache.remove(gid, batch);
@@ -661,8 +679,7 @@
 				break;
 			}
 			softCache.remove(ref.key);
-			maxReserveKB.addAndGet(ref.sizeEstimate/2);
-			ref.sizeEstimate = 0;
+			maxReserveKB.addAndGet(ref.sizeEstimate);
 			ref.clear();
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -24,32 +24,37 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 public abstract class ExtensibleBufferedOutputStream extends OutputStream {
 	
-	protected int bytesWritten;
-    protected byte buf[];
-    protected int count;
+    protected ByteBuffer buf;
     
-    public ExtensibleBufferedOutputStream(byte[] buf) {
-    	this.buf = buf;
+    public ExtensibleBufferedOutputStream() {
 	}
     
     public void write(int b) throws IOException {
-		if (count >= buf.length) {
+    	ensureBuffer();
+		if (buf.remaining() == 0) {
 		    flush();
 		}
-		buf[count++] = (byte)b;
+		buf.put((byte)b);
     }
 
+	private void ensureBuffer() {
+		if (buf == null) {
+    		buf = newBuffer();
+    	}
+	}
+
     public void write(byte b[], int off, int len) throws IOException {
     	while (true) {
-    		int toCopy = Math.min(buf.length - count, len);
-			System.arraycopy(b, off, buf, count, toCopy);
-			count += toCopy;
+        	ensureBuffer();
+    		int toCopy = Math.min(buf.remaining(), len);
+    		buf.put(b, off, toCopy);
 			len -= toCopy;
 			off += toCopy;
-			if (count < buf.length) {
+			if (buf.remaining() > 0) {
 				break;
 			}
 			flush();
@@ -57,12 +62,13 @@
     }
 
 	public void flush() throws IOException {
-		if (count > 0) {
+		if (buf != null && buf.position() > 0) {
 			flushDirect();
 		}
-		bytesWritten += count;
-		count = 0;
+		buf = null;
 	}
+
+	protected abstract ByteBuffer newBuffer();
 	
 	protected abstract void flushDirect() throws IOException;
     
@@ -71,8 +77,4 @@
 		flush();
     }
     
-    public int getBytesWritten() {
-		return bytesWritten;
-	}
-
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -151,6 +151,9 @@
 	    
 	    @Override
 	    public synchronized void setLength(long length) throws IOException {
+	    	if (fileInfo == null) {
+				fileInfo = new FileInfo(createFile(name));
+	        }
 	    	try {
 	    		fileInfo.open().setLength(length);
 	    	} finally {
@@ -165,7 +168,7 @@
 				fileInfo.delete();
 			}
 		}
-		
+	    
 	}
 
     // Initialization

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -27,15 +27,12 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
@@ -50,298 +47,604 @@
 import org.teiid.query.QueryPlugin;
 
 /**
- * Implements storage and caching against a {@link FileStore} abstraction.
- * caching uses a block paradigm, where the first block has the format
- *   long gid | long oid | short tail bytes | short sector table length | [3 byte int sector ...] 
- *   
- * TODO: back non-fragmented block information onto the block address
+ * Implements storage against a {@link FileStore} abstraction using a filesystem paradigm.
+ * The filesystem uses a 31-bit address space on top of 2^14-byte blocks.
+ * 
+ * Therefore there is 2^31*2^14 = 2^45 or 32 terabytes max of addressable space.
+ * 
+ * Some amount of the space is taken up by system information (inodes and use flags). 
+ * This is held in a separate file.
+ * 
+ * The 64 byte inode format is:
+ * 14 32 bit direct block pointers
+ * 1  32 bit block indirect pointer
+ * 1  32 bit block doubly indirect pointer
+ * 
+ * The data block format is:
+ * data bytes | long gid | long oid | three byte int block num
+ * 
+ * The gid/oid are stored with the block so that defrag/compaction can be performed
+ * with minimal blocking/lookups.
+ * 
+ * This means that the maximum number of blocks available to a "file" is
+ * 14 + (2^14-19)/4 + ((2^14-19)/4)^2 ~= 2^24
+ * 
+ * Thus the max serialized object size is:     2^24*(2^14-19)  ~= 256GB.
+ * 
+ * The root directory "physicalMapping" is held in memory for performance,
+ * but could itself be switched to using a block map.  It will grow in
+ * proportion to the number of tables/tuplebuffers in use.
+ * 
+ * TODO: defragment
+ * TODO: lobs could also be supported in this structure.
  */
 public class FileStoreCache implements Cache, StorageManager {
 	
-	private static final int IO_BUFFER_SIZE = 1 << 14;
-	private static final int DATA_START = 1 << 13;
-	private static final int MAX_BLOCKS = 2724; //implies that the max size is ~ 42.5 mb
+	//TODO allow the block size to be configurable
+	static final int LOG_BLOCK_SIZE = 14;
+	static final int ADDRESS_BITS = 31;
+	static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
+	static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
+	static final int BLOCK_MASK = BLOCK_SIZE - 1;
+	static final int BLOCK_OVERHEAD = 8+8+3;
+	static final int BLOCK_DATA_BYTES = BLOCK_SIZE - BLOCK_OVERHEAD;
+	static final int BYTES_PER_BLOCK_ADDRESS = 4;
+	static final int ADDRESSES_PER_BLOCK = BLOCK_DATA_BYTES/BYTES_PER_BLOCK_ADDRESS;
+	static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
+	static final int DIRECT_POINTERS = 14;
+	static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
+	static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK * ADDRESSES_PER_BLOCK;
+	static final int EMPTY_ADDRESS = -1;
+
+	private final class BlockOutputStream extends
+			ExtensibleBufferedOutputStream {
+		private final BlockManager blockManager;
+		int blockNum = -1;
+		BlockInfo bi;
+
+		private BlockOutputStream(BlockManager blockManager) {
+			this.blockManager = blockManager;
+		}
+
+		@Override
+		protected ByteBuffer newBuffer() {
+			bi = blockManager.allocateBlock(++blockNum);
+			return bi.buf;
+		}
+
+		@Override
+		protected void flushDirect() throws IOException {
+			blockManager.updateBlock(bi);
+			bi = null;
+		}
+	}
+
+	private final class BitSetBlockManager implements BlockManager {
+		final int offset;
+		
+		BitSetBlockManager(int offset) {
+			this.offset = offset;
+		}
+		
+		@Override
+		public void updateBlock(BlockInfo info) {
+			updatePhysicalBlock(info);
+		}
+
+		@Override
+		public BlockInfo getBlock(int index) {
+			return getPhysicalBlock(index + offset, true, false);
+		}
+		
+		@Override
+		public BlockInfo allocateBlock(int index) {
+			return getPhysicalBlock(index + offset, true, true);
+		}
+		
+		@Override
+		public int getInode() {
+			throw new AssertionError();
+		}
+
+		@Override
+		public void freeBlock(int index) {
+			throw new AssertionError();
+		}
+
+		@Override
+		public void free() {
+			throw new AssertionError();
+		}
+
+	}
 	
-	static class TryFreeParameter {
-		Long gid;
-		Long oid;
-		Integer block;
+	private enum Mode {
+		GET,
+		UPDATE,
+		ALLOCATE
+	}
+	
+	static final class BlockInfo {
+		final boolean system;
+		final int inodeOffset;
+		final ByteBuffer buf;
+		final int physicalAddress;
+		boolean dirty;
+		
+		BlockInfo(boolean system, ByteBuffer ib, int index, int inodeOffset) {
+			this.system = system;
+			this.buf = ib;
+			this.physicalAddress = index;
+			this.inodeOffset = inodeOffset;
+		}
+	}
+	
+	private final class InodeBlockManager implements BlockManager {
+		private final int inode;
+		private final long gid;
+		private final long oid;
+		private int lastBlock = -1;
+		boolean trackLast;
 
-		public TryFreeParameter(Long gid, Long oid, Integer block) {
+		InodeBlockManager(long gid, long oid, int inode) {
+			if (inode == EMPTY_ADDRESS) {
+				synchronized (inodesInuse) {
+					inode = inodesInuse.nextClearBit(0);
+					if (inode == -1) {
+						throw new TeiidRuntimeException("no inodes available"); //$NON-NLS-1$
+					}
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+						LogManager.logTrace(LogConstants.CTX_DQP, "Allocating inode", inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+					}
+					inodesInuse.set(inode, true);
+				}
+				int inodeBlock = inode/inodesPerBlock;
+				int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+				BlockInfo bb = getInodeSubBlock(inodeBlock, inodePosition);
+				bb.buf.putInt(EMPTY_ADDRESS);
+				updatePhysicalBlock(bb);
+			}
+			this.inode = inode;
 			this.gid = gid;
 			this.oid = oid;
-			this.block = block;
 		}
-	}
-	
-	private class Segment {
-		int id;
-		ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-		List<TryFreeParameter> freed = Collections.synchronizedList(new ArrayList<TryFreeParameter>()); 
-		FileStore store;
-		BitSetTree inuse = new BitSetTree();
-		BitSetTree fragmentedFlags = new BitSetTree();
 		
-		public Segment(int id) {
-			this.id = id;
+		@Override
+		public int getInode() {
+			return inode;
 		}
-		
-		void tryFree(Long gid, Long oid, Integer block) {
-			if (lock.writeLock().tryLock()) {
-				try {
-					free(gid, oid, block);
-				} finally {
-					lock.writeLock().unlock();
+
+		@Override
+		public void updateBlock(BlockInfo info) {
+			updatePhysicalBlock(info);
+		}
+
+		@Override
+		public BlockInfo getBlock(int index) {
+			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
+			BlockInfo bb = getPhysicalBlock(dataBlock, false, false);
+			bb.buf.position(0);
+			bb.buf.limit(BLOCK_DATA_BYTES);
+			return bb;
+		}
+				
+		private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
+			if (index >= MAX_DOUBLE_INDIRECT) {
+				throw new TeiidRuntimeException("Max block number exceeded"); //$NON-NLS-1$
+			}
+			int dataBlock = 0;
+			int position = 0;
+			int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+			BlockInfo info = getInodeSubBlock(inode/inodesPerBlock, inodePosition);
+			if (index >= MAX_INDIRECT) {
+				position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
+				BlockInfo next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, MAX_DOUBLE_INDIRECT+1, value, mode);
+				if (next != info) {
+					info = next;
+					//should have traversed to the secondary
+					int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
+					position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
+						info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+					}
+					next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT + indirectAddressBlock * ADDRESSES_PER_BLOCK,  MAX_DOUBLE_INDIRECT + 2 + indirectAddressBlock, value, mode);
+					if (next != info) {
+						info = next;
+						position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+						if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
+							info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+						}
+					}
 				}
+			} else if (index >= DIRECT_POINTERS) {
+				//indirect
+				position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
+				BlockInfo next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS, MAX_DOUBLE_INDIRECT, value, mode);
+				if (next != info) {
+					info = next;
+					position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_DATA_BYTES) {
+						info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+					}
+				}
 			} else {
-				freed.add(new TryFreeParameter(gid, oid, block));
+				position = BYTES_PER_BLOCK_ADDRESS*index;
+				if (mode == Mode.ALLOCATE) {
+					info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+				}
 			}
+			if (mode == Mode.ALLOCATE) {
+				dataBlock = nextBlock();
+				info.buf.putInt(position, dataBlock);
+				updatePhysicalBlock(info);
+			} else {
+				dataBlock = info.buf.getInt(position);
+				if (mode == Mode.UPDATE) {
+					info.buf.putInt(position, value);
+					updatePhysicalBlock(info);
+				}
+			}
+			return dataBlock;
 		}
 		
-		private void free(Long gid, Long oid, Integer block) {
-			if (block == null) {
-				Map<Long, Integer> map = physicalMapping.get(gid);
-				if (map == null) {
-					return;
+		private BlockInfo updateIndirectBlockInfo(BlockInfo info, int index, int position, int cutOff, int blockId, int value, Mode mode) {
+			int sib_index = info.buf.getInt(position);
+			boolean newBlock = false;
+			if (index == cutOff) {
+				if (mode == Mode.ALLOCATE) {
+					sib_index = nextBlock();
+					info.buf.putInt(position, sib_index);
+					updatePhysicalBlock(info);
+					newBlock = true;
+				} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
+					freeDataBlock(sib_index);
+					return info;
 				}
-				block = map.remove(oid);
-				if (block == null) {
-					return;
+			}
+			info = getPhysicalBlock(sib_index, false, false);
+			if (newBlock) {
+				putBlockId(blockId, info.buf);
+			}
+			return info;
+		}
+
+		private int nextBlock() {
+			int next = -1;
+			synchronized (blocksInuse) {
+				next = blocksInuse.nextClearBit(lastBlock + 1);
+				if (next == -1) {
+					throw new TeiidRuntimeException("no freespace available"); //$NON-NLS-1$
 				}
+				blocksInuse.set(next, true);
 			}
-			byte[] buf = new byte[DATA_START];
-			try {
-				inuse.set(block, false);
-				boolean fragmented = false;
-				if (fragmentedFlags.get(block)) {
-					fragmented = true;
-					store.read(block << 14, buf, 0, buf.length);
-					fragmentedFlags.set(block, false);
-				} else {
-					//TODO: if not fragmented come up with a better approach than a disk read
-					store.read(block << 14, buf, 0, 20);
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_DQP, "Allocating block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			if (trackLast) {
+				lastBlock = next;
+			}
+			return next;
+		}
+
+		@Override
+		public void freeBlock(int index) {
+			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
+			freeDataBlock(dataBlock);
+		}
+
+		private void freeDataBlock(int dataBlock) {
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			blockCache.remove(dataBlock);
+			blocksInuse.set(dataBlock, false);
+		}
+		
+		BlockInfo getInodeSubBlock(int inodeBlock, int inodePosition) {
+			BlockInfo bi = getPhysicalBlock(inodeBlock + blocksInUseBlocks + inodesInUseBlocks, true, false);
+			ByteBuffer bb = bi.buf.duplicate();
+			bb.position(inodePosition);
+			bb.limit(inodePosition + INODE_BYTES);
+			bb = bb.slice();
+			return new BlockInfo(true, bb, inodeBlock + blocksInUseBlocks + inodesInUseBlocks, inodePosition);
+		}
+
+		@Override
+		public void free() {
+			int inodeBlock = inode/inodesPerBlock;
+			int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+			BlockInfo bi = getInodeSubBlock(inodeBlock, inodePosition);
+			ByteBuffer ib = bi.buf;
+			int indirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
+			int doublyIndirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
+			boolean freedAll = freeBlock(ib, DIRECT_POINTERS, true);
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			inodesInuse.set(inode, false);
+			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
+				return;
+			}
+			freedAll = freeIndirectBlock(indirectIndexBlock);
+			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
+				return;
+			}
+			BlockInfo bb = getPhysicalBlock(doublyIndirectIndexBlock, false, false);
+			freeBlock(bb.buf, ADDRESSES_PER_BLOCK, false);
+			freeDataBlock(doublyIndirectIndexBlock);
+		}
+
+		private boolean freeIndirectBlock(int indirectIndexBlock) {
+			BlockInfo bb = getPhysicalBlock(indirectIndexBlock, false, false);
+			bb.buf.position(0);
+			boolean freedAll = freeBlock(bb.buf, ADDRESSES_PER_BLOCK, true);
+			freeDataBlock(indirectIndexBlock);
+			return freedAll;
+		}
+
+		private boolean freeBlock(ByteBuffer ib, int numPointers, boolean primary) {
+			for (int i = 0; i < numPointers; i++) {
+				int dataBlock = ib.getInt();
+				if (dataBlock == EMPTY_ADDRESS) {
+					return false;
 				}
-				ByteBuffer bb = ByteBuffer.wrap(buf);
-				bb.position(18);
-				short blocks = bb.getShort();
-				if (!fragmented) {
-					for (int i = block + 1; i < blocks; i++) {
-						inuse.set(i, false);
-					}
+				if (primary) {
+					freeDataBlock(dataBlock);
 				} else {
-					for (short i = 0; i < blocks; i++) {
-						int toFree = getNextBlock(bb);
-						inuse.set(toFree, false);
-					}
+					freeIndirectBlock(dataBlock);
 				}
-			} catch (IOException e) {
-				throw new TeiidRuntimeException(e, "Could not read intial block to process freeing " + oid); //$NON-NLS-1$
 			}
+			return true;
 		}
 
-		@SuppressWarnings("unchecked")
-		void add(final CacheEntry entry, final Serializer s) throws TeiidComponentException {
-			lock.writeLock().lock();
+		@Override
+		public BlockInfo allocateBlock(int blockNum) {
+			int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
+			BlockInfo bb = getPhysicalBlock(dataBlock, false, true);
+			putBlockId(blockNum, bb.buf);
+			bb.buf.position(0);
+			bb.buf.limit(BLOCK_DATA_BYTES);
+			return bb;
+		}
+
+		private void putBlockId(int blockNum, ByteBuffer bb) {
+			bb.position(BLOCK_DATA_BYTES);
+			bb.putLong(gid);
+			bb.putLong(oid);
+			bb.put((byte)(blockNum >> 16));
+			bb.putShort((short)blockNum);
+		}
+
+	}
+
+	private StorageManager storageManager;
+	private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+	private int inodes;
+	private int blocks;
+	private int inodesPerBlock;
+	private int inodesInUseBlocks;
+	private int blocksInUseBlocks;
+	private FileStore store;
+	private FileStore systemStore;
+	private BlockBitSetTree blocksInuse;
+	private BlockBitSetTree inodesInuse;
+
+	//root directory
+	ConcurrentHashMap<Long, BlockClosedLongIntHashTable> physicalMapping = new ConcurrentHashMap<Long, BlockClosedLongIntHashTable>();
+	
+	//block caching
+	int blockCacheSize = 128; //2 MB
+	ReentrantLock blockLock = new ReentrantLock();
+	LinkedHashMap<Integer, BlockInfo> blockCache = new LinkedHashMap<Integer, BlockInfo>() {
+		private static final long serialVersionUID = -4240291744435552008L;
+		BlockInfo eldestEntry = null;
+
+		protected boolean removeEldestEntry(Map.Entry<Integer,BlockInfo> eldest) {
+			if (size() > blockCacheSize) {
+				BlockInfo bi = eldest.getValue();
+				if (bi.dirty) {
+					eldestEntry = bi;
+				}
+				return true;
+			}
+			return false;
+		}
+		
+		public BlockInfo put(Integer key, BlockInfo value) {
+			blockLock.lock();
+			value.dirty = true;
 			try {
-				List<TryFreeParameter> toFree = Collections.emptyList();
-				synchronized (freed) {
-					if (!freed.isEmpty()) {
-						toFree = new ArrayList<TryFreeParameter>(freed);
-						freed.clear();
-					}
+				return super.put(key, value);
+			} finally {
+				BlockInfo toUpdate = eldestEntry;
+				eldestEntry = null;
+				blockLock.unlock();
+				if (toUpdate != null) {
+					updatePhysicalBlockDirect(toUpdate);
 				}
-				for (TryFreeParameter param : toFree) {
-					free(param.gid, param.oid, param.block);
-				}
-				ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream(new byte[IO_BUFFER_SIZE]) {
-					
-					byte[] firstBytes = null;
-					List<Integer> blocks = new ArrayList<Integer>(8);
-					boolean isFragmented;
-					int lastCount;
-					int start;
-					
-					@Override
-					protected void flushDirect() throws IOException {
-						lastCount = count;
-						if (firstBytes == null) {
-							start = nextBlock(-1);
-							firstBytes = buf;
-							//need a new buffer only if there could be bytes remaining
-							if (count == buf.length) {
-								buf = new byte[IO_BUFFER_SIZE];
-							}
-						} else {
-							int last = -1;
-							if (!blocks.isEmpty()) {
-								last = blocks.get(blocks.size()-1);
-							} else {
-								last = start;
-							}
-							int next = nextBlock(last);
-							blocks.add(next);
-							if (next != last + 1) {
-								isFragmented = true;
-							}
-							if (blocks.size() > MAX_BLOCKS) {
-								//TODO handle this case
-								throw new TeiidRuntimeException("Exceeded max persistent object size" + entry.getId()); //$NON-NLS-1$
-							}
-							store.write(next << 14, buf, 0, count);
-						}
-					}
-					
-					@Override
-					public void close() throws IOException {
-						super.close();
-						ByteBuffer bb = ByteBuffer.wrap(firstBytes);
-						bb.putLong(s.getId());
-						bb.putLong(entry.getId());
-						bb.putShort((short)(lastCount - (blocks.isEmpty()?DATA_START:0)));
-						bb.putShort((short)blocks.size());
-						for (Integer i : blocks) {
-							bb.put((byte)(i.intValue()>>16));
-							bb.putShort((short)i.intValue());
-						}
-						store.write(start << 14, firstBytes, 0, firstBytes.length);
-						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-							LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, id, entry.getId(), "batch written starting at:", start); //$NON-NLS-1$
-						}
-						Map<Long, Integer> map = physicalMapping.get(s.getId());
-						if (map == null) {
-							return;
-						}
-						map.put(entry.getId(), start);
-						inuse.set(start, true);
-						for (Integer i : blocks) {
-							inuse.set(i, true);
-						}
-						if (isFragmented) {
-							fragmentedFlags.set(start, true);
-						}
-					}
-
-				};
-				fsos.count = DATA_START; 
-	            ObjectOutputStream oos = new ObjectOutputStream(fsos);
-	            oos.writeInt(entry.getSizeEstimate());
-	            s.serialize(entry.getObject(), oos);
-	            oos.close();
-			} catch (IOException e) {
-				throw new TeiidComponentException(e);
-			} finally {
-				lock.writeLock().unlock();
 			}
-			
-			if (entry.getId().longValue() == 451) {
-				get(entry.getId(), s);
-			}
 		}
 		
-		int nextBlock(int fromIndex) {
-			int next = inuse.nextClearBit(fromIndex + 1);
-			if (next == -1) {
-				throw new TeiidRuntimeException("no freespace available on segment" + id); //$NON-NLS-1$
+		public BlockInfo get(Object key) {
+			blockLock.lock();
+			try {
+				return super.get(key);
+			} finally {
+				blockLock.unlock();
 			}
-			return next;
 		}
 		
-		@SuppressWarnings("unchecked")
-		CacheEntry get(Long oid, Serializer serializer) throws TeiidComponentException {
-			lock.readLock().lock();
+		public BlockInfo remove(Object key) {
+			blockLock.lock();
 			try {
-				Map<Long, Integer> map = physicalMapping.get(serializer.getId());
-				if (map == null) {
-					return null;
+				return super.remove(key);
+			} finally {
+				blockLock.unlock();
+			}
+		}
+	};
+	
+	BlockInfo getPhysicalBlock(int block, boolean system, boolean allocate) {
+		if (block < 0) {
+			throw new AssertionError("invalid block address " + block); //$NON-NLS-1$
+		}
+		try {
+			int key = block;
+			if (system) {
+				key |= SYSTEM_MASK;
+			}
+			BlockInfo result = blockCache.get(key);
+			assert result == null || !allocate; 
+			if (result == null) {
+				ByteBuffer bb = null;
+				if (system) {
+					bb = systemStore.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
+				} else {
+					bb = store.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
 				}
-				final Integer startBlock = map.get(oid);
-				if (startBlock == null) {
-					return null;
+				result = new BlockInfo(system, bb, block, -1);
+				blockLock.lock();
+				try {
+					BlockInfo existing = blockCache.get(key);
+					if (existing != null) {
+						return existing;
+					}
+					blockCache.put(key, result);
+				} finally {
+					blockLock.unlock();
 				}
-				ObjectInputStream ois = new ObjectInputStream(new InputStream() {
-					
-					int count;
-					int pos;
-					byte[] buf = new byte[IO_BUFFER_SIZE];
-					ByteBuffer firstBytes;
-					int currentBlock;
-					short tailBytes;
-					short totalBlocks;
-					short blockNum;
-					
-					@Override
-					public int read() throws IOException {
-						if (pos == count) {
-							if (firstBytes == null) {
-								store.readFully(startBlock << 14, buf, 0, buf.length);
-								firstBytes = ByteBuffer.wrap(buf);
-								firstBytes.position(16);
-								tailBytes = firstBytes.getShort();
-								totalBlocks = firstBytes.getShort();
-								count = buf.length;
-								pos = DATA_START;
-							} else {
-								buf = new byte[IO_BUFFER_SIZE];
-								//TODO: defrag on read
-								if (count == buf.length) {
-									currentBlock = getNextBlock(firstBytes);
-									blockNum++;
-									pos = 0;
-								}
-								int length = blockNum == totalBlocks?tailBytes:buf.length;
-								store.readFully(currentBlock << 14, buf, 0, length);
-								count = length;
-							}
-						} 
-						return buf[pos++] & 0xff;
+				return result;
+			}
+			return result;
+		} catch (IOException e) {
+			throw new TeiidRuntimeException(e);
+		}
+	}
+	
+	void updatePhysicalBlock(BlockInfo bi) {
+		int key = bi.physicalAddress;
+		if (bi.system) {
+			key |= SYSTEM_MASK;
+		}
+		if (bi.inodeOffset >= 0) {
+			blockLock.lock();
+			try {
+				BlockInfo actual = blockCache.get(key);
+				if (actual == null) {
+					//we're not in the cache, so just update storage
+					updatePhysicalBlockDirect(bi);
+				} else {
+					//TODO: check to see we're sharing the same buffer
+					for (int i = 0; i < INODE_BYTES; i++) {
+						actual.buf.put(bi.inodeOffset + i, bi.buf.get(i));
 					}
-				});
-				CacheEntry ce = new CacheEntry(oid);
-				ce.setSizeEstimate(ois.readInt());
-				ce.setObject(serializer.deserialize(ois));
-				ce.setPersistent(true);
-				return ce;
-	        } catch(IOException e) {
-	        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
-	        } catch (ClassNotFoundException e) {
-	        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
-	        } finally {
-	        	lock.readLock().unlock();
-	        }
+				}
+			} finally {
+				blockLock.unlock();
+			}
+			return;
 		}
+		blockCache.put(key, bi);
 	}
+
+	private void updatePhysicalBlockDirect(BlockInfo bi) {
+		try {
+			bi.buf.rewind();
+			if (!bi.system) {
+				store.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);
+			} else {
+				systemStore.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);				
+			}
+		} catch (IOException e) {
+			throw new TeiidRuntimeException(e);
+		}
+	}
 	
-	static int getNextBlock(ByteBuffer bb) {
-		int block = (bb.get() & 0xff)<< 16;
-		block += (bb.getShort() & 0xffff);
-		return block;
+	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
+		return new InodeBlockManager(gid, oid, inode);
 	}
-	
-	private StorageManager storageManager;
-	private Segment[] segments;
-	private ConcurrentHashMap<Long, Map<Long, Integer>> physicalMapping = new ConcurrentHashMap<Long, Map<Long,Integer>>();
-	private int segmentCount = 32;
-	
+
+	@SuppressWarnings("unchecked")
 	@Override
-	public void add(CacheEntry entry, Serializer<?> s) {
-		Segment seg = getSegment(entry.getId());
+	public void add(CacheEntry entry, Serializer s) {
+		boolean success = false;
+		InodeBlockManager blockManager = null;
 		try {
-			seg.add(entry, s);
+			BlockClosedLongIntHashTable map = physicalMapping.get(s.getId());
+			if (map == null) {
+				return;
+			}
+			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
+			blockManager.trackLast = true;
+			ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
+            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+            oos.writeInt(entry.getSizeEstimate());
+            s.serialize(entry.getObject(), oos);
+            oos.close();
+            map.put(entry.getId(), blockManager.getInode());
+            success = true;
 		} catch (Throwable e) {
 			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ entry.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+		} finally {
+			if (!success && blockManager != null) {
+				blockManager.free();
+			}
 		}
 	}
-	
+
 	@Override
-	public CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException {
-		Segment seg = getSegment(id);
-		return seg.get(id, serializer);
+	public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
+		try {
+			BlockClosedLongIntHashTable map = physicalMapping.get(serializer.getId());
+			if (map == null) {
+				return null;
+			}
+			final int inode = map.get(oid);
+			if (inode == EMPTY_ADDRESS) {
+				return null;
+			}
+			final BlockManager manager = getBlockManager(serializer.getId(), oid, inode);
+			ObjectInputStream ois = new ObjectInputStream(new InputStream() {
+				
+				int blockIndex;
+				BlockInfo buf;
+				
+				@Override
+				public int read() throws IOException {
+					ensureBytes();
+					return buf.buf.get() & 0xff;
+				}
+
+				private void ensureBytes() {
+					if (buf == null || buf.buf.remaining() == 0) {
+						if (buf != null) {
+							buf = null;
+						}
+						buf = manager.getBlock(blockIndex++);
+					}
+				}
+				
+				@Override
+				public int read(byte[] b, int off, int len)
+						throws IOException {
+					ensureBytes();
+					len = Math.min(len, buf.buf.remaining());
+					buf.buf.get(b, off, len);
+					return len;
+				}
+			});
+			CacheEntry ce = new CacheEntry(oid);
+			ce.setSizeEstimate(ois.readInt());
+			ce.setObject(serializer.deserialize(ois));
+			ce.setPersistent(true);
+			return ce;
+        } catch(IOException e) {
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        } catch (ClassNotFoundException e) {
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        }
 	}
 	
-	private Segment getSegment(Long id) {
-		return segments[id.hashCode() & (segments.length - 1)];
-	}
-	
 	@Override
 	public FileStore createFileStore(String name) {
 		return storageManager.createFileStore(name);
@@ -350,62 +653,76 @@
 	@Override
 	public void initialize() throws TeiidComponentException {
 		storageManager.initialize();
-		int numSegments = 31 - Integer.numberOfLeadingZeros(this.segmentCount);
-		segments = new Segment[1 << numSegments];
-		for (int i = 0; i < segments.length; i++) {
-			segments[i] = new Segment(i);
-			segments[i].store = storageManager.createFileStore("segment_" + i); //$NON-NLS-1$
-		}
+		int logSpace = Math.min(45, 63 - Long.numberOfLeadingZeros(this.maxBufferSpace));
+		
+		blocks = 1 << Math.min(Math.max(12, logSpace -LOG_BLOCK_SIZE +1), ADDRESS_BITS); //blocks per segment
+
+		inodes = blocks>>1;
+		inodesPerBlock = BLOCK_SIZE/INODE_BYTES;
+		
+		inodesInUseBlocks = computeInuseBlocks(inodes);
+		blocksInUseBlocks = computeInuseBlocks(blocks);
+		
+		inodesInuse = new BlockBitSetTree(inodes - 1, new BitSetBlockManager(0));
+		blocksInuse = new BlockBitSetTree(blocks - 1, new BitSetBlockManager(inodesInUseBlocks));
+		
+		store = storageManager.createFileStore("data_store"); //$NON-NLS-1$
+		systemStore = storageManager.createFileStore("system_store"); //$NON-NLS-1$
 	}
+	
+	static int computeInuseBlocks(int number) {
+		int blockCount = (number>>LOG_BLOCK_SIZE) + ((number&BLOCK_MASK)>0?1:0);
+		return (blockCount>>3) + ((blockCount&7)>0?1:0);	
+	}
 		
 	@Override
 	public void addToCacheGroup(Long gid, Long oid) {
-		Map<Long, Integer> map = physicalMapping.get(gid);
+		BlockClosedLongIntHashTable map = physicalMapping.get(gid);
 		if (map == null) {
 			return;
 		}
-		map.put(oid, null);
+		map.put(oid, EMPTY_ADDRESS);
 	}
 	
 	@Override
 	public void createCacheGroup(Long gid) {
-		physicalMapping.put(gid, Collections.synchronizedMap(new HashMap<Long, Integer>()));
+		BlockClosedLongIntHashTable map = new BlockClosedLongIntHashTable(getBlockManager(gid, -1, EMPTY_ADDRESS));
+		physicalMapping.put(gid, map);
 	}
 	
 	@Override
 	public void remove(Long gid, Long id) {
-		Map<Long, Integer> map = physicalMapping.get(gid);
+		BlockClosedLongIntHashTable map = physicalMapping.get(gid);
 		if (map == null) {
 			return;
 		}
-		Integer block = map.remove(id);
-		if (block != null) {
-			Segment s = getSegment(id);
-			s.tryFree(gid, id, block);
-		}
+		int inode = map.remove(id);
+		free(gid, id, inode);
 	}
-	
+
 	@Override
 	public Collection<Long> removeCacheGroup(Long gid) {
-		Map<Long, Integer> values = physicalMapping.remove(gid);
-		if (values == null) {
+		BlockClosedLongIntHashTable map = physicalMapping.remove(gid);
+		if (map == null) {
 			return Collections.emptySet();
 		}
-		synchronized (values) {
-			for (Map.Entry<Long, Integer> entry : values.entrySet()) {
-				if (entry.getValue() != null) {
-					Segment s = getSegment(entry.getKey());
-					s.tryFree(gid, entry.getKey(), entry.getValue());
-				}
+		Map<Long, Integer> values = map.remove();
+		for (Map.Entry<Long, Integer> entry : values.entrySet()) {
+			if (entry.getValue() != null) {
+				free(gid, entry.getKey(), entry.getValue());
 			}
-			return new HashSet<Long>(values.keySet());
 		}
+		return values.keySet();
 	}
 	
-	public void setSegmentCount(int segmentCount) {
-		this.segmentCount = segmentCount;
+	void free(Long gid, Long oid, int inode) {
+		if (inode == EMPTY_ADDRESS) {
+			return;
+		}
+		BlockManager bm = getBlockManager(gid, oid, inode);
+		bm.free();
 	}
-
+	
 	public void setStorageManager(StorageManager storageManager) {
 		this.storageManager = storageManager;
 	}
@@ -414,4 +731,16 @@
 		return storageManager;
 	}
 	
+	public void setMaxBufferSpace(long maxBufferSpace) {
+		this.maxBufferSpace = maxBufferSpace;
+	}
+	
+	public int getInodesInUse() {
+		return this.inodesInuse.getBitsSet();
+	}
+	
+	public int getDataBlocksInUse() {
+		return this.blocksInuse.getBitsSet();
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -22,6 +22,7 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -86,6 +87,20 @@
 		public synchronized long getLength() {
 			return buffer.limit();
 		}
+		
+		@Override
+		public synchronized ByteBuffer getBuffer(long start, int length, boolean allocate) {
+			int position = (int)start;
+			buffer.limit(position + length);
+			buffer.position(position);
+			return buffer.slice();
+		}
+		
+		@Override
+		public void updateFromBuffer(ByteBuffer bb, long start)
+				throws IOException {
+			//do nothing - the returned buffer shares its bytes with this store
+		}
 
 	}
 
@@ -134,7 +149,7 @@
 	@Override
 	public CacheEntry get(Long id, Serializer<?> serializer)
 			throws TeiidComponentException {
-		Map<Long, CacheEntry> group = groups.get(id);
+		Map<Long, CacheEntry> group = groups.get(serializer.getId());
 		if (group != null) {
 			return group.get(id);
 		}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -23,6 +23,7 @@
 package org.teiid.common.buffer.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,6 +31,13 @@
 import org.teiid.common.buffer.StorageManager;
 import org.teiid.core.TeiidComponentException;
 
+/**
+ * A storage manager that combines smaller files into a larger
+ * logical file.
+ * 
+ * The buffer methods assume that a requested buffer never crosses
+ * a single file boundary.
+ */
 public class SplittableStorageManager implements StorageManager {
 	
 	public static final long DEFAULT_MAX_FILESIZE = 2 * 1024l;
@@ -82,7 +90,7 @@
 		    	ensureLength(fileOffset + length);
 	    		store = storageFiles.get((int)(fileOffset/maxFileSize));
 			}
-	    	long fileBegin = (int)(fileOffset%maxFileSize);
+	    	long fileBegin = fileOffset%maxFileSize;
 	    	length = Math.min(length, (int)Math.min(Integer.MAX_VALUE, maxFileSize - fileBegin));
 			store.write(fileBegin, b, offSet, length);
 			return length;
@@ -109,6 +117,28 @@
 			}
 			len = length;
 		}
+		
+		@Override
+		public ByteBuffer getBuffer(long start, int length, boolean allocate) throws IOException {
+			FileStore store = null;
+			synchronized (this) {
+				ensureLength(start + length);
+				store = storageFiles.get((int)(start/maxFileSize));
+			}
+	    	long fileBegin = start%maxFileSize;
+			return store.getBuffer(fileBegin, length, allocate);
+		}
+		
+		@Override
+		public void updateFromBuffer(ByteBuffer bb, long start)
+				throws IOException {
+			FileStore store = null;
+			synchronized (this) {
+				store = storageFiles.get((int)(start/maxFileSize));
+			}
+	    	long fileBegin = start%maxFileSize;
+			store.updateFromBuffer(bb, fileBegin);
+		}
 
 	    @Override
 	    public synchronized void setLength(long length) throws IOException {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -86,7 +86,7 @@
 			SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
 			ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
 			FileStoreCache fsc = new FileStoreCache();
-			fsc.setSegmentCount(1);
+			fsc.setMaxBufferSpace(Runtime.getRuntime().maxMemory()/4);
 			fsc.setStorageManager(ssm);
 			fsc.initialize();
 		    bufferManager.setCache(fsc);

Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -1,60 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
- 
- package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.BitSet;
-import java.util.Random;
-
-import org.junit.Test;
-
-public class TestBitSetTree {
-	
-	@Test public void testBitsSet() {
-		BitSetTree bst = new BitSetTree();
-		bst.set(1, true);
-		bst.set(100, true);
-		bst.set(10000, true);
-		bst.set(1000000, true);
-		assertEquals(4, bst.getBitsSet());
-	}
-	
-	@Test public void testNextClearSet() {
-		BitSetTree bst = new BitSetTree();
-		BitSet bst1 = new BitSet();
-		Random r = new Random(1);
-		for (int i = 0; i < 1000; i++) {
-			int rand = r.nextInt() & BitSetTree.MAX_INDEX;
-			bst.set(rand, true);
-			bst1.set(rand, true);
-		}
-		
-		for (int i = 0; i < 10000; i++) {
-			int rand = r.nextInt() & BitSetTree.MAX_INDEX;
-			assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
-			assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
-		}
-	}
-
-}

Copied: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java (from rev 3502, trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java)
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+ package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.BitSet;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestBlockBitSetTree {
+	
+	@Test public void testBitsSet() {
+		BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new TestBlockClosedLongIntHashTable.DummyBlockManager());
+		bst.set(1, true);
+		bst.set(100, true);
+		bst.set(10000, true);
+		bst.set(1000000, true);
+		assertEquals(4, bst.getBitsSet());
+		bst.set(1, false);
+		assertEquals(3, bst.getBitsSet());
+		assertFalse(bst.get(1));
+	}
+	
+	@Test public void testNextClearSet() {
+		BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new TestBlockClosedLongIntHashTable.DummyBlockManager());
+		BitSet bst1 = new BitSet();
+		Random r = new Random(1);
+		for (int i = 0; i < 1000; i++) {
+			int rand = r.nextInt() & bst.getMaxIndex();
+			bst.set(rand, true);
+			bst1.set(rand, true);
+			assertTrue(bst.get(rand));
+			assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
+			assertEquals(String.valueOf(i), bst1.nextSetBit(rand), bst.nextSetBit(rand));
+		}
+		
+		for (int i = 0; i < 10000; i++) {
+			int rand = r.nextInt() & bst.getMaxIndex();
+			assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
+			assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
+		}
+	}
+
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+public class TestBlockClosedLongIntHashTable {
+	
+	public static final class DummyBlockManager implements BlockManager {
+		List<BlockInfo> blocks = new ArrayList<BlockInfo>();
+
+		@Override
+		public int getInode() {
+			return 0;
+		}
+		
+		@Override
+		public void updateBlock(BlockInfo block) {
+			
+		}
+
+		@Override
+		public void free() {
+			blocks.clear();
+		}
+		
+		@Override
+		public BlockInfo getBlock(int index) {
+			BlockInfo block = blocks.get(index);
+			block.buf.rewind();
+			return block;
+		}
+
+		@Override
+		public void freeBlock(int index) {
+			blocks.remove(index);
+		}
+
+		@Override
+		public BlockInfo allocateBlock(int index) {
+			assertEquals(index, blocks.size());
+			ByteBuffer result = ByteBuffer.wrap(new byte[FileStoreCache.BLOCK_SIZE]);
+			blocks.add(new BlockInfo(false, result, index, -1));
+			return blocks.get(blocks.size() - 1);
+		}
+	}
+
+	@Test public void testAgainstHashMap() {
+		BlockClosedLongIntHashTable table = new BlockClosedLongIntHashTable(new DummyBlockManager());
+		HashMap<Long, Integer> table1 = new HashMap<Long, Integer>(16);
+		for (long i = 1; i < 200000; i++) {
+			table.put(i, (int)i);
+			table1.put(i, (int)i);
+		}
+		Random r = new Random(0);
+		for (int i = 1; i < 2000000; i++) {
+			long toRemove = r.nextInt(i); 
+			boolean removed = table.remove(toRemove) != BlockClosedLongIntHashTable.EMPTY;
+			assertEquals(table1.remove(toRemove) != null, removed);
+		}
+	}
+
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -27,51 +27,93 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.List;
 
 import org.junit.Test;
 import org.teiid.common.buffer.CacheEntry;
 import org.teiid.common.buffer.Serializer;
+import org.teiid.core.util.UnitTestUtil;
 
 public class TestFileStoreCache {
 	
+	private final class SimpleSerializer implements Serializer<Integer> {
+		@Override
+		public Integer deserialize(ObjectInputStream ois)
+				throws IOException, ClassNotFoundException {
+			Integer result = ois.readInt();
+			for (int i = 0; i < result; i++) {
+				assertEquals(i, ois.readInt());
+			}
+			return result;
+		}
+
+		@Override
+		public Long getId() {
+			return 1l;
+		}
+
+		@Override
+		public void serialize(Integer obj, ObjectOutputStream oos)
+				throws IOException {
+			oos.writeInt(obj);
+			for (int i = 0; i < obj; i++) {
+				oos.writeInt(i);
+			}
+		}
+
+		@Override
+		public boolean useSoftCache() {
+			return false;
+		}
+	}
+
 	@Test public void testAddGetMultiBlock() throws Exception {
 		FileStoreCache fsc = new FileStoreCache();
-		fsc.setStorageManager(new MemoryStorageManager());
+		fsc.setMaxBufferSpace(1 << 28);
+		SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
+		ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+		fsc.setStorageManager(ssm);
 		fsc.initialize();
 		
+		UnitTestUtil.enableTraceLogging("org.teiid");  //$NON-NLS-1$
+		
 		CacheEntry ce = new CacheEntry(2l);
-		Serializer<Object> s = new Serializer<Object>() {
-			@Override
-			public Object deserialize(ObjectInputStream ois)
-					throws IOException, ClassNotFoundException {
-				return ois.readObject();
-			}
-			
-			@Override
-			public Long getId() {
-				return 1l;
-			}
-			
-			@Override
-			public void serialize(Object obj, ObjectOutputStream oos)
-					throws IOException {
-				oos.writeObject(obj);
-			}
-			
-			@Override
-			public boolean useSoftCache() {
-				return false;
-			}
-		};
+		Serializer<Integer> s = new SimpleSerializer();
 		fsc.createCacheGroup(s.getId());
-		List<Object> cacheObject = Arrays.asList(new Object[10000]);
+		Integer cacheObject = Integer.valueOf(2);
 		ce.setObject(cacheObject);
 		fsc.add(ce, s);
 		
 		ce = fsc.get(2l, s);
 		assertEquals(cacheObject, ce.getObject());
+		
+		//test something that exceeds the direct inode data blocks
+		ce = new CacheEntry(3l);
+		cacheObject = Integer.valueOf(80000);
+		ce.setObject(cacheObject);
+		fsc.add(ce, s);
+		
+		ce = fsc.get(3l, s);
+		assertEquals(cacheObject, ce.getObject());
+		
+		fsc.removeCacheGroup(1l);
+		
+		assertEquals(0, fsc.getDataBlocksInUse());
+		assertEquals(0, fsc.getInodesInUse());
+		
+		//test something that exceeds the indirect inode data blocks
+		ce = new CacheEntry(3l);
+		fsc.createCacheGroup(s.getId());
+		cacheObject = Integer.valueOf(5000000);
+		ce.setObject(cacheObject);
+		fsc.add(ce, s);
+		
+		ce = fsc.get(3l, s);
+		assertEquals(cacheObject, ce.getObject());
+		
+		fsc.removeCacheGroup(1l);
+		
+		assertEquals(0, fsc.getDataBlocksInUse());
+		assertEquals(0, fsc.getInodesInUse());
 	}
 	
 }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -98,18 +98,6 @@
         helpTestGetSize("abcdefghij", 64); //$NON-NLS-1$
     }
 
-    public void XtestGetSizeLongString() {
-        // There is no clear way of figuring out the actual size of a string that is created
-        // from a StringBuffer because the buffer can sometimes be twice as big as the actual length of the string
-        // Since the data comin from the connector is not created this way, this test is an inaccurate setup 
-        int size = 10000;
-        StringBuffer str = new StringBuffer();
-        for(int i=0; i<size; i++) { 
-            str.append("a"); //$NON-NLS-1$
-        }
-        helpTestGetSize(str.toString(), size+3);
-    }
-
     @Test public void testGetSizeRow1() {
         List<Object> row = new ArrayList<Object>(1);
         row.add(new Integer(0));

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -66,7 +66,7 @@
     private long maxFileSize = SplittableStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
     private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
-    private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+    private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
     private boolean inlineLobs = true;
 	private FileStorageManager fsm;
 	
@@ -108,6 +108,8 @@
                 ssm.setMaxFileSize(maxFileSize);
                 FileStoreCache fsc = new FileStoreCache();
                 fsc.setStorageManager(ssm);
+                fsc.setMaxBufferSpace(maxBufferSpace*MB);
+                fsc.initialize();
                 this.bufferMgr.setCache(fsc);
             } else {
             	this.bufferMgr.setCache(new MemoryStorageManager());

Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java	2011-09-23 04:11:11 UTC (rev 3503)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java	2011-09-28 03:15:08 UTC (rev 3504)
@@ -30,9 +30,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.jgroups.JChannelFactory;
 import org.junit.BeforeClass;
@@ -58,16 +55,7 @@
     
     @Test public void testReplication() throws Exception {
     	if (DEBUG) {
-	    	Logger logger = Logger.getLogger("org.teiid");
-	    	logger.setLevel(Level.FINEST);
-	    	for (Handler h : logger.getHandlers()) {
-				h.setLevel(Level.FINEST);
-			}
-	    	/*org.apache.log4j.Logger l = LogManager.getLogger("org.jgroups");
-	    	l.setLevel(org.apache.log4j.Level.TRACE);
-	    	ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
-	    	ca.setName("x");
-	    	l.addAppender(ca);*/
+	    	UnitTestUtil.enableTraceLogging("org.teiid");
     	}
     	
 		FakeServer server1 = createServer();



More information about the teiid-commits mailing list