[teiid-commits] teiid SVN: r3576 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sat Oct 22 07:40:30 EDT 2011


Author: shawkins
Date: 2011-10-22 07:40:30 -0400 (Sat, 22 Oct 2011)
New Revision: 3576

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
   trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
   trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1750 further refining serialization and adding defrag

Added: trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public abstract class ExtensibleBufferedInputStream extends InputStream {
+	ByteBuffer buf;
+
+	@Override
+	public int read() throws IOException {
+		if (!ensureBytes()) {
+			return -1;
+		}
+		return buf.get() & 0xff;
+	}
+
+	private boolean ensureBytes() throws IOException {
+		if (buf == null || buf.remaining() == 0) {
+			buf = nextBuffer();
+			if (buf == null) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	protected abstract ByteBuffer nextBuffer() throws IOException;
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		if (!ensureBytes()) {
+			return -1;
+		}
+		len = Math.min(len, buf.remaining());
+		buf.get(b, off, len);
+		return len;
+	}
+	
+	@Override
+	public void reset() throws IOException {
+		if (buf != null) {
+			buf.rewind();
+		}
+	}
+	
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -189,36 +190,31 @@
 	protected abstract void removeDirect();
 	
 	public InputStream createInputStream(final long start, final long length) {
-		return new InputStream() {
+		return new ExtensibleBufferedInputStream() {
 			private long offset = start;
 			private long streamLength = length;
+			private ByteBuffer bb = ByteBuffer.allocate(1<<13);
 			
 			@Override
-			public int read() throws IOException {
-				byte[] buffer = new byte[1];
-				int read = read(buffer, 0, 1);
-				if (read == -1) {
-					return -1;
-				}
-				return buffer[0];
-			}
-			
-			@Override
-			public int read(byte[] b, int off, int len) throws IOException {
+			protected ByteBuffer nextBuffer() throws IOException {
+				int len = bb.capacity();
 				if (this.streamLength != -1 && len > this.streamLength) {
 					len = (int)this.streamLength;
 				}
 				if (this.streamLength == -1 || this.streamLength > 0) {
-					int bytes = FileStore.this.read(offset, b, off, len);
-					if (bytes != -1) {
-						this.offset += bytes;
-						if (this.streamLength != -1) {
-							this.streamLength -= bytes;
-						}
+					int bytes = FileStore.this.read(offset, bb.array(), 0, len);
+					if (bytes == -1) {
+						return null;
 					}
-					return bytes;
+					bb.rewind();
+					bb.limit(bytes);
+					this.offset += bytes;
+					if (this.streamLength != -1) {
+						this.streamLength -= bytes;
+					}
+					return bb;
 				}
-				return -1;
+				return null;
 			}
 		};
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,7 +22,6 @@
  
 package org.teiid.common.buffer;
 
-import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,8 +49,7 @@
 		if (fsos != null && !fsos.bytesWritten()) {
 			return new ByteArrayInputStream(fsos.getBuffer(), 0, fsos.getCount());
 		}
-		//TODO: adjust the buffer size, and/or develop a shared buffer strategy
-		return new BufferedInputStream(lobBuffer.createInputStream(0));
+		return lobBuffer.createInputStream(0);
 	}
 
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -23,16 +23,16 @@
 package org.teiid.common.buffer;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 
 /**
  * Responsible for serializing {@link CacheEntry}s
  * @param <T>
  */
 public interface Serializer<T> {
-	void serialize(T obj, ObjectOutputStream oos) throws IOException;
-	T deserialize(ObjectInputStream ois) throws IOException, ClassNotFoundException;
+	void serialize(T obj, ObjectOutput oos) throws IOException;
+	T deserialize(ObjectInput ois) throws IOException, ClassNotFoundException;
 	boolean useSoftCache();
 	Long getId();
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,18 +22,17 @@
 
 package org.teiid.common.buffer.impl;
 
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.teiid.common.buffer.ExtensibleBufferedInputStream;
+
 /**
  * TODO: support freeing of datablocks as we go
  */
-final class BlockInputStream extends InputStream {
+final class BlockInputStream extends ExtensibleBufferedInputStream {
 	private final BlockManager manager;
 	private final int maxBlock;
 	int blockIndex;
-	ByteBuffer buf;
-	boolean done;
 
 	BlockInputStream(BlockManager manager, int blockCount) {
 		this.manager = manager;
@@ -41,33 +40,11 @@
 	}
 
 	@Override
-	public int read() {
-		ensureBytes();
-		if (done) {
-			return -1;
+	protected ByteBuffer nextBuffer() {
+		if (maxBlock == blockIndex) {
+			return null;
 		}
-		return buf.get() & 0xff;
+		return manager.getBlock(blockIndex++);
 	}
-
-	private void ensureBytes() {
-		if (buf == null || buf.remaining() == 0) {
-			if (maxBlock == blockIndex) {
-				done = true;
-				return;
-			}
-			buf = manager.getBlock(blockIndex++);
-		}
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) {
-		ensureBytes();
-		if (done) {
-			return -1;
-		}
-		len = Math.min(len, buf.remaining());
-		buf.get(b, off, len);
-		return len;
-	}
 	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,8 +22,16 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
 
 /**
  * Represents a FileStore that holds blocks of a fixed size.
@@ -32,6 +40,7 @@
 	final long blockSize;
 	final ConcurrentBitSet blocksInUse;
 	final FileStore[] stores;
+	final ReentrantReadWriteLock[] locks;
 	
 	public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog, int concurrencyLevel) {
 		this.blockSize = blockSize;
@@ -39,9 +48,50 @@
 		this.blocksInUse = new ConcurrentBitSet(blockCount, concurrencyLevel);
 		this.blocksInUse.setCompact(true);
 		this.stores = new FileStore[concurrencyLevel];
+		this.locks = new ReentrantReadWriteLock[concurrencyLevel];
 		for (int i = 0; i < stores.length; i++) {
-			this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i); 
+			this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i);
+			this.locks[i] = new ReentrantReadWriteLock();
 		}
+		
 	}
+	
+	int getAndSetNextClearBit(PhysicalInfo info) {
+		int result = blocksInUse.getAndSetNextClearBit();
+		if (result == -1) {
+			throw new TeiidRuntimeException("Out of blocks of size " + blockSize); //$NON-NLS-1$
+		}
+		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+			LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", result, "of size", blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
+		}
+		return result;
+	}
+	
+	int writeToStorageBlock(PhysicalInfo info,
+			InputStream is) throws IOException {
+		int block = getAndSetNextClearBit(info);
+		int segment = block/blocksInUse.getBitsPerSegment();
+		boolean success = false;
+		//we're using the read lock here so that defrag can lock the write out
+		locks[segment].readLock().lock();
+		try {
+			FileStore fs = stores[segment];
+			long blockOffset = (block%blocksInUse.getBitsPerSegment())*blockSize;
+			byte[] b = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
+			int read = 0;
+			while ((read = is.read(b, 0, b.length)) != -1) {
+				fs.write(blockOffset, b, 0, read);
+				blockOffset+=read;
+			}
+			success = true;
+		} finally {
+			locks[segment].readLock().unlock();
+			if (!success) {
+				blocksInUse.clear(block);
+				block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+			}
+		}
+		return block;
+	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,10 +22,11 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -89,17 +90,14 @@
  * The root directory "physicalMapping" is held in memory for performance.  It will grow in
  * proportion to the number of tables/tuplebuffers in use.
  * 
- * TODO: compact tail storage blocks.  there may be dangling blocks causing us to consume disk space.
- * we should at least reclaim tail space if the end block is removed.  for now we are just relying
- * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
- * files.
- * 
  * The locking is as fine grained as possible to prevent contention.  See {@link PhysicalInfo} for
  * flags that are used when it is used as a lock.  It is important to not access the
  * group maps when a {@link PhysicalInfo} lock is held.
  */
 public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
 	
+	private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+	private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
 	private static final int EVICTION_SCANS = 5;
 
 	public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
@@ -355,9 +353,85 @@
 	//root directory
 	private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping = new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL);
 	private BlockStore[] sizeBasedStores;
-	
+
+	private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(2, "FileStore Worker"); //$NON-NLS-1$
+	private AtomicBoolean defragRunning = new AtomicBoolean();
+	//defrag to release freespace held by storage files
+	private final Runnable defragTask = new Runnable() {
+		
+		@Override
+		public void run() {
+			try {
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Running defrag"); //$NON-NLS-1$ 
+				}
+				for (int i = 0; i < sizeBasedStores.length; i++) {
+					BlockStore blockStore = sizeBasedStores[i];
+					for (int segment = 0; segment < blockStore.stores.length; segment++) {
+						if (!shouldDefrag(blockStore, segment)) {
+							continue;
+						}
+						try {
+							do {
+								int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+								if (!shouldDefrag(blockStore, segment)) {
+									//truncate the file
+									blockStore.locks[segment].writeLock().lock();
+									try {
+										int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+										long length = endBlock * blockStore.blockSize; 
+										blockStore.stores[segment].setLength(length);
+									} finally {
+										blockStore.locks[segment].writeLock().unlock();
+									}
+									break;
+								}
+								//move the block if possible
+								InputStream is = blockStore.stores[segment].createInputStream(blockToMove * blockStore.blockSize);
+								DataInputStream dis = new DataInputStream(is);
+								Long gid = dis.readLong();
+								Long oid = dis.readLong();
+								dis.reset(); //move back to the beginning
+								Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+								if (map == null) {
+									continue;
+								}
+								PhysicalInfo info = map.get(oid);
+								if (info == null) {
+									continue;
+								}
+								synchronized (info) {
+									await(info, true, false);
+									if (info.block == EMPTY_ADDRESS) {
+										continue;
+									}
+									assert info.block == blockToMove;
+								}
+								int newBlock = blockStore.writeToStorageBlock(info, dis);
+								synchronized (info) {
+									await(info, true, true);
+									if (info.block == EMPTY_ADDRESS) {
+										//already removed;
+										if (newBlock != EMPTY_ADDRESS) {
+											blockStore.blocksInUse.clear(newBlock);
+										}
+										continue;
+									}
+									info.block = newBlock;
+									blockStore.blocksInUse.clear(blockToMove);
+								}
+							} while (shouldDefrag(blockStore, segment));
+						} catch (IOException e) {
+							LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag"); //$NON-NLS-1$
+						}
+					}
+				}
+			} finally {
+				defragRunning.set(false);
+			}
+		}
+	};
 	private AtomicBoolean cleanerRunning = new AtomicBoolean();
-	private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, "FileStore Worker"); //$NON-NLS-1$
 	private final Runnable cleaningTask = new Runnable() {
 		
 		@Override
@@ -379,6 +453,8 @@
 	private AtomicLong storageWrites = new AtomicLong();
 	private AtomicLong storageReads = new AtomicLong();
 	
+	private long minDefrag = DEFAULT_MIN_DEFRAG;
+	
 	@Override
 	public void initialize() throws TeiidComponentException {
 		storageManager.initialize();
@@ -482,10 +558,12 @@
 			hasPermit = true;
 			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
 			BlockOutputStream bos = new BlockOutputStream(blockManager);
-            ObjectOutputStream oos = new ObjectOutputStream(bos);
-            oos.writeInt(entry.getSizeEstimate());
-            s.serialize(entry.getObject(), oos);
-            oos.close();
+			ObjectOutput dos = new DataObjectOutputStream(bos);
+			dos.writeLong(s.getId());
+			dos.writeLong(entry.getId());
+			dos.writeInt(entry.getSizeEstimate());
+            s.serialize(entry.getObject(), dos);
+            dos.close();
         	//synchronized to ensure proper cleanup from a concurrent removal 
             synchronized (map) {
             	if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
@@ -585,6 +663,7 @@
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
 					is = new BlockInputStream(manager, info.memoryBlockCount);
 				} else if (info.block != EMPTY_ADDRESS) {
+					info.pinned = true;
 					memoryBufferEntries.recordAccess(info);
 					storageReads.incrementAndGet();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
@@ -598,8 +677,10 @@
 					return null;
 				}
 			}
-			ObjectInputStream ois = new ObjectInputStream(is);
-			CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(), serializer.deserialize(ois), ref, true);
+			ObjectInput dis = new DataObjectInputStream(is);
+			dis.readFully(HEADER_SKIP_BUFFER);
+			int sizeEstimate = dis.readInt();
+			CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), sizeEstimate, serializer.deserialize(dis), ref, true);
 			return ce;
         } catch(IOException e) {
         	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
@@ -718,26 +799,10 @@
 				storageWrites.getAndIncrement();
 				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount); 
 				BlockStore blockStore = sizeBasedStores[sizeIndex];
-				block = getAndSetNextClearBit(blockStore);
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
-					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
-				}
-				FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
-				long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
-				byte[] b = new byte[BLOCK_SIZE];
-				int read = 0;
-				try {
-					while ((read = is.read(b, 0, b.length)) != -1) {
-						fs.write(blockOffset, b, 0, read);
-						blockOffset+=read;
-					}
-				} catch (Throwable e) {
-					//shouldn't happen, but we'll invalidate this write and continue
-					demote = false;
-					//just continue to free
-					LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
-				}
+				block = blockStore.writeToStorageBlock(info, is);
 			}
+		} catch (IOException e) {
+			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
 		} finally {
 			//ensure post conditions
 			synchronized (info) {
@@ -764,6 +829,12 @@
 						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 							LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
 						}
+						int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+						if (!defragRunning.get() 
+								&& shouldDefrag(blockStore, segment) 
+								&& defragRunning.compareAndSet(false, true)) {
+								this.asynchPool.execute(defragTask);
+						}
 						info.block = EMPTY_ADDRESS;
 					}
 				}
@@ -775,6 +846,14 @@
 		return result;
 	}
 
+	boolean shouldDefrag(BlockStore blockStore, int segment) {
+		int highestBitSet = blockStore.blocksInUse.getHighestBitSet(segment);
+		int bitsSet = blockStore.blocksInUse.getBitsSet(segment);
+		highestBitSet = Math.max(bitsSet, Math.max(1, highestBitSet));
+		int freeBlocks = highestBitSet-bitsSet;
+		return freeBlocks > (highestBitSet>>2) && freeBlocks*blockStore.blockSize > minDefrag;
+	}
+
 	private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
 		while ((pinned && info.pinned) || (evicting && info.evicting)) {
 			try {
@@ -785,14 +864,6 @@
 		}
 	}
 	
-	static int getAndSetNextClearBit(BlockStore bs) {
-		int result = bs.blocksInUse.getAndSetNextClearBit();
-		if (result == -1) {
-			throw new TeiidRuntimeException("Out of blocks of size " + bs.blockSize); //$NON-NLS-1$
-		}
-		return result;
-	}
-	
 	/**
 	 * Eviction routine.  When space is exhausted data blocks are acquired from
 	 * memory entries.
@@ -886,6 +957,10 @@
 	public long getMemoryBufferSpace() {
 		return memoryBufferSpace;
 	}
+	
+	public void setMinDefrag(long minDefrag) {
+		this.minDefrag = minDefrag;
+	}
 
 }
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -23,8 +23,8 @@
 package org.teiid.common.buffer.impl;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.lang.ref.SoftReference;
@@ -81,6 +81,8 @@
  * 
  * TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
  *       - this is not necessary for already persistent batches, since we hold a weak reference
+ *       
+ * TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the queue.      
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 
@@ -214,7 +216,7 @@
 		}
 
 		@Override
-		public List<? extends List<?>> deserialize(ObjectInputStream ois)
+		public List<? extends List<?>> deserialize(ObjectInput ois)
 				throws IOException, ClassNotFoundException {
 			List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
 			if (lobManager != null) {
@@ -231,7 +233,7 @@
 		
 		@Override
 		public void serialize(List<? extends List<?>> obj,
-				ObjectOutputStream oos) throws IOException {
+				ObjectOutput oos) throws IOException {
 			int expectedModCount = 0;
 			ResizingArrayList<?> list = null;
 			if (obj instanceof ResizingArrayList<?>) {
@@ -626,7 +628,7 @@
 				} else {
 					waitCount >>= 1;
 				}
-		    	int result = noWaitReserve(additional - committed);
+		    	int result = noWaitReserve(additional - committed, false);
 		    	committed += result;
 	    	}	
 	    	return committed;
@@ -645,16 +647,19 @@
     	if (mode == BufferReserveMode.FORCE) {
     		this.reserveBatchKB.addAndGet(-count);
     	} else {
-    		result = noWaitReserve(count);
+    		result = noWaitReserve(count, true);
     	}
     	reservedByThread.set(reservedByThread.get() + result);
 		persistBatchReferences();
     	return result;
     }
 
-	private int noWaitReserve(int count) {
+	private int noWaitReserve(int count, boolean allOrNothing) {
 		for (int i = 0; i < 2; i++) {
 			int reserveBatch = this.reserveBatchKB.get();
+			if (allOrNothing && count > reserveBatch) {
+				return 0;
+			}
 			count = Math.min(count, Math.max(0, reserveBatch));
 			if (count == 0) {
 				return 0;
@@ -828,9 +833,12 @@
 	void addMemoryEntry(CacheEntry ce, boolean initial) {
 		persistBatchReferences();
 		synchronized (ce) {
-			memoryEntries.put(ce.getId(), ce);
+			boolean added = memoryEntries.put(ce.getId(), ce) == null;
 			if (initial) {
 				evictionQueue.add(ce);
+			} else if (added) {
+				evictionQueue.recordAccess(ce);
+				evictionQueue.add(ce);
 			} else {
 				evictionQueue.touch(ce);
 			}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -33,6 +33,7 @@
  */
 public class ConcurrentBitSet {
 	
+	private static final int CONCURRENT_MODIFICATION = -2;
 	private static final int ADDRESS_BITS_PER_TOP_VALUE = 18;
 	private static final int MAX_TOP_VALUE = 1 << ADDRESS_BITS_PER_TOP_VALUE;
 	
@@ -40,6 +41,7 @@
 		int offset;
 		int maxBits;
 		int startSearch;
+		int highestBitSet = -1;
 		int bitsSet;
 		int[] topVals;
 		final BitSet bitSet;
@@ -58,6 +60,10 @@
 	private Segment[] segments;
 	private boolean compact;
 	
+	/**
+	 * @param maxBits
+	 * @param concurrencyLevel - should be a power of 2
+	 */
 	public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
 		Assertion.assertTrue(maxBits > 0);
 		while ((bitsPerSegment = maxBits/concurrencyLevel) < concurrencyLevel) {
@@ -108,10 +114,34 @@
 		return counter.getAndIncrement();
 	}
 	
-	public int getAndSetNextClearBit(int start) {
+	/**
+	 * return an estimate of the number of bits set
+	 * @param segment
+	 * @return
+	 */
+	public int getBitsSet(int segment) {
+		Segment s = segments[segment&(segments.length-1)];
+		return s.bitsSet;
+	}
+	
+	/**
+	 * return an estimate of the highest bit (relative index) that has been set
+	 * @param segment
+	 * @return
+	 */
+	public int getHighestBitSet(int segment) {
+		Segment s = segments[segment&(segments.length-1)];
+		return s.highestBitSet;	
+	}
+	
+	/**
+	 * @param segment
+	 * @return the next clear bit index as an absolute index - not relative to a segment
+	 */
+	public int getAndSetNextClearBit(int segment) {
 		int nextBit = -1;
 		for (int i = 0; i < segments.length; i++) {
-			Segment s = segments[(start+i)&(segments.length-1)];
+			Segment s = segments[(segment+i)&(segments.length-1)];
 			synchronized (s) {
 				if (s.bitsSet == s.maxBits) {
 					continue;
@@ -122,7 +152,7 @@
 						continue;
 					}
 					if (s.topVals[j] == 0) {
-						if (j == start) {
+						if (j == segment) {
 							nextBit = s.startSearch;
 							break;
 						}
@@ -148,6 +178,7 @@
 				s.bitsSet++;
 				s.bitSet.set(nextBit);
 				s.startSearch = nextBit + 1;
+				s.highestBitSet = Math.max(s.highestBitSet, nextBit);
 				if (s.startSearch == s.maxBits) {
 					s.startSearch = 0;
 				}
@@ -187,4 +218,84 @@
 		this.compact = compact;
 	}
 	
+	
+	public int compactHighestBitSet(int segment) {
+		Segment s = segments[segment&(segments.length-1)];
+		//first do an unlocked compact
+		for (int i = 0; i < 3; i++) {
+			int result = tryCompactHighestBitSet(s);
+			if (result != CONCURRENT_MODIFICATION) {
+				return result;
+			}
+		}
+		synchronized (s) {
+			return tryCompactHighestBitSet(s);
+		}
+	}
+
+	private int tryCompactHighestBitSet(Segment s) {
+		int highestBitSet = 0;
+		synchronized (s) {
+			highestBitSet = s.highestBitSet;
+			if (highestBitSet <= 0) {
+				return 0;
+			}
+			if (s.bitSet.get(highestBitSet)) {
+				return highestBitSet;
+			}
+		}
+		int indexSearchStart = highestBitSet >> ADDRESS_BITS_PER_TOP_VALUE;
+		for (int j = indexSearchStart; j >= 0; j--) {
+			if (s.topVals[j] == 0) {
+				if (j==0) {
+					synchronized (s) {
+						if (s.highestBitSet != highestBitSet) {
+							return CONCURRENT_MODIFICATION;
+						}
+						s.highestBitSet = -1;
+					}
+				}
+				continue;
+			}
+			if (s.topVals[j] == MAX_TOP_VALUE) {
+				synchronized (s) {
+					if (s.highestBitSet != highestBitSet) {
+						return CONCURRENT_MODIFICATION;
+					}
+					s.highestBitSet = ((j + 1) * MAX_TOP_VALUE) -1;
+				}
+				break;
+			}
+			int index = j * MAX_TOP_VALUE;
+			int end = index + s.maxBits;
+			if (j == indexSearchStart) {
+				end = highestBitSet;
+			}
+			BitSet bs = s.bitSet;
+			int offset = 0;
+			if (j == indexSearchStart) {
+				bs = s.bitSet.get(index, end); //ensures that we look only at a subset of the words
+				offset = index;
+			}
+			index = index - offset;
+			end = end - offset - 1;
+			while (index < end) {
+				int next = bs.nextSetBit(index);
+				if (next == -1) {
+					index--;
+					break;
+				}
+				index = next + 1;
+			}
+			synchronized (s) {
+				if (s.highestBitSet != highestBitSet) {
+					return CONCURRENT_MODIFICATION;
+				}
+				s.highestBitSet = index + offset;
+				return s.highestBitSet;
+			}
+		}			
+		return -1;
+	}
+	
 }

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.StreamCorruptedException;
+
+/** DataInputStream that can also read objects using the class descriptor markers of DataObjectOutputStream. */
+public class DataObjectInputStream extends DataInputStream implements ObjectInput {
+	
+	ObjectInput ois;
+	ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+	
+	public DataObjectInputStream(InputStream in) {
+		super(in);
+	}
+	
+	/** Reads the next object, creating the wrapped object stream on the first call. */
+	@Override
+	public Object readObject() throws ClassNotFoundException, IOException {
+		if (ois != null) {
+			return ois.readObject();
+		}
+		ois = new ObjectInputStream(this) {
+
+			/** Expects a single version byte as the stream header. */
+			@Override
+			protected void readStreamHeader() throws IOException, StreamCorruptedException {
+				final int found = readByte() & 0xFF;
+				if (found == STREAM_VERSION) {
+					return;
+				}
+				throw new StreamCorruptedException("Unsupported version: " + found); //$NON-NLS-1$
+			}
+
+			/** Resolves a descriptor from its leading marker: fat descriptor, class name, or registered byte id. */
+			@Override
+			protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+				final int marker = read();
+				if (marker < 0) {
+					throw new EOFException();
+				}
+				if (marker == DataObjectOutputStream.TYPE_FAT_DESCRIPTOR) {
+					return super.readClassDescriptor();
+				}
+				String name;
+				if (marker == DataObjectOutputStream.TYPE_THIN_DESCRIPTOR) {
+					name = readUTF();
+				} else {
+					name = DataObjectOutputStream.typeMapping.get((byte)marker);
+					if (name == null) {
+						throw new StreamCorruptedException("Unknown class type " + marker); //$NON-NLS-1$
+					}
+				}
+				return ObjectStreamClass.lookup(loadClass(name));
+			}
+
+			/** Tries loadClass first and falls back to the default resolution when the class is not found. */
+			@Override
+			protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+				try {
+					return loadClass(desc.getName());
+				} catch (ClassNotFoundException ex) {
+					return super.resolveClass(desc);
+				}
+			}
+
+			/** Loads through the captured context class loader, or Class.forName when there is none. */
+			protected Class<?> loadClass(String className) throws ClassNotFoundException {
+				if (classLoader == null) {
+					return Class.forName(className);
+				}
+				return classLoader.loadClass(className);
+			}
+		};
+		return ois.readObject();
+	}
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Extends the logic of Netty's CompactObjectOutputStream to use byte identifiers
+ * for some classes and to write/flush objects directly into the output.
+ * 
+ * We can do this since buffer serialized data is ephemeral and good only for
+ * a single process.
+ */
+public class DataObjectOutputStream extends DataOutputStream implements ObjectOutput {
+	//ids 0 and 1 are the descriptor markers below, so class ids are handed out from 2 up to MAX_BYTE_IDS - 1
+	private static final int MAX_BYTE_IDS = 254;
+	static AtomicInteger counter = new AtomicInteger(2);
+	static final ConcurrentHashMap<String, Byte> knownClasses = new ConcurrentHashMap<String, Byte>(); //class name -> byte id
+	static final ConcurrentHashMap<Byte, String> typeMapping = new ConcurrentHashMap<Byte, String>(); //byte id -> class name
+	
+    static final int TYPE_FAT_DESCRIPTOR = 0; //marker: the standard class descriptor follows
+    static final int TYPE_THIN_DESCRIPTOR = 1; //marker: only the class name follows
+    
+	ObjectOutputStream oos;
+	
+	public DataObjectOutputStream(OutputStream out) {
+		super(out);
+	}
+
+	/** Writes the object through a lazily created object stream and flushes it into this stream. */
+	@Override
+	public void writeObject(Object obj) throws IOException {
+		if (oos == null) {
+			oos = new ObjectOutputStream(this) {
+				@Override
+				protected void writeStreamHeader() throws IOException {
+					writeByte(STREAM_VERSION); //the header is a single version byte
+				}
+				
+				@Override
+			    protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+			        Class<?> clazz = desc.forClass();
+			        if (clazz.isPrimitive() || clazz.isArray()) {
+			            write(TYPE_FAT_DESCRIPTOR);
+			            super.writeClassDescriptor(desc);
+			        } else {
+			        	String name = desc.getName();
+			        	Byte b = knownClasses.get(name);
+			        	if (b == null && counter.get() < MAX_BYTE_IDS) {
+		        			synchronized (DataObjectOutputStream.class) {
+								b = knownClasses.get(name); //re-check under the lock, another thread may have registered it
+								if (b == null && counter.get() < MAX_BYTE_IDS) {
+									b = (byte)counter.getAndIncrement();
+									typeMapping.put(b, name); //publish the reverse mapping first so a reader never sees an unmapped id
+									knownClasses.put(name, b); //from here on other writers may emit the id without locking
+								}
+			        		}
+			        	}
+			        	if (b != null) {
+			        		write(b);
+			        	} else {
+				            write(TYPE_THIN_DESCRIPTOR);
+				            writeUTF(name);
+			        	}
+			        }
+			    }
+			};
+		}
+		oos.writeObject(obj);
+		oos.flush();
+	}
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -37,17 +37,18 @@
     
     public void write(int b) throws IOException {
     	ensureBuffer();
-		if (buf.remaining() == 0) {
-		    flush();
-		}
 		buf.put((byte)b);
     }
 
-	private void ensureBuffer() {
-		if (buf == null) {
-    		buf = newBuffer();
-    		startPosition = buf.position();
-    	}
+	private void ensureBuffer() throws IOException {
+		if (buf != null) {
+			if (buf.remaining() != 0) {
+				return;
+			}
+			flush();
+		}
+		buf = newBuffer();
+		startPosition = buf.position();
 	}
 
     public void write(byte b[], int off, int len) throws IOException {
@@ -60,7 +61,6 @@
 			if (buf.remaining() > 0) {
 				break;
 			}
-			flush();
     	}
     }
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -34,7 +34,6 @@
 /**
  * A Concurrent LRFU eviction queue.  Has assumptions that match buffermanager usage.
  * Null values are not allowed.
- * @param <K>
  * @param <V>
  */
 public class LrfuEvictionQueue<V extends BaseCacheEntry> {
@@ -49,11 +48,13 @@
     //TODO: adaptively adjust this value.  more hits should move closer to lru
 	protected double crfLamda;
 	protected double inverseCrfLamda = 1 - crfLamda;
-	protected int maxInterval;
+	protected int maxInterval; //don't consider the old ordering value after the maxInterval
+	protected int minInterval; //cap the frequency gain under this interval (we can make some values too hot otherwise)
+	private float minVal;
 	
 	public LrfuEvictionQueue(AtomicLong clock) {
 		this.clock = clock;
-		setCrfLamda(.0002);
+		setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
 	}
 
 	public boolean remove(V value) {
@@ -110,7 +111,7 @@
 		long delta = currentTime - longLastAccess;
 		orderingValue = 
 			(float) (//Frequency component
-			(delta>maxInterval?0:orderingValue*Math.pow(inverseCrfLamda, delta))
+			(delta<maxInterval?(delta<minInterval?minVal:Math.pow(inverseCrfLamda, delta)):0)*orderingValue
 			//recency component
 			+ Math.pow(currentTime, crfLamda));
 		return orderingValue;
@@ -125,9 +126,14 @@
 		this.inverseCrfLamda = 1 - crfLamda;
 		int i = 0;
 		for (; i < 30; i++) {
-			if ((float)Math.pow(inverseCrfLamda, 1<<i) == 0) {
+			float val = (float)Math.pow(inverseCrfLamda, 1<<i);
+			if (val == 0) {
 				break;
 			}
+			if (val > .8) {
+				minInterval = 1<<i;
+				this.minVal = val;
+			}
 		}
 		this.maxInterval = 1<<(i-1);
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -298,7 +298,7 @@
             TupleBuffer merged = createTupleBuffer();
 
             int desiredSpace = activeTupleBuffers.size() * schemaSize;
-            int reserved = Math.min(desiredSpace, this.bufferManager.getMaxProcessingKB());
+            int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingKB()));
             bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
             if (desiredSpace > reserved) {
             	reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -25,8 +25,8 @@
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.lang.ref.WeakReference;
 
 import org.junit.Test;
@@ -38,7 +38,7 @@
 	
 	private final class SimpleSerializer implements Serializer<Integer> {
 		@Override
-		public Integer deserialize(ObjectInputStream ois)
+		public Integer deserialize(ObjectInput ois)
 				throws IOException, ClassNotFoundException {
 			Integer result = ois.readInt();
 			for (int i = 0; i < result; i++) {
@@ -53,7 +53,7 @@
 		}
 
 		@Override
-		public void serialize(Integer obj, ObjectOutputStream oos)
+		public void serialize(Integer obj, ObjectOutput oos)
 				throws IOException {
 			oos.writeInt(obj);
 			for (int i = 0; i < obj; i++) {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -81,4 +81,34 @@
 		assertEquals(50, bst.getAndSetNextClearBit());
 	}
 	
+	@Test public void testCompactHighest() { //checks that compactHighestBitSet lowers the reported highest bit after clears
+		ConcurrentBitSet bst = new ConcurrentBitSet(1 << 19, 1);
+		bst.setCompact(true);
+		for (int i = 0; i < bst.getTotalBits(); i++) { //claim one bit per available bit
+			bst.getAndSetNextClearBit();
+		}
+		assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0));
+		assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(1));
+		
+		for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) { //clear the top 20 bits
+			bst.clear(i);
+		}
+
+		assertEquals(bst.getTotalBits()-21, bst.compactHighestBitSet(0)); //expected to land just below the cleared range
+		
+		for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) { //claim 20 bits again
+			bst.getAndSetNextClearBit();
+		}
+		
+		assertEquals(-1, bst.getAndSetNextClearBit()); //no clear bit is expected to remain
+		
+		for (int i = 20; i < bst.getTotalBits(); i++) { //clear everything from bit 20 upward
+			bst.clear(i);
+		}
+		
+		assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0)); //still the old value before compaction
+		assertEquals(19, bst.compactHighestBitSet(0)); //compaction is expected to find bit 19
+		
+	}
+	
 }

Modified: trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-10-22 11:40:30 UTC (rev 3576)
@@ -113,19 +113,20 @@
 			while (true) {
 				try {
 			    	nextFuture = rs.submitNext();
-			    	if (!nextFuture.isDone()) {
-				    	nextFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
-				    		@Override
-				    		public void onCompletion(ResultsFuture<Boolean> future) {
-				    			if (processRow(future)) {
-				    				if (rowsSent != rows2Send) {
-				    					//this can be recursive, but ideally won't be called many times 
-				    					ResultsWorkItem.this.run();
-				    				}
-				    			}
-				    		}
-						});
-				    	return;
+			    	synchronized (nextFuture) {
+				    	if (!nextFuture.isDone()) {
+					    	nextFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+					    		@Override
+					    		public void onCompletion(ResultsFuture<Boolean> future) {
+					    			if (processRow(future)) {
+					    				if (rowsSent != rows2Send) {
+					    					ResultsWorkItem.this.run();
+					    				}
+					    			}
+					    		}
+							});
+					    	return;
+				    	}
 			    	}
 			    	if (!processRow(nextFuture)) {
 			    		break;



More information about the teiid-commits mailing list