[teiid-commits] teiid SVN: r3549 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Oct 12 19:23:03 EDT 2011


Author: shawkins
Date: 2011-10-12 19:23:02 -0400 (Wed, 12 Oct 2011)
New Revision: 3549

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
Log:
TEIID-1750 reintroducing block storage with a fixed file allocation scheme

Added: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
+
+	private Long id;
+	protected long lastAccess;
+	protected double orderingValue;
+	
+	public BaseCacheEntry(Long id) {
+		this.id = id;
+	}
+	
+	public Long getId() {
+		return id;
+	}
+
+	@Override
+	public int hashCode() {
+		return getId().hashCode();
+	}
+	
+	@Override
+	public String toString() {
+		return getId().toString();
+	}
+
+	public long getLastAccess() {
+		return lastAccess;
+	}
+	
+	public void setLastAccess(long lastAccess) {
+		this.lastAccess = lastAccess;
+	}
+	
+	public double getOrderingValue() {
+		return orderingValue;
+	}
+	
+	public void setOrderingValue(double orderingValue) {
+		this.orderingValue = orderingValue;
+	}
+	
+	@Override
+	public int compareTo(BaseCacheEntry o) {
+		int result = (int) Math.signum(orderingValue - o.orderingValue);
+		if (result == 0) {
+			result = Long.signum(lastAccess - o.lastAccess);
+			if (result == 0) {
+				return Long.signum(id - o.id);
+			}
+		}
+		return result;
+	}
+
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-12 20:56:57 UTC (rev 3548)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -24,28 +24,16 @@
 
 import java.lang.ref.WeakReference;
 
-public class CacheEntry implements Comparable<CacheEntry>{
+public class CacheEntry extends BaseCacheEntry {
 	private boolean persistent;
 	private Object object;
 	private int sizeEstimate;
-	private long lastAccess;
-	private double orderingValue;
 	private WeakReference<? extends Serializer<?>> serializer;
-	private Long id;
 	
 	public CacheEntry(Long id) {
-		this.id = id;
+		super(id);
 	}
 	
-	public Long getId() {
-		return id;
-	}
-
-	@Override
-	public int hashCode() {
-		return getId().hashCode();
-	}
-	
 	public int getSizeEstimate() {
 		return sizeEstimate;
 	}
@@ -53,35 +41,7 @@
 	public void setSizeEstimate(int sizeEstimate) {
 		this.sizeEstimate = sizeEstimate;
 	}
-	
-	public long getLastAccess() {
-		return lastAccess;
-	}
-	
-	public void setLastAccess(long lastAccess) {
-		this.lastAccess = lastAccess;
-	}
-	
-	public double getOrderingValue() {
-		return orderingValue;
-	}
-	
-	public void setOrderingValue(double orderingValue) {
-		this.orderingValue = orderingValue;
-	}
-	
-	@Override
-	public int compareTo(CacheEntry o) {
-		int result = (int) Math.signum(orderingValue - o.orderingValue);
-		if (result == 0) {
-			result = Long.signum(lastAccess - o.lastAccess);
-			if (result == 0) {
-				return Long.signum(id - o.id);
-			}
-		}
-		return result;
-	}
-	
+		
 	public boolean equals(Object obj) {
 		if (obj == this) {
 			return true;
@@ -92,10 +52,6 @@
 		return getId().equals(((CacheEntry)obj).getId());
 	}
 
-	@Override
-	public String toString() {
-		return getId().toString();
-	}
 	
 	public Object nullOut() {
 		Object result = getObject();
@@ -124,8 +80,12 @@
 		this.serializer = serializer;
 	}
 
-	public WeakReference<? extends Serializer<?>> getSerializer() {
-		return serializer;
+	public Serializer<?> getSerializer() {
+		WeakReference<? extends Serializer<?>> ref = this.serializer;
+		if (ref == null) {
+			return null;
+		}
+		return ref.get();
 	}
 
 }
\ No newline at end of file

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Provides buffer slices or blocks off of a central
+ * set of buffers.
+ */
+public class BlockByteBuffer {
+	
+	private final static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
+		private final ByteBuffer byteBuffer;
+		
+		public ThreadLocalByteBuffer(ByteBuffer byteBuffer) {
+			this.byteBuffer = byteBuffer;
+		}
+		
+		protected ByteBuffer initialValue() {
+			return byteBuffer.duplicate();
+		}
+	}
+
+	private static class BlockInfo {
+		final ByteBuffer bb;
+		final int block;
+		public BlockInfo(ByteBuffer bb, int block) {
+			this.bb = bb;
+			this.block = block;
+		}
+	}
+	
+	private int blockAddressBits;
+	private int segmentAddressBits;
+	private int segmentSize;
+	private int blockSize;
+	private int blockCount;
+	private ThreadLocal<ByteBuffer>[] buffers;
+	private BlockInfo[] bufferCache;
+	
+	/**
+	 * Creates a new {@link BlockByteBuffer} where each buffer segment will be
+	 * 1 << segmentAddressBits (max of 30), and a total size of (1 << blockAddressBits)*blockCount.
+	 * @param segmentAddressBits
+	 * @param blockCount
+	 * @param blockAddressBits
+	 * @param direct
+	 */
+	@SuppressWarnings("unchecked")
+	public BlockByteBuffer(int segmentAddressBits, int blockCount, int blockAddressBits, boolean direct) {
+		this.segmentAddressBits = segmentAddressBits;
+		this.blockAddressBits = blockAddressBits;
+		this.blockSize = 1 << blockAddressBits;
+		this.segmentSize = 1 << this.segmentAddressBits;
+		this.blockCount = blockCount;
+		long size = ((long)blockCount)<<blockAddressBits;
+		int fullSegments = (int)size>>segmentAddressBits;
+		int lastSegmentSize = (int) (size&(segmentSize-1));
+		int segments = fullSegments;
+		if (lastSegmentSize > 0) {
+			segments++;
+		}
+		buffers = new ThreadLocal[segments];
+		for (int i = 0; i < fullSegments; i++) {
+			buffers[i] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
+		}
+		if (lastSegmentSize > 0) {
+			buffers[fullSegments] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
+		}
+		int logSize = 32 - Integer.numberOfLeadingZeros(blockCount);
+		bufferCache = new BlockInfo[Math.min(logSize, 20)];
+	}
+
+	public static ByteBuffer allocate(int size, boolean direct) {
+		if (direct) {
+			ByteBuffer newBuffer = ByteBuffer.allocateDirect(size);
+			int longsPerSegment = size>>3;
+			//manually initialize until java 7 when it's mandated (although this may already have been performed)
+			for (int j = 0; j < longsPerSegment; j++) {
+				newBuffer.putLong(0);
+			}
+			return newBuffer;
+		}
+		return ByteBuffer.allocate(size);
+	}
+	
+	/**
+	 * Return a buffer containing the given start byte.
+	 * It is assumed that the caller will handle blocks in
+	 * a thread safe manner.
+	 * @param startIndex
+	 * @return
+	 */
+	public ByteBuffer getByteBuffer(int block) {
+		if (block < 0 || block >= blockCount) {
+			throw new IndexOutOfBoundsException("Invalid block " + block); //$NON-NLS-1$
+		}
+		int cacheIndex = block&(bufferCache.length -1);
+		BlockInfo info = bufferCache[cacheIndex];
+		if (info != null && info.block == block) {
+			info.bb.rewind();
+			return info.bb;
+		}
+		int segment = block>>(segmentAddressBits-blockAddressBits);
+		ByteBuffer bb = buffers[segment].get();
+		bb.limit(bb.capacity());
+		int position = (block<<blockAddressBits)&(segmentSize-1);
+		bb.position(position);
+		bb.limit(position + blockSize);
+		bb = bb.slice();
+		info = new BlockInfo(bb, block);
+		bufferCache[cacheIndex] = info;
+		return bb;
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+final class BlockInputStream extends InputStream {
+	private final BlockManager manager;
+	private final int maxBlock;
+	int blockIndex;
+	ByteBuffer buf;
+	boolean free;
+	boolean done;
+
+	BlockInputStream(BlockManager manager, int blockCount, boolean free) {
+		this.manager = manager;
+		this.free = free;
+		this.maxBlock = blockCount;
+	}
+
+	@Override
+	public int read() {
+		ensureBytes();
+		if (done) {
+			return -1;
+		}
+		return buf.get() & 0xff;
+	}
+
+	private void ensureBytes() {
+		if (buf == null || buf.remaining() == 0) {
+			if (maxBlock == blockIndex) {
+				done = true;
+				if (blockIndex > 1 && free) {
+					manager.freeBlock(blockIndex - 1, false);
+				}
+				return;
+			}
+			buf = manager.getBlock(blockIndex++);
+			if (blockIndex > 2 && free) {
+				manager.freeBlock(blockIndex - 2, false);
+			}
+		}
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) {
+		ensureBytes();
+		if (done) {
+			return -1;
+		}
+		len = Math.min(len, buf.remaining());
+		buf.get(b, off, len);
+		return len;
+	}
+	
+	public int free(boolean steal) {
+		return manager.free(steal);
+	}
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Represents an INode
+ */
+public interface BlockManager {
+	
+	int getInode();
+	
+	ByteBuffer allocateBlock(int index);
+	
+	/**
+	 * Get the block for a given index.  Returns null if the block does not exist.
+	 * @param index
+	 * @return
+	 */
+	ByteBuffer getBlock(int index);
+	
+	int freeBlock(int index, boolean steal);
+	
+	int free(boolean steal);
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+final class BlockOutputStream extends
+		ExtensibleBufferedOutputStream {
+	private final BlockManager blockManager;
+	int blockNum = -1;
+
+	BlockOutputStream(BlockManager blockManager) {
+		this.blockManager = blockManager;
+	}
+
+	@Override
+	protected ByteBuffer newBuffer() {
+		return blockManager.allocateBlock(++blockNum);
+	}
+	
+	@Override
+	protected int flushDirect(int i) throws IOException {
+		return i;
+	}
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.StorageManager;
+
+class BlockStore {
+	final long blockSize;
+	final ConcurrentBitSet blocksInUse;
+	final FileStore[] stores;
+	
+	public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog) {
+		this.blockSize = blockSize;
+		int blockCount = 1 << blockCountLog;
+		this.blocksInUse = new ConcurrentBitSet(blockCount, BufferManagerImpl.CONCURRENCY_LEVEL/2);
+		this.stores = new FileStore[BufferManagerImpl.CONCURRENCY_LEVEL/2];
+		for (int i = 0; i < stores.length; i++) {
+			this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i); 
+		}
+	}
+
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,714 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.teiid.common.buffer.AutoCleanupUtil;
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.Cache;
+import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.Serializer;
+import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
+
+/**
+ * Implements storage against a {@link FileStore} abstraction using a fronting
+ * memory buffer with a filesystem paradigm.  All objects must go through the 
+ * memory (typically off-heap) buffer so that they can be put into their appropriately 
+ * sized storage bucket.
+ * 
+ * The memory uses a 31bit address space on top of 2^13 byte blocks.
+ * 
+ * Therefore there is 2^31*2^13 = 2^44 or 16 terabytes max of addressable space.
+ * This is well beyond any current needs.
+ * 
+ * The 64 byte inode format is:
+ * 14 32 bit direct block pointers
+ * 1  32 bit block indirect pointer
+ * 1  32 bit block doubly indirect pointer (should be rarely used)
+ * 
+ * This means that the maximum number of blocks available to an object is
+ * 14 + (2^13)/4 + ((2^13)/4)^2 ~= 2^22
+ * 
+ * Thus the max serialized object size is:     2^22*(2^13)  ~= 32GB.
+ * 
+ * Typically the max object size will be much smaller, such as 8MB.
+ * 
+ * Inodes are held separately from the data/index blocks, and introduce an overhead
+ * that is ~ 1/128th the size of memory buffer.
+ * 
+ * The filesystem stores are broken up into block specific sizes starting with 8KB.
+ * 
+ * The root directory "physicalMapping" is held in memory for performance.  It will grow in
+ * proportion to the number of tables/tuplebuffers in use.
+ * 
+ * TODO: compact tail storage blocks.  there may be dangling blocks causing us to consume disk space.
+ */
+public class BufferFrontedFileStoreCache implements Cache, StorageManager {
+	
+	static final int ADDRESS_BITS = 31;
+	static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
+	static final int BYTES_PER_BLOCK_ADDRESS = 4;
+	static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
+	static final int LOG_INODE_SIZE = 6;
+	static final int DIRECT_POINTERS = 14;
+	static final int EMPTY_ADDRESS = -1;
+	
+	//TODO allow the block size to be configurable
+	static final int LOG_BLOCK_SIZE = 13;
+	static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
+	static final int BLOCK_MASK = BLOCK_SIZE - 1;
+	static final int ADDRESSES_PER_BLOCK = BLOCK_SIZE/BYTES_PER_BLOCK_ADDRESS;
+	static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
+	static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK * ADDRESSES_PER_BLOCK;
+	
+	private enum Mode {
+		GET,
+		UPDATE,
+		ALLOCATE
+	}
+	
+	private final class InodeBlockManager implements BlockManager {
+		private int inode;
+		private ByteBuffer inodeBuffer;
+		private final long gid;
+		private final long oid;
+
+		InodeBlockManager(long gid, long oid, int inode) {
+			this.inode = inode;
+			this.gid = gid;
+			this.oid = oid;
+		}
+		
+		@Override
+		public int getInode() {
+			return inode;
+		}
+
+		@Override
+		public ByteBuffer getBlock(int index) {
+			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
+			return blockByteBuffer.getByteBuffer(dataBlock);
+		}
+				
+		private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
+			if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >= maxMemoryBlocks)) {
+				throw new TeiidRuntimeException("Max block number exceeded"); //$NON-NLS-1$
+			}
+			int dataBlock = 0;
+			int position = 0;
+			ByteBuffer info = getInodeBlock();
+			if (index >= MAX_INDIRECT) {
+				position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
+				ByteBuffer next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, value, mode);
+				if (next != info) {
+					info = next;
+					//should have traversed to the secondary
+					int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
+					position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+						info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+					}
+					next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT + indirectAddressBlock * ADDRESSES_PER_BLOCK,  value, mode);
+					if (next != info) {
+						info = next;
+						position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+					}
+				}
+			} else if (index >= DIRECT_POINTERS) {
+				//indirect
+				position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
+				ByteBuffer next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS, value, mode);
+				if (next != info) {
+					info = next;
+					position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+				}
+			} else {
+				position = BYTES_PER_BLOCK_ADDRESS*index;
+			}
+			if (mode == Mode.ALLOCATE) {
+				dataBlock = nextBlock(true);
+				info.putInt(position, dataBlock);
+				if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+					//maintain the invariant that the next pointer is empty
+					info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+				}
+			} else {
+				dataBlock = info.getInt(position);
+				if (mode == Mode.UPDATE) {
+					info.putInt(position, value);
+				}
+			}
+			return dataBlock;
+		}
+		
+		private ByteBuffer updateIndirectBlockInfo(ByteBuffer buf, int index, int position, int cutOff, int value, Mode mode) {
+			int sib_index = buf.getInt(position);
+			if (index == cutOff) {
+				if (mode == Mode.ALLOCATE) {
+					sib_index = nextBlock(false);
+					buf.putInt(position, sib_index);
+				} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
+					freeDataBlock(sib_index);
+					return buf;
+				}
+			}
+			return blockByteBuffer.getByteBuffer(sib_index);
+		}
+
+		/**
+		 * Get the next dataBlock.  When the memory buffer is full we have some
+		 * book keeping to do.
+		 * @return
+		 */
+		private int nextBlock(boolean data) {
+			int next = EMPTY_ADDRESS;
+			memoryEvictionLock.readLock().lock();
+			boolean readLocked = true;
+			try {
+				if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
+					memoryEvictionLock.readLock().unlock();
+					readLocked = false;
+					next = evictFromMemoryBuffer();
+				}
+			} finally {
+				if (readLocked) {
+					memoryEvictionLock.readLock().unlock();
+				}
+			}
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_DQP, "Allocating", data?"data":"index", "block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+			}
+			return next;
+		}
+
+		@Override
+		public int freeBlock(int index, boolean steal) {
+			int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
+			if (!steal) {
+				freeDataBlock(dataBlock);
+			}
+			return dataBlock;
+		}
+
+		private void freeDataBlock(int dataBlock) {
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			blocksInuse.clear(dataBlock);
+		}
+		
+		private ByteBuffer getInodeBlock() {
+			if (inodeBuffer == null) {
+				if (inode == EMPTY_ADDRESS) {
+					this.inode = inodesInuse.getAndSetNextClearBit();
+					if (this.inode == -1) {
+						throw new AssertionError("Out of inodes"); //$NON-NLS-1$
+					}
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+					}
+					ByteBuffer bb = getInodeBlock();
+					bb.putInt(EMPTY_ADDRESS);
+				}
+				inodeBuffer = inodeByteBuffer.getByteBuffer(inode);
+			}
+			return inodeBuffer;
+		}
+
+		@Override
+		public int free(boolean steal) {
+			if (this.inode == EMPTY_ADDRESS) {
+				return EMPTY_ADDRESS;
+			}
+			ByteBuffer bb = getInodeBlock();
+			int dataBlockToSteal = bb.getInt(0);
+			int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
+			int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
+			boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS, true);
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			inodesInuse.clear(inode);
+			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
+				return steal?dataBlockToSteal:EMPTY_ADDRESS;
+			}
+			freedAll = freeIndirectBlock(indirectIndexBlock);
+			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
+				return steal?dataBlockToSteal:EMPTY_ADDRESS;
+			}
+			bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
+			freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
+			freeDataBlock(doublyIndirectIndexBlock);
+			return steal?dataBlockToSteal:EMPTY_ADDRESS;
+		}
+
+		private boolean freeIndirectBlock(int indirectIndexBlock) {
+			ByteBuffer bb = blockByteBuffer.getByteBuffer(indirectIndexBlock);
+			boolean freedAll = freeBlock(0, bb, ADDRESSES_PER_BLOCK, true);
+			freeDataBlock(indirectIndexBlock);
+			return freedAll;
+		}
+
+		private boolean freeBlock(int startPosition, ByteBuffer ib, int numPointers, boolean primary) {
+			ib.position(startPosition);
+			for (int i = 0; i < numPointers; i++) {
+				int dataBlock = ib.getInt();
+				if (dataBlock == EMPTY_ADDRESS) {
+					return false;
+				}
+				if (primary) {
+					freeDataBlock(dataBlock);
+				} else {
+					freeIndirectBlock(dataBlock);
+				}
+			}
+			return true;
+		}
+
+		@Override
+		public ByteBuffer allocateBlock(int blockNum) {
+			int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
+			return blockByteBuffer.getByteBuffer(dataBlock);
+		}
+	}
+
+	private static class PhysicalInfo extends BaseCacheEntry {
+		int inode = EMPTY_ADDRESS;
+		int block = EMPTY_ADDRESS;
+		int sizeIndex = 0;
+		final int memoryBlockCount;
+		final Long gid;
+		
+		public PhysicalInfo(Long gid, Long id, int inode, int size) {
+			super(id);
+			this.inode = inode;
+			this.gid = gid;
+			this.memoryBlockCount = (size>>LOG_BLOCK_SIZE) + ((size&BLOCK_MASK)>0?1:0);
+			int blocks = memoryBlockCount;
+			while (blocks >= 1) {
+				this.sizeIndex++;
+				blocks>>=2;
+			}
+		}
+	}
+	
+	double crfLamda = .0001;
+	
+	StorageManager storageManager;
+	int maxStorageObjectSize = 1 << 23; //8MB
+	private long memoryBufferSpace = 1 << 27;
+	private boolean direct;
+	
+	int maxMemoryBlocks;
+	private AtomicLong readAttempts = new AtomicLong();
+	PartiallyOrderedCache<Long, PhysicalInfo> memoryBufferEntries = new PartiallyOrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL) {
+		
+		@Override
+		protected void recordAccess(Long key, PhysicalInfo value, boolean initial) {
+			long lastAccess = value.getLastAccess();
+			value.setLastAccess(readAttempts.get());
+			if (initial && lastAccess == 0) {
+				return;
+			}
+			double orderingValue = value.getOrderingValue();
+			orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess, orderingValue);
+			value.setOrderingValue(orderingValue);
+		}
+
+	};
+	private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
+	ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock();
+	
+	private int blocks;
+	private ConcurrentBitSet blocksInuse;
+	private BlockByteBuffer blockByteBuffer;
+
+	private ConcurrentBitSet inodesInuse;
+	private BlockByteBuffer inodeByteBuffer;
+	
+	//root directory
+	private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping = new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL);
+	private BlockStore[] sizeBasedStores;
+	
+	@Override
+	public void initialize() throws TeiidComponentException {
+		storageManager.initialize();
+		blocks = (int) Math.min(Integer.MAX_VALUE, (memoryBufferSpace>>LOG_BLOCK_SIZE));
+		inodesInuse = new ConcurrentBitSet(blocks+1, BufferManagerImpl.CONCURRENCY_LEVEL);
+		blocksInuse = new ConcurrentBitSet(blocks, BufferManagerImpl.CONCURRENCY_LEVEL);
+		this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
+		//ensure that we'll run out of blocks first
+		this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
+		memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
+		memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
+		maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
+		//account for index pointer block overhead
+		if (maxMemoryBlocks > DIRECT_POINTERS) {
+			maxMemoryBlocks--;
+		}
+		if (maxMemoryBlocks > MAX_INDIRECT) {
+			int indirect = maxMemoryBlocks-MAX_INDIRECT;
+			maxMemoryBlocks -= (indirect/ADDRESSES_PER_BLOCK + (indirect%ADDRESSES_PER_BLOCK>0?1:0) + 1);
+		}
+		List<BlockStore> stores = new ArrayList<BlockStore>();
+		int size = BLOCK_SIZE;
+		do {
+			stores.add(new BlockStore(this.storageManager, size, 30));
+			size <<=2;
+		} while (size>>2 < maxStorageObjectSize);
+		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
+	}
+	
+	double computeNextOrderingValue(long currentTime,
+			long lastAccess, double orderingValue) {
+		orderingValue = 
+			//Frequency component
+			orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+			//recency component
+			+ Math.pow(currentTime, crfLamda);
+		return orderingValue;
+	}
+	
+	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
+		return new InodeBlockManager(gid, oid, inode);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void add(CacheEntry entry, Serializer s) {
+		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+			LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
+		}
+		InodeBlockManager blockManager = null;
+		boolean hasPermit = false;
+		PhysicalInfo info = null;
+		boolean newEntry = true;
+		boolean success = false;
+		try {
+			Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
+			if (map == null) {
+				return; //already removed
+			}
+			info = map.get(entry.getId());
+			if (info == null) {
+				if (!map.containsKey(entry.getId())) {
+					return; //already removed
+				}
+			} else {
+				newEntry = false;
+				synchronized (info) {
+					if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(readAttempts.get(), info)) {
+						success = true;
+						return; 
+					}
+				}
+			}
+			memoryWritePermits.acquire();
+			hasPermit = true;
+			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
+			ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
+            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+            oos.writeInt(entry.getSizeEstimate());
+            s.serialize(entry.getObject(), oos);
+            oos.close();
+            synchronized (map) {
+            	//synchronize to ensure proper cleanup from a concurrent removal 
+            	if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
+            		if (newEntry) {
+	           			info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(), fsos.getBytesWritten());
+		                map.put(entry.getId(), info);
+	            		memoryBufferEntries.put(entry.getId(), info);
+            		}
+            		success = true;
+            	}
+			}
+		} catch (Throwable e) {
+			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ entry.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+		} finally {
+			if (hasPermit) {
+				memoryWritePermits.release();
+			}
+			if (!success && blockManager != null) {
+				blockManager.free(false);
+			}
+		}
+	}
+
+	@Override
+	public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
+		long currentTime = readAttempts.incrementAndGet();
+		try {
+			Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
+			if (map == null) {
+				return null;
+			}
+			final PhysicalInfo info = map.get(oid);
+			if (info == null) {
+				return null;
+			}
+			CacheEntry ce = new CacheEntry(oid);
+			InputStream is = null;
+			synchronized (info) {
+				if (info.inode != EMPTY_ADDRESS) {
+					memoryBufferEntries.get(oid); //touch this entry
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
+					}
+					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
+					is = new BlockInputStream(manager, info.memoryBlockCount, false);
+				} else if (info.block != EMPTY_ADDRESS) {
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
+					}
+					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+					is = fs.createInputStream(blockOffset);
+					if (shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
+						BlockManager manager = null;
+						try {
+							manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+							ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
+				            ObjectConverterUtil.write(os, is, -1);
+							memoryBufferEntries.put(info.getId(), info);
+							is = new BlockInputStream(manager, info.memoryBlockCount, false);
+						} finally {
+							this.memoryWritePermits.release();
+						}
+					} else {
+						this.toString();
+					}
+				} else {
+					return null;
+				}
+			}
+			ObjectInputStream ois = new ObjectInputStream(is);
+			ce.setSizeEstimate(ois.readInt());
+			ce.setLastAccess(1);
+			ce.setOrderingValue(1);
+			ce.setObject(serializer.deserialize(ois));
+			ce.setPersistent(true);
+			return ce;
+        } catch(IOException e) {
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        } catch (ClassNotFoundException e) {
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        }
+	}
+
+	private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
+		Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
+		return lowest == null 
+				|| (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (info.memoryBlockCount<<3)
+				|| lowest.getKey().getOrderingValue() < computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue());
+	}
+	
+	@Override
+	public FileStore createFileStore(String name) {
+		return storageManager.createFileStore(name);
+	}
+	
+	public void setDirect(boolean direct) {
+		this.direct = direct;
+	}
+	
+	@Override
+	public void addToCacheGroup(Long gid, Long oid) {
+		Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+		if (map == null) {
+			return;
+		}
+		map.put(oid, null);
+	}
+	
+	@Override
+	public void createCacheGroup(Long gid) {
+		physicalMapping.put(gid, Collections.synchronizedMap(new HashMap<Long, PhysicalInfo>()));
+	}
+	
+	@Override
+	public void remove(Long gid, Long id) {
+		Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+		if (map == null) {
+			return;
+		}
+		PhysicalInfo info = map.remove(id);
+		free(id, info, false);
+	}
+
+	@Override
+	public Collection<Long> removeCacheGroup(Long gid) {
+		Map<Long, PhysicalInfo> map = physicalMapping.remove(gid);
+		if (map == null) {
+			return Collections.emptySet();
+		}
+		synchronized (map) {
+			for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
+				free(entry.getKey(), entry.getValue(), false);
+			}
+			return map.keySet();
+		}
+	}
+	
+	int free(Long oid, PhysicalInfo info, boolean demote) {
+		memoryBufferEntries.remove(oid);
+		if (info == null) {
+			return EMPTY_ADDRESS;
+		}
+		synchronized (info) {
+			memoryBufferEntries.remove(oid);
+			if (info.inode == EMPTY_ADDRESS) {
+				return EMPTY_ADDRESS;
+			}
+			BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+			info.inode = EMPTY_ADDRESS;
+			if (demote) {
+				if (info.block == EMPTY_ADDRESS) {
+					BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount, true);
+					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+					info.block = getAndSetNextClearBit(blockStore);
+					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+					byte[] b = new byte[BLOCK_SIZE];
+					int read = 0;
+					boolean errored = false;
+					while ((read = is.read(b, 0, b.length)) != -1) {
+						if (!errored) {
+							try {
+								fs.write(blockOffset, b, 0, read);
+								blockOffset+=read;
+							} catch (Throwable e) {
+								//just continue to free
+								errored = true;
+								LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+							}
+						}
+					}
+					return is.free(true);
+				}
+				return bm.free(true);
+			}
+			bm.free(false);
+			if (info.block != EMPTY_ADDRESS) {
+				BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+				blockStore.blocksInUse.clear(info.block);
+				info.block = EMPTY_ADDRESS;
+			}
+		}
+		return EMPTY_ADDRESS;
+	}
+	
+	static int getAndSetNextClearBit(BlockStore bs) {
+		int result = bs.blocksInUse.getAndSetNextClearBit();
+		if (result == -1) {
+			throw new TeiidRuntimeException("Out of blocks of size " + bs.blockSize); //$NON-NLS-1$
+		}
+		return result;
+	}
+	
+	/**
+	 * Stop the world eviction.  Hopefully this should rarely happen.
+	 * @return the stole dataBlock
+	 */
+	int evictFromMemoryBuffer() {
+		memoryEvictionLock.writeLock().lock();
+		int next = -1;
+		boolean writeLocked = true;
+		try {
+			for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
+				AutoCleanupUtil.doCleanup();
+				Iterator<Map.Entry<PhysicalInfo, Long>> iter = memoryBufferEntries.getEvictionQueue().entrySet().iterator();
+				while ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS && iter.hasNext()) {
+					Map.Entry<PhysicalInfo, Long> entry = iter.next();
+					PhysicalInfo info = entry.getKey();
+					synchronized (info) {
+						if (info.inode == EMPTY_ADDRESS) {
+							continue;
+						}
+						memoryEvictionLock.writeLock().unlock();
+						writeLocked = false;
+						next = free(entry.getValue(), info, true);
+					}
+					break;
+				}
+			} 
+			if (next == -1) {
+				throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
+			}
+		} finally {
+			if (writeLocked) {
+				memoryEvictionLock.writeLock().unlock();
+			}
+		}
+		return next;
+	}
+	
+	public void setStorageManager(StorageManager storageManager) {
+		this.storageManager = storageManager;
+	}
+	
+	public StorageManager getStorageManager() {
+		return storageManager;
+	}
+	
+	public void setMemoryBufferSpace(long maxBufferSpace) {
+		this.memoryBufferSpace = Math.min(maxBufferSpace, 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE));
+	}
+	
+	public int getInodesInUse() {
+		return this.inodesInuse.getBitsSet();
+	}
+	
+	public int getDataBlocksInUse() {
+		return this.blocksInuse.getBitsSet();
+	}
+	
+	public void setMaxStorageObjectSize(int maxStorageBlockSize) {
+		this.maxStorageObjectSize = maxStorageBlockSize;
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-12 20:56:57 UTC (rev 3548)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -187,7 +187,7 @@
 			try {
 				//it's expected that the containing structure has updated the lob manager
 				BatchSerializer.writeBatch(oos, types, obj);
-			} catch (IndexOutOfBoundsException e) {
+			} catch (RuntimeException e) {
 				//there is a chance of a concurrent persist while modifying 
 				//in which case we want to swallow this exception
 				if (list == null || list.getModCount() == expectedModCount) {
@@ -219,7 +219,7 @@
 					return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
 				}
 				long count = readCount.incrementAndGet();
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, id, id, "reading batch", batch, "from storage, total reads:", count); //$NON-NLS-1$ //$NON-NLS-2$
 				}
 				ce = cache.get(batch, this);
@@ -268,6 +268,7 @@
 		}
 	}
 
+	static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable
 	private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
 	private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new ReferenceQueue<CacheEntry>();
 	
@@ -292,11 +293,11 @@
     
     //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher -> LRU
     //TODO: adaptively adjust this value.  more hits should move closer to lru
-    private final double crfLamda = .0002;
+    private final double crfLamda = .001;
     //implements a LRFU cache using the a customized crf function.  we store the value with
     //the cache entry to make a better decision about reuse of the batch
     //TODO: consider the size estimate in the weighting function
-    private OrderedCache<Long, CacheEntry> memoryEntries = new OrderedCache<Long, CacheEntry>() {
+    private PartiallyOrderedCache<Long, CacheEntry> memoryEntries = new PartiallyOrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL) {
 		
 		@Override
 		protected void recordAccess(Long key, CacheEntry value, boolean initial) {
@@ -599,7 +600,7 @@
 	}
 
 	void evict(CacheEntry ce) throws Exception {
-		Serializer<?> s = ce.getSerializer().get();
+		Serializer<?> s = ce.getSerializer();
 		if (s == null) {
 			return;
 		}
@@ -615,8 +616,8 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
 			}
-			cache.add(ce, s);
 		}
+		cache.add(ce, s);
 		if (s.useSoftCache()) {
 			createSoftReference(ce);
 		} else if (useWeakReferences) {
@@ -689,7 +690,7 @@
 		if (inMemory) {
 			activeBatchKB.addAndGet(-ce.getSizeEstimate());
 		}
-		Serializer<?> s = ce.getSerializer().get();
+		Serializer<?> s = ce.getSerializer();
 		if (s != null) {
 			cache.remove(s.getId(), ce.getId());
 		}

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+package org.teiid.common.buffer.impl;
+
+import java.util.BitSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A segmented {@link BitSet} that supports greater concurrency
+ * and faster finding of clear bits.
+ */
+public class ConcurrentBitSet {
+	
+	private static class Segment {
+		int offset;
+		int maxBits;
+		int startSearch;
+		int bitsSet;
+		final private BitSet bitSet;
+		
+		public Segment(int bitCount) {
+			bitSet = new BitSet();
+			maxBits = bitCount;
+		}
+	}
+
+	private int bitsPerSegment;
+	private int totalBits;
+	private AtomicInteger counter = new AtomicInteger();
+	private AtomicInteger bitsSet = new AtomicInteger();
+	private Segment[] segments;
+	
+	public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
+		if (maxBits < concurrencyLevel) {
+			concurrencyLevel = 1;
+			while (maxBits > 2*concurrencyLevel) {
+				concurrencyLevel <<=1;
+			}
+		}
+		segments = new Segment[concurrencyLevel];
+		bitsPerSegment = maxBits/concurrencyLevel;
+		int modBits = maxBits%concurrencyLevel;
+		if (modBits > 0) {
+			bitsPerSegment++;
+		}
+		for (int i = 0; i < concurrencyLevel; i++) {
+			segments[i] = new Segment(bitsPerSegment);
+			segments[i].offset = i*bitsPerSegment;
+			if (i == concurrencyLevel - 1) {
+				segments[i].maxBits -= (bitsPerSegment * concurrencyLevel)-maxBits;
+			}
+		}
+		this.totalBits = maxBits;
+	}
+	
+	public void clear(int bitIndex) {
+		checkIndex(bitIndex);
+		Segment s = segments[bitIndex/bitsPerSegment];
+		bitIndex = bitIndex%bitsPerSegment;
+		synchronized (s) {
+			if (!s.bitSet.get(bitIndex)) {
+				throw new AssertionError(bitIndex + " not set"); //$NON-NLS-1$
+			}
+			s.bitSet.clear(bitIndex);
+			s.bitsSet--;
+		}
+		bitsSet.decrementAndGet();
+	}
+	
+	/**
+	 * Makes a best effort to atomically find the next clear bit and set it
+	 * @return the next bit index or -1 if no clear bits are founds
+	 */
+	public int getAndSetNextClearBit() {
+		int start = counter.getAndIncrement();
+		int nextBit = -1;
+		for (int i = 0; i < segments.length; i++) {
+			Segment s = segments[(start+i)&(segments.length-1)];
+			synchronized (s) {
+				if (s.bitsSet == s.maxBits) {
+					continue;
+				}
+				nextBit = s.bitSet.nextClearBit(s.startSearch);
+				if (nextBit >= s.maxBits - 1) {
+					s.startSearch = 0;
+					nextBit = s.bitSet.nextClearBit(s.startSearch);
+					if (nextBit >= s.maxBits) {
+						throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+					}
+				}
+				s.bitsSet++;
+				s.bitSet.set(nextBit);
+				s.startSearch = nextBit + 1;
+				if (s.startSearch == s.maxBits) {
+					s.startSearch = 0;
+				}
+				nextBit += s.offset;
+				break;
+			}
+		}
+		if (nextBit != -1) {
+			bitsSet.getAndIncrement();
+		}
+		return nextBit;
+	}
+	
+	private void checkIndex(int bitIndex) {
+		if (bitIndex >= totalBits) {
+			throw new ArrayIndexOutOfBoundsException(bitIndex);
+		}
+	}
+	
+	public int getTotalBits() {
+		return totalBits;
+	}
+	
+	public int getBitsSet() {
+		return bitsSet.get();
+	}
+	
+	public int getBitsPerSegment() {
+		return bitsPerSegment;
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class PartiallyOrderedCache<K, V> {
+	
+	private int maxOrderedSize = 1 << 19;
+	
+	protected Map<K, V> map; 
+	//TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
+	//the level limits the effective map size to ~ 2^16
+	//where it performs comparably under load to a synchronized LinkedHashMap
+	//just with more CPU overhead vs. wait time.
+	//TODO: have the concurrent version be pluggable
+	protected NavigableMap<V, K> evictionQueue = new TreeMap<V, K>();
+	//when we get to extreme number of entries we overflow into lru
+	protected Map<V, K> evictionQueueHead = new LinkedHashMap<V, K>();
+	//holds entries that are being evicted, but that might not yet be in a lower caching level
+	protected Map<K, V> limbo;
+	
+	public PartiallyOrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel) {
+		map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
+		limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
+	}
+	
+	public void setMaxOrderedSize(int maxOrderedSize) {
+		this.maxOrderedSize = maxOrderedSize;
+	}
+		
+	public V get(K key) {
+		V result = map.get(key);
+		if (result == null) {
+			result = limbo.get(key);
+		}
+		if (result != null) {
+			maintainQueues(key, result, null);
+		}
+		return result;
+	}
+	
+	public V remove(K key) {
+		V result = map.remove(key);
+		if (result != null) {
+			synchronized (this) {
+				if (evictionQueue.remove(result) != null) {
+					orderedRemoved();
+				} else {
+					evictionQueueHead.remove(result);
+				}
+			}
+		}
+		return result;
+	}
+
+	private void orderedRemoved() {
+		if (evictionQueue.size() < (maxOrderedSize>>1) && evictionQueueHead.size() > 0) {
+			Iterator<Map.Entry<V,K>> i = evictionQueueHead.entrySet().iterator();
+			if (i.hasNext()) {
+				Map.Entry<V, K> entry = i.next();
+				if (map.containsKey(entry.getValue())) {
+					i.remove();
+					evictionQueue.put(entry.getKey(), entry.getValue());
+				}
+			}
+		}
+	}
+	
+	public V put(K key, V value) {
+		V result = map.put(key, value);
+		maintainQueues(key, value, result);
+		return result;
+	}
+
+	private void maintainQueues(K key, V value, V old) {
+		synchronized (this) {
+			if (old != null && evictionQueue.remove(old) == null) {
+				evictionQueueHead.remove(old);
+			}
+			recordAccess(key, value, old == null);
+			evictionQueue.put(value, key);
+			if (evictionQueue.size() > maxOrderedSize) {
+				Map.Entry<V, K> last = evictionQueue.pollLastEntry();
+				if (last != null) {
+					if (map.containsKey(last.getValue()) && !evictionQueue.containsKey(last.getKey())) {
+						evictionQueueHead.put(last.getKey(), last.getValue());
+					}
+				}
+			}
+		}
+	}
+	
+	public V evict() {
+		Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
+		if (entry == null) {
+			return null;
+		}
+		synchronized (this) {
+			orderedRemoved();
+		}
+		limbo.put(entry.getValue(), entry.getKey());
+		return map.remove(entry.getValue());
+	}
+	
+	public Map<V, K> getEvictionQueue() {
+		return evictionQueue;
+	}
+	
+	public Map.Entry<V, K> firstEntry() {
+		return evictionQueue.firstEntry();
+	}
+	
+	public void finishedEviction(K key) {
+		limbo.remove(key);
+	}
+	
+	public int size() {
+		return map.size();
+	}
+	
+	protected abstract void recordAccess(K key, V value, boolean initial);
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.WeakReference;
+
+import org.junit.Test;
+import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.Serializer;
+import org.teiid.core.TeiidComponentException;
+
+public class TestBufferFrontedFileStoreCache {
+	
+	private final class SimpleSerializer implements Serializer<Integer> {
+		@Override
+		public Integer deserialize(ObjectInputStream ois)
+				throws IOException, ClassNotFoundException {
+			Integer result = ois.readInt();
+			for (int i = 0; i < result; i++) {
+				assertEquals(i, ois.readInt());
+			}
+			return result;
+		}
+
+		@Override
+		public Long getId() {
+			return 1l;
+		}
+
+		@Override
+		public void serialize(Integer obj, ObjectOutputStream oos)
+				throws IOException {
+			oos.writeInt(obj);
+			for (int i = 0; i < obj; i++) {
+				oos.writeInt(i);
+			}
+		}
+
+		@Override
+		public boolean useSoftCache() {
+			return false;
+		}
+	}
+
+	@Test public void testAddGetMultiBlock() throws Exception {
+		BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26);
+		
+		CacheEntry ce = new CacheEntry(2l);
+		Serializer<Integer> s = new SimpleSerializer();
+		cache.createCacheGroup(s.getId());
+		Integer cacheObject = Integer.valueOf(2);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = cache.get(2l, s);
+		assertEquals(cacheObject, ce.getObject());
+		
+		//test something that exceeds the direct inode data blocks
+		ce = new CacheEntry(3l);
+		cacheObject = Integer.valueOf(80000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = cache.get(3l, s);
+		assertEquals(cacheObject, ce.getObject());
+		
+		cache.removeCacheGroup(1l);
+		
+		assertEquals(0, cache.getDataBlocksInUse());
+		assertEquals(0, cache.getInodesInUse());
+		
+		//test something that exceeds the indirect data blocks
+		ce = new CacheEntry(3l);
+		cache.createCacheGroup(s.getId());
+		cacheObject = Integer.valueOf(5000000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = cache.get(3l, s);
+		assertEquals(cacheObject, ce.getObject());
+
+		cache.removeCacheGroup(1l);
+		
+		assertEquals(0, cache.getDataBlocksInUse());
+		assertEquals(0, cache.getInodesInUse());
+
+		//test something that exceeds the allowable object size
+		ce = new CacheEntry(3l);
+		cache.createCacheGroup(s.getId());
+		cacheObject = Integer.valueOf(500000000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = cache.get(3l, s);
+		assertNull(ce);
+
+		cache.removeCacheGroup(1l);
+		
+		assertEquals(0, cache.getDataBlocksInUse());
+		assertEquals(0, cache.getInodesInUse());
+	}
+	
+	@Test public void testEviction() throws Exception {
+		BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
+		
+		CacheEntry ce = new CacheEntry(2l);
+		Serializer<Integer> s = new SimpleSerializer();
+		WeakReference<? extends Serializer<?>> ref = new WeakReference<Serializer<?>>(s);
+		ce.setSerializer(ref);
+		cache.createCacheGroup(s.getId());
+		Integer cacheObject = Integer.valueOf(5000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = new CacheEntry(3l);
+		ce.setSerializer(ref);
+		cacheObject = Integer.valueOf(5000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+
+		assertEquals(3, cache.getDataBlocksInUse());
+		assertEquals(1, cache.getInodesInUse());
+
+		ce = cache.get(2l, s);
+		assertEquals(Integer.valueOf(5000), ce.getObject());
+	}
+
+	private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {
+		BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+		fsc.setMemoryBufferSpace(bufferSpace);
+		fsc.setMaxStorageObjectSize(objectSize);
+		fsc.setDirect(false);
+		SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
+		ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+		fsc.setStorageManager(ssm);
+		fsc.initialize();
+		return fsc;
+	}
+	
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+ package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestConcurrentBitSet {
+	
+	@Test public void testBitsSet() {
+		ConcurrentBitSet bst = new ConcurrentBitSet(50001, 4);
+		assertEquals(0, bst.getAndSetNextClearBit());
+		assertEquals(12501, bst.getAndSetNextClearBit());
+		assertEquals(25002, bst.getAndSetNextClearBit());
+		assertEquals(37503, bst.getAndSetNextClearBit());
+		assertEquals(1, bst.getAndSetNextClearBit());
+		assertEquals(5, bst.getBitsSet());
+		bst.clear(1);
+		assertEquals(4, bst.getBitsSet());
+		bst.clear(12501);
+		try {
+			bst.clear(30000);
+			fail();
+		} catch (AssertionError e) {
+			
+		}
+		assertEquals(3, bst.getBitsSet());
+		
+		for (int i = 0; i < bst.getTotalBits()-3;i++) {
+			assertTrue(bst.getAndSetNextClearBit() != -1);
+		}
+		
+		bst.clear(5);
+		bst.clear(12505);
+		bst.clear(25505);
+		bst.clear(37505);
+		
+		for (int i = 0; i < 4; i++) {
+			int bit = bst.getAndSetNextClearBit();
+			assertTrue(bit < bst.getTotalBits() && bit > 0);
+		}
+	}
+	
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java	2011-10-12 23:23:02 UTC (rev 3549)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestPartiallyOrderedCache {
+
+	@Test public void testQueueMaintenance() {
+		PartiallyOrderedCache<Integer, Integer> cache = new PartiallyOrderedCache<Integer, Integer>(16, .75f, 16) {
+			
+			@Override
+			protected void recordAccess(Integer key, Integer value, boolean initial) {
+				
+			}
+		};
+		
+		cache.setMaxOrderedSize(5);
+		
+		for (int i = 0; i < 10; i++) {
+			cache.put(i, i);
+		}
+		
+		cache.get(8);
+		cache.get(1);
+		
+		List<Integer> evictions = new ArrayList<Integer>();
+		for (int i = 0; i < 10; i++) {
+			evictions.add(i);
+		}
+		//we expect natural order because the lru is converted into the sorted on natural key
+		assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), evictions);
+	}
+	
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain



More information about the teiid-commits mailing list