[teiid-commits] teiid SVN: r4561 - in branches/7.7.x-TEIID-2429: engine/src/main/java/org/teiid/common/buffer and 9 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Apr 17 14:59:48 EDT 2013


Author: jolee
Date: 2013-04-17 14:59:47 -0400 (Wed, 17 Apr 2013)
New Revision: 4561

Added:
   branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java
   branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java
Modified:
   branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java
   branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
   branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log:
sort.patch applied (with TestSystemVirtualModel.testColumns disabled)

Modified: branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -80,11 +80,11 @@
  */
 public class DataTypeManager {
 	
-	private static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
+	public static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
 	private static final boolean COMPARABLE_LOBS = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.comparableLobs", false); //$NON-NLS-1$
 	public static final boolean PAD_SPACE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.padSpace", false); //$NON-NLS-1$
 	
-	private static boolean valueCacheEnabled;
+	private static boolean valueCacheEnabled = USE_VALUE_CACHE;
 	
 	private interface ValueCache<T> {
 		T getValue(T value);

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -57,10 +57,11 @@
 			BlockedException, TeiidProcessingException {
 		if (available() > 0) {
 			//if (forwardOnly) {
-				if (batch == null || !batch.containsRow(currentRow)) {
-					batch = getBatch(currentRow);
+				int row = getCurrentIndex();
+				if (batch == null || !batch.containsRow(row)) {
+					batch = getBatch(row);
 				}
-				return batch.getTuple(currentRow);
+				return batch.getTuple(row);
 			//} 
 			//TODO: determine if we should directly hold a soft reference here
 			//return getRow(currentRow);

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -55,10 +55,20 @@
 	private static ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();
 	private static final Set<PhantomReference<Object>> REFERENCES = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<PhantomReference<Object>, Boolean>()));
 
-	public static void setCleanupReference(Object o, Removable r) {
-		REFERENCES.add(new PhantomCleanupReference(o, r));
+	public static PhantomReference<Object> setCleanupReference(Object o, Removable r) {
+		PhantomCleanupReference ref = new PhantomCleanupReference(o, r);
+		REFERENCES.add(ref);
 		doCleanup();
+		return ref;
 	}
+	
+	public static void removeCleanupReference(PhantomReference<Object> ref) {
+		if (ref == null) {
+			return;
+		}
+		REFERENCES.remove(ref);
+		ref.clear();
+	}
 
 	public static void doCleanup() {
 		for (int i = 0; i < 10; i++) {

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -47,4 +47,6 @@
 	Reference<? extends BatchManager> getBatchManagerReference();
 	
 	String[] getTypes();
+	
+	int getRowSizeEstimate();
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -37,6 +37,8 @@
 public class BlockedException extends TeiidComponentException {
 
     public static final BlockedException INSTANCE = new BlockedException();
+    
+    public static final BlockedException BLOCKED_ON_MEMORY_EXCEPTION = new BlockedException();
 
     /**
      * No-arg costructor required by Externalizable semantics

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -134,13 +134,10 @@
 	 */
 	void setMaxActivePlans(int maxActivePlans);
 
-	/**
-	 * Wait for additional buffers to become available.
-	 * @param additional
-	 * @return
-	 */
-	int reserveAdditionalBuffers(int additional);
-	
 	Streamable<?> persistLob(final Streamable<?> lob,
 			final FileStore store, byte[] bytes) throws TeiidComponentException;
+	
+	int reserveBuffersBlocking(int count, long[] attempts, boolean force) throws BlockedException;
+	
+	void releaseOrphanedBuffers(long count);
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -65,5 +65,12 @@
 			buf.rewind();
 		}
 	}
+		
+	public ByteBuffer getBuffer() throws IOException {
+		if (!ensureBytes()) {
+			return null;
+		}
+		return buf;
+	}
 	
 }
\ No newline at end of file

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -191,7 +191,7 @@
 	
 	protected abstract void removeDirect();
 	
-	public InputStream createInputStream(final long start, final long length) {
+	public ExtensibleBufferedInputStream createInputStream(final long start, final long length) {
 		return new ExtensibleBufferedInputStream() {
 			private long offset = start;
 			private long streamLength = length;

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -30,6 +30,7 @@
 import org.teiid.client.ResizingArrayList;
 import org.teiid.common.buffer.LobManager.ReferenceMode;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.Streamable;
 import org.teiid.core.util.Assertion;
@@ -42,6 +43,62 @@
 
 public class TupleBuffer {
 	
+	public final class TupleBufferTupleSource extends
+	AbstractTupleSource {
+	private final boolean singleUse;
+	private boolean noBlocking;
+	private boolean reverse;
+	
+	private TupleBufferTupleSource(boolean singleUse) {
+		this.singleUse = singleUse;
+	}
+	
+	@Override
+	protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
+		if(isFinal || noBlocking || reverse) {
+	        return null;
+	    } 
+		throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
+	}
+	
+	@Override
+	public int available() {
+		if (!reverse) {
+			return rowCount - getCurrentIndex() + 1;
+		}
+		return getCurrentIndex();
+	}
+	
+	@Override
+	protected TupleBatch getBatch(int row) throws TeiidComponentException {
+		return TupleBuffer.this.getBatch(row);
+	}
+	
+	@Override
+	public void closeSource() {
+		super.closeSource();
+		if (singleUse) {
+			remove();
+		}
+	}
+	
+	public void setNoBlocking(boolean noBlocking) {
+		this.noBlocking = noBlocking;
+	}
+	
+	public void setReverse(boolean reverse) {
+		this.reverse = reverse;
+	}
+	
+	@Override
+	public int getCurrentIndex() {
+		if (!reverse) {
+			return super.getCurrentIndex();
+		}
+		return getRowCount() - super.getCurrentIndex() + 1;
+	}
+	
+	}
 	/**
      * Gets the data type names for each of the input expressions, in order.
      * @param expressions List of Expressions
@@ -299,7 +356,7 @@
 		this.forwardOnly = forwardOnly;
 	}
     
-	public IndexedTupleSource createIndexedTupleSource() {
+	public TupleBufferTupleSource createIndexedTupleSource() {
 		return createIndexedTupleSource(false);
 	}
     
@@ -307,38 +364,11 @@
 	 * Create a new iterator for this buffer
 	 * @return
 	 */
-	public IndexedTupleSource createIndexedTupleSource(final boolean singleUse) {
+	public TupleBufferTupleSource createIndexedTupleSource(final boolean singleUse) {
 		if (singleUse) {
 			setForwardOnly(true);
 		}
-		return new AbstractTupleSource() {
-			
-			@Override
-			protected List<?> finalRow() throws BlockedException {
-				if(isFinal) {
-		            return null;
-		        } 
-		        throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
-			}
-			
-			@Override
-			public int available() {
-				return rowCount - getCurrentIndex() + 1;
-			}
-			
-			@Override
-			protected TupleBatch getBatch(int row) throws TeiidComponentException {
-				return TupleBuffer.this.getBatch(row);
-			}
-			
-			@Override
-			public void closeSource() {
-				super.closeSource();
-				if (singleUse) {
-					remove();
-				}
-			}
-		};
+		return new TupleBufferTupleSource(singleUse);
 	}
 	
 	@Override
@@ -369,4 +399,8 @@
 		return this.lobManager.getLobCount();
 	}
 	
+	public int getRowSizeEstimate() {
+		return this.manager.getRowSizeEstimate();
+	}
+	
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -53,6 +53,7 @@
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
 import org.teiid.common.buffer.CacheKey;
+import org.teiid.common.buffer.ExtensibleBufferedInputStream;
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.Serializer;
 import org.teiid.common.buffer.StorageManager;
@@ -687,6 +688,7 @@
 		}
 		InputStream is = null;
 		Lock lock = null;
+		ExtensibleBufferedInputStream eis = null;
 		int memoryBlocks = 0;
 		try {
 			synchronized (info) {
@@ -711,7 +713,7 @@
 					int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
 					FileStore fs = blockStore.stores[segment];
 					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
-					is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
+					eis = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
 					lock = blockStore.locks[segment].writeLock();
 					memoryBlocks = info.memoryBlockCount;
 				} else {
@@ -719,7 +721,7 @@
 				}
 			}
 			if (lock != null) {
-				is = readIntoMemory(info, is, lock, memoryBlocks);
+				is = readIntoMemory(info, eis, lock, memoryBlocks);
 			}
 			ObjectInput dis = new ObjectInputStream(is);
 			dis.readFully(HEADER_SKIP_BUFFER);
@@ -743,7 +745,7 @@
 	/**
 	 * Transfer into memory to release memory/file locks
 	 */
-	private InputStream readIntoMemory(PhysicalInfo info, InputStream is,
+	private InputStream readIntoMemory(PhysicalInfo info, ExtensibleBufferedInputStream is,
 			Lock fileLock, int memoryBlocks) throws InterruptedException,
 			IOException {
 		checkForLowMemory();
@@ -763,11 +765,14 @@
 			locked = true;
 			ExtensibleBufferedOutputStream os = new BlockOutputStream(manager, -1);
 			//TODO: there is still an extra buffer being created here, we could FileChannels to do better
-			int b = -1;
-			while ((b = is.read()) != -1) {
-				os.write(b);
+			ByteBuffer bb = null;
+			while ((bb = is.getBuffer()) != null) {
+				byte[] array = bb.array();
+				os.write(array, bb.position() + bb.arrayOffset(), bb.remaining());
+				bb.position(bb.position()+bb.remaining());
 			}
 			fileLock.unlock();
+			os.close();
 			locked = false;
 		    synchronized (info) {
 		        info.inode = manager.getInode();

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.lang.ref.SoftReference;
@@ -54,14 +55,15 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
 import org.teiid.core.types.Streamable;
-import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
 import org.teiid.dqp.internal.process.DQPConfiguration;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.processor.relational.ListNestedSortComparator;
 import org.teiid.query.sql.symbol.Expression;
+import org.teiid.query.util.CommandContext;
 
 
 /**
@@ -76,12 +78,14 @@
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 
+	private static final int SYSTEM_OVERHEAD_MEGS = 300;
+
 	/**
 	 * Asynch cleaner attempts to age out old entries and to reduce the memory size when 
 	 * little is reserved.
 	 */
+	private static final int MAX_READ_AGE = 1<<18;
 	private static final class Cleaner extends TimerTask {
-		private static final int MAX_READ_AGE = 1<<28;
 		WeakReference<BufferManagerImpl> bufferRef;
 		
 		public Cleaner(BufferManagerImpl bufferManagerImpl) {
@@ -96,36 +100,45 @@
 					this.cancel();
 					return;
 				}
-				boolean agingOut = false;
-				if (impl.reserveBatchBytes.get() < impl.maxReserveBytes.get()*.9 || impl.activeBatchBytes.get() < impl.maxReserveBytes.get()*.7) {
-					CacheEntry entry = impl.evictionQueue.firstEntry(false);
-					if (entry == null) {
-						return;
+				impl.cleaning.set(true);
+				try {
+					long evicted = impl.doEvictions(0, false, impl.initialEvictionQueue);
+					if (evicted != 0) {
+						continue;
 					}
-					//we aren't holding too many memory entries, ensure that
-					//entries aren't old
-					long lastAccess = entry.getKey().getLastAccess();
-					long currentTime = impl.readAttempts.get();
-					if (currentTime - lastAccess < MAX_READ_AGE) {
-						return;
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+						LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", evicted, impl.reserveBatchBytes.get(), impl.maxReserveBytes, impl.activeBatchBytes.get()); //$NON-NLS-1$
 					}
-					agingOut = true;
+				} catch (Throwable t) {
+					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, t, "Exception during cleaning run"); //$NON-NLS-1$
 				}
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
-				}
-				impl.doEvictions(0, !agingOut);
-				if (!agingOut) {
+				synchronized (this) {
+					impl.cleaning.set(false);
 					try {
-						Thread.sleep(100); //we don't want to evict too fast, because the processing threads are more than capable of evicting
+						this.wait(100);
 					} catch (InterruptedException e) {
-						throw new TeiidRuntimeException(e);
+						break;
 					}
 				}
 			}
 		}
 	}
+	
+	private final class Remover implements Removable {
+		private Long id;
+		private AtomicBoolean prefersMemory;
+		
+		public Remover(Long id, AtomicBoolean prefersMemory) {
+			this.id = id;
+			this.prefersMemory = prefersMemory;
+		}
 
+		@Override
+		public void remove() {
+			removeCacheGroup(id, prefersMemory.get());
+		}
+	}
+
 	/**
 	 * This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
 	 */
@@ -135,9 +148,12 @@
 		final Long id;
 		SizeUtility sizeUtility;
 		private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
+		private PhantomReference<Object> cleanup;
 		AtomicBoolean prefersMemory = new AtomicBoolean();
 		String[] types;
 		private LobManager lobManager;
+		private long totalSize;
+		private long rowsSampled;
 
 		private BatchManagerImpl(Long newID, Class<?>[] types) {
 			this.id = newID;
@@ -146,7 +162,6 @@
 			for (int i = 0; i < types.length; i++) {
 				this.types[i] = DataTypeManager.getDataTypeName(types[i]);
 			}
-			cache.createCacheGroup(newID);
 		}
 		
 		@Override
@@ -188,6 +203,10 @@
 		public Long createManagedBatch(List<? extends List<?>> batch,
 				Long previous, boolean removeOld)
 				throws TeiidComponentException {
+			if (cleanup == null) {
+				cache.createCacheGroup(id);
+				cleanup = AutoCleanupUtil.setCleanupReference(this, new Remover(id, prefersMemory));
+			}
 			int sizeEstimate = getSizeEstimate(batch);
 			Long oid = batchAdded.getAndIncrement();
 			CacheEntry old = null;
@@ -197,15 +216,17 @@
 				} else {
 					old = fastGet(previous, prefersMemory.get(), true);
 				}
+			} else {
+				totalSize += sizeEstimate;
+				rowsSampled += batch.size();
 			}
 			CacheKey key = new CacheKey(oid, (int)readAttempts.get(), old!=null?old.getKey().getOrderingValue():0);
 			CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
+			cache.addToCacheGroup(id, ce.getId());
+			overheadBytes.addAndGet(BATCH_OVERHEAD);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
 			}
-			maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
-			reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
-			cache.addToCacheGroup(id, ce.getId());
 			addMemoryEntry(ce, true);
 			return oid;
 		}
@@ -215,9 +236,9 @@
 				throws IOException, ClassNotFoundException {
 			List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
 			if (lobManager != null) {
-				for (List<?> list : batch) {
+				for (int i = batch.size() - 1; i >= 0; i--) {
 					try {
-						lobManager.updateReferences(list, ReferenceMode.ATTACH);
+						lobManager.updateReferences(batch.get(i), ReferenceMode.ATTACH);
 					} catch (TeiidComponentException e) {
 						throw new TeiidRuntimeException(e);
 					}
@@ -280,6 +301,7 @@
 				}
 				if (!retain) {
 					removeFromCache(this.id, batch);
+					persistBatchReferences(ce.getSizeEstimate());
 				} else {
 					addMemoryEntry(ce, false);
 				}
@@ -296,13 +318,25 @@
 
 		@Override
 		public void remove() {
-			removeCacheGroup(id, prefersMemory.get());
+			if (cleanup != null) {
+				removeCacheGroup(id, prefersMemory.get());
+				AutoCleanupUtil.removeCleanupReference(cleanup);
+				cleanup = null;
+			}
 		}
 
 		@Override
 		public String toString() {
 			return id.toString();
 		}
+		
+		@Override
+		public int getRowSizeEstimate() {
+			if (rowsSampled == 0) {
+				return 0;
+			}
+			return (int)(totalSize/rowsSampled);
+		}
 	}
 	
 	private static class BatchSoftReference extends SoftReference<CacheEntry> {
@@ -322,21 +356,23 @@
 	private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
 	private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new ReferenceQueue<CacheEntry>();
 	
-	// Configuration 
-    private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
+	// Configuration
+	private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
     //set to acceptable defaults for testing
     private int maxProcessingBytes = 1 << 21; 
     private Integer maxProcessingBytesOrig;
-    AtomicLong maxReserveBytes = new AtomicLong(1 << 28);
+    long maxReserveBytes = 1 << 28;;
     AtomicLong reserveBatchBytes = new AtomicLong();
+    AtomicLong overheadBytes = new AtomicLong();
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
     private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
     private int maxSoftReferences;
+    private int nominalProcessingMemoryMax = maxProcessingBytes;
 
-    private ReentrantLock lock = new ReentrantLock(true);
+    private ReentrantLock lock = new ReentrantLock();
     private Condition batchesFreed = lock.newCondition();
     
     AtomicLong activeBatchBytes = new AtomicLong();
@@ -344,14 +380,9 @@
     private AtomicLong readAttempts = new AtomicLong();
     //TODO: consider the size estimate in the weighting function
     LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
+    LrfuEvictionQueue<CacheEntry> initialEvictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
     ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
     
-    private ThreadLocal<Integer> reservedByThread = new ThreadLocal<Integer>() {
-    	protected Integer initialValue() {
-    		return 0;
-    	}
-    };
-    
     //limited size reference caches based upon the memory settings
     private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
     private Map<Long, BatchSoftReference> softCache = Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f, false) {
@@ -379,16 +410,19 @@
 	private AtomicLong writeCount = new AtomicLong();
 	private AtomicLong referenceHit = new AtomicLong();
 	
+	//TODO: this does not scale well with multiple embedded instances
 	private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
+	private Cleaner cleaner;
+	private AtomicBoolean cleaning = new AtomicBoolean();
 	
 	public BufferManagerImpl() {
-		timer.schedule(new Cleaner(this), 15000, 15000);
+		this.cleaner = new Cleaner(this);
+		timer.schedule(cleaner, 100);
 	}
 	
 	void clearSoftReference(BatchSoftReference bsr) {
 		synchronized (bsr) {
-			maxReserveBytes.addAndGet(bsr.sizeEstimate);
-			reserveBatchBytes.addAndGet(bsr.sizeEstimate);
+			overheadBytes.addAndGet(-bsr.sizeEstimate);
 			bsr.sizeEstimate = 0;
 		}
 		bsr.clear();
@@ -396,8 +430,7 @@
 	
 	void removeFromCache(Long gid, Long batch) {
 		if (cache.remove(gid, batch)) {
-			maxReserveBytes.addAndGet(BATCH_OVERHEAD);
-			reserveBatchBytes.addAndGet(BATCH_OVERHEAD);
+			overheadBytes.addAndGet(-BATCH_OVERHEAD);
 		}
 	}
 	
@@ -464,18 +497,14 @@
     	Class<?>[] types = getTypeClasses(elements);
     	BatchManagerImpl batchManager = createBatchManager(newID, types);
     	LobManager lobManager = null;
-    	FileStore lobStore = null;
 		if (lobIndexes != null) {
-			lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
+			FileStore lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
 			lobManager = new LobManager(lobIndexes, lobStore);
 			batchManager.setLobManager(lobManager);
 		}
     	TupleBuffer tupleBuffer = new TupleBuffer(batchManager, String.valueOf(newID), elements, lobManager, getProcessorBatchSize(elements));
-    	if (lobStore != null) {
-    		AutoCleanupUtil.setCleanupReference(batchManager, lobStore);
-    	}
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
-        	LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(types), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
+        	LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(types), "batch size", tupleBuffer.getBatchSize(), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
         }
     	tupleBuffer.setInlineLobs(inlineLobs);
         return tupleBuffer;
@@ -502,7 +531,7 @@
     	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements.subList(0, keyLength)), getProcessorBatchSize(elements), keyLength, lobManager);
     }
 
-	private static Class<?>[] getTypeClasses(final List elements) {
+	private static Class<?>[] getTypeClasses(final List<? extends Expression> elements) {
 		Class<?>[] types = new Class[elements.size()];
         for (ListIterator<? extends Expression> i = elements.listIterator(); i.hasNext();) {
             Expression expr = i.next();
@@ -512,16 +541,7 @@
 	}
 
 	private BatchManagerImpl createBatchManager(final Long newID, Class<?>[] types) {
-		BatchManagerImpl bm = new BatchManagerImpl(newID, types);
-		final AtomicBoolean prefersMemory = bm.prefersMemory;
-    	AutoCleanupUtil.setCleanupReference(bm, new Removable() {
-			
-			@Override
-			public void remove() {
-				BufferManagerImpl.this.removeCacheGroup(newID, prefersMemory.get());
-			}
-		});
-		return bm;
+		return new BatchManagerImpl(newID, types);
 	}
 
     @Override
@@ -551,33 +571,33 @@
     public void setMaxReserveKB(int maxReserveBatchKB) {
 		if (maxReserveBatchKB > -1) {
 			int maxReserve = maxReserveBatchKB<<10;
-			this.maxReserveBytes.set(maxReserve);
+			this.maxReserveBytes = maxReserve;
 			this.reserveBatchBytes.set(maxReserve);
 		} else {
-			this.maxReserveBytes.set(-1);
+			this.maxReserveBytes = -1;
 		}
 	}
     
 	@Override
 	public void initialize() throws TeiidComponentException {
 		long maxMemory = Runtime.getRuntime().maxMemory();
-		maxMemory = Math.max(0, maxMemory - (300 << 20)); //assume 300 megs of overhead for the AS/system stuff
+		maxMemory = Math.max(0, maxMemory - (SYSTEM_OVERHEAD_MEGS << 20)); //assume an overhead for the AS/system stuff
 		if (getMaxReserveKB() < 0) {
-			this.maxReserveBytes.set(0);
+			this.maxReserveBytes = 0;
 			int one_gig = 1 << 30;
 			if (maxMemory > one_gig) {
-				//assume 75% of the memory over the first gig
-				this.maxReserveBytes.addAndGet((long)Math.max(0, (maxMemory - one_gig) * .75));
+				//assume 70% of the memory over the first gig
+				this.maxReserveBytes = (long)Math.max(0, (maxMemory - one_gig) * .7);
 			}
-			this.maxReserveBytes.addAndGet(Math.max(0, Math.min(one_gig, maxMemory) >> 1));
+			this.maxReserveBytes += Math.max(0, Math.min(one_gig, maxMemory) >> 1);
     	}
-		this.reserveBatchBytes.set(this.maxReserveBytes.get());
+		this.reserveBatchBytes.set(maxReserveBytes);
 		if (this.maxProcessingBytesOrig == null) {
 			//store the config value so that we can be reinitialized (this is not a clean approach)
 			this.maxProcessingBytesOrig = this.maxProcessingBytes;
 		}
 		if (this.maxProcessingBytesOrig < 0) {
-			this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 8l, (.1 * maxMemory)/maxActivePlans),  Integer.MAX_VALUE);
+			this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 16l, (.2 * maxMemory)/maxActivePlans),  Integer.MAX_VALUE);
 		} 
 		//make a guess at the max number of batches
 		long memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow);
@@ -587,16 +607,39 @@
 			weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(30, logSize));
 		}
 		this.maxSoftReferences = 1 << Math.min(30, logSize);
+		this.nominalProcessingMemoryMax = (int)Math.max(Math.min(this.maxReserveBytes, 2*this.maxProcessingBytes), Math.min(Integer.MAX_VALUE, 2*this.maxReserveBytes/maxActivePlans));
 	}
 	
+	void setNominalProcessingMemoryMax(int nominalProcessingMemoryMax) {
+		this.nominalProcessingMemoryMax = nominalProcessingMemoryMax;
+	}
+
     @Override
+    public void releaseOrphanedBuffers(long count) {
+    	releaseBuffers(count, false);
+    }
+    	
+    @Override
     public void releaseBuffers(int count) {
+    	releaseBuffers(count, true);
+    }
+    
+    private void releaseBuffers(long count, boolean updateContext) {
     	if (count < 1) {
     		return;
     	}
-    	reservedByThread.set(reservedByThread.get() - count);
-    	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
+    	if (updateContext) {
+        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+        		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
+        	}
+	    	CommandContext context = CommandContext.getThreadLocalContext();
+	    	if (context != null) {
+	    		context.addAndGetReservedBuffers((int)-count);
+	    	}
+    	} else {
+        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.INFO)) {
+        		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing orphaned buffer space", count); //$NON-NLS-1$
+        	}
     	}
     	lock.lock();
     	try {
@@ -607,111 +650,218 @@
     	}
     }
     
-    /**
-     * TODO: should consider other reservations by the current thread
-     */
     @Override
-    public int reserveAdditionalBuffers(int additional) {
+    public int reserveBuffers(int count, BufferReserveMode mode) {
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", additional, "WAIT"); //$NON-NLS-1$ //$NON-NLS-2$
+    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
     	}
-    	lock.lock();
-    	try {
-			//don't wait for more than is available
-			int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
-			int committed = 0;
-	    	while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
-	    		long reserveBatchSample = this.reserveBatchBytes.get();
-	    		try {
-					batchesFreed.await(100, TimeUnit.MILLISECONDS);
-				} catch (InterruptedException e) {
-					throw new TeiidRuntimeException(e);
-				}
-				if (reserveBatchSample >= this.reserveBatchBytes.get()) {
-					waitCount >>= 3;
-				} else {
-					waitCount >>= 1;
-				}
-		    	int result = noWaitReserve(additional - committed, false);
-		    	committed += result;
-	    	}	
-	    	return committed;
-    	} finally {
-    		lock.unlock();
-    		persistBatchReferences();
+		CommandContext context = CommandContext.getThreadLocalContext();
+		int existing = 0;
+		if (context != null) {
+			existing = (int)Math.min(Integer.MAX_VALUE, context.addAndGetReservedBuffers(0));
+		}
+    	int result = count;
+    	if (mode == BufferReserveMode.FORCE) {
+    		reserve(count, context);
+    	} else {
+    		lock.lock();
+    		try {
+    			count = Math.min(count, nominalProcessingMemoryMax - existing);
+    			result = noWaitReserve(count, false, context);
+    		} finally {
+    			lock.unlock();
+    		}
     	}
+		persistBatchReferences(result);
+    	return result;
     }
+
+	private void reserve(int count, CommandContext context) {
+		this.reserveBatchBytes.addAndGet(-count);
+		if (context != null) {
+			context.addAndGetReservedBuffers(count);
+		}
+	}
     
     @Override
-    public int reserveBuffers(int count, BufferReserveMode mode) {
+    public int reserveBuffersBlocking(int count, long[] val, boolean force) throws BlockedException {
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
+    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, force); //$NON-NLS-1$
     	}
-    	int result = count;
-    	if (mode == BufferReserveMode.FORCE) {
-    		this.reserveBatchBytes.addAndGet(-count);
-    	} else {
-    		result = noWaitReserve(count, true);
+    	assert count >= 0;
+    	if (count == 0) {
+    		return 0;
     	}
-    	reservedByThread.set(reservedByThread.get() + result);
-		persistBatchReferences();
+    	int result = 0;
+		int count_orig = count;
+		CommandContext context = CommandContext.getThreadLocalContext();
+		long reserved = 0;
+		if (context != null) {
+			reserved = context.addAndGetReservedBuffers(0);
+			//TODO: in theory we have to check the whole stack as we could be 
+			//issuing embedded queries back to ourselves
+		}
+		count = Math.min(count, (int)Math.min(Integer.MAX_VALUE, nominalProcessingMemoryMax - reserved));
+		if (count_orig != count && !force) {
+			return 0; //is not possible to reserve the desired amount
+		}
+		result = noWaitReserve(count, true, context);
+		if (result == 0) {
+			if (val[0]++ == 0) {
+				val[1] = System.currentTimeMillis();
+			}
+			if (val[1] > 1) {
+				long last = val[1];
+				val[1] = System.currentTimeMillis();
+				try {
+					lock.lock();
+					if (val[1] - last < 10) {
+						//if the time difference is too close, then wait to prevent tight spins
+						//but we can't wait too long as we don't want to thread starve the system
+						batchesFreed.await(20, TimeUnit.MILLISECONDS);
+					}
+					if ((val[0] << (force?16:18)) > count) {
+						//aging out 
+						//TODO: ideally we should be using a priority queue and better scheduling
+						if (!force) {
+							return 0;
+						}
+						reserve(count_orig, context);
+						result = count_orig;
+					} else {
+						int min = 0;
+						if (force) {
+							min = 2*count/3;
+						} else {
+							min = 4*count/5;
+						}
+						//if a sample looks good proceed
+						if (reserveBatchBytes.get() > min){
+							reserve(count_orig, context);
+							result = count_orig;
+						}
+					}
+				} catch (InterruptedException e) {
+					throw new TeiidRuntimeException(e);
+				} finally {
+					lock.unlock();
+				}
+			}
+			if (result == 0) {
+				throw BlockedException.BLOCKED_ON_MEMORY_EXCEPTION;
+			}
+		}
+		if (force && result < count_orig) {
+			reserve(count_orig - result, context);
+			result = count_orig;
+		}
+		val[0] = 0;
+		persistBatchReferences(result);
     	return result;
     }
 
-	private int noWaitReserve(int count, boolean allOrNothing) {
-		for (int i = 0; i < 2; i++) {
+	private int noWaitReserve(int count, boolean allOrNothing, CommandContext context) {
+		boolean success = false;
+		for (int i = 0; !success && i < 2; i++) {
 			long reserveBatch = this.reserveBatchBytes.get();
-			if (allOrNothing && count > reserveBatch) {
-				return 0;
+			long overhead = this.overheadBytes.get();
+			long current = reserveBatch - overhead;
+			if (allOrNothing) {
+				if (count > current) {
+					return 0;
+				}
+			} else if (count > current) {
+				count = (int)Math.max(0, current);
 			}
-			count = (int)Math.min(count, Math.max(0, reserveBatch));
 			if (count == 0) {
 				return 0;
 			}
 			if (this.reserveBatchBytes.compareAndSet(reserveBatch, reserveBatch - count)) {
-				return count;
+				success = true;
 			}
 		}
 		//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
-		this.reserveBatchBytes.addAndGet(-count);
+		if (!success) {
+			this.reserveBatchBytes.addAndGet(-count);
+		}
+		if (context != null) {
+			context.addAndGetReservedBuffers(count);
+		}
 		return count;
 	}
     
-	void persistBatchReferences() {
-		long activeBatch = activeBatchBytes.get();
+	void persistBatchReferences(int max) {
+		if (max <= 0) {
+			return;
+		}
+		if (!cleaning.get()) {
+			synchronized (cleaner) {
+				cleaner.notify();
+			}
+		}
+		long activeBatch = activeBatchBytes.get() + overheadBytes.get();
 		long reserveBatch = reserveBatchBytes.get();
-		if (activeBatch <= reserveBatch) {
-    		long memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
-			if (DataTypeManager.isValueCacheEnabled()) {
-    			if (memoryCount < getMaxReserveKB() / 8) {
-					DataTypeManager.setValueCacheEnabled(false);
-				}
-			} else if (memoryCount > getMaxReserveKB() / 2) {
-				DataTypeManager.setValueCacheEnabled(true);
+		long memoryCount = activeBatch + maxReserveBytes - reserveBatch;
+		if (memoryCount <= maxReserveBytes) {
+			if (DataTypeManager.USE_VALUE_CACHE && DataTypeManager.isValueCacheEnabled() && memoryCount < maxReserveBytes / 8) {
+				DataTypeManager.setValueCacheEnabled(false);
 			}
 			return;
+		} else if (DataTypeManager.USE_VALUE_CACHE) {
+			DataTypeManager.setValueCacheEnabled(true);
 		}
-		long maxToFree = Math.min(maxProcessingBytes, (activeBatch - reserveBatch)<<1);
-		doEvictions(maxToFree, true);
+		//we delay work here as there should be excess vm space, we are using an overestimate, and we want the cleaner to do the work if possible
+		//TODO: track sizes held by each queue independently
+		long maxToFree = Math.min(max, memoryCount - maxReserveBytes);
+		LrfuEvictionQueue<CacheEntry> first = initialEvictionQueue;
+		LrfuEvictionQueue<CacheEntry> second = evictionQueue;
+		if (evictionQueue.getSize() > 2*initialEvictionQueue.getSize()) {
+			//attempt to evict from the non-initial queue first as these should essentially be cost "free" and hopefully the reference cache can mitigate
+			//the cost of rereading
+			first = evictionQueue;
+			second = initialEvictionQueue;
+		}
+		maxToFree -= doEvictions(maxToFree, true, first);
+		if (maxToFree > 0) {
+			maxToFree = Math.min(maxToFree, activeBatchBytes.get() + overheadBytes.get() - reserveBatchBytes.get());
+			if (maxToFree > 0) {
+				doEvictions(maxToFree, true, second);
+			}
+		}
 	}
-
-	void doEvictions(long maxToFree, boolean checkActiveBatch) {
-		int freed = 0;
-		while (freed <= maxToFree && (!checkActiveBatch || (maxToFree == 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .7) || (maxToFree > 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .8))) {
-			CacheEntry ce = evictionQueue.firstEntry(true);
+	
+	long doEvictions(long maxToFree, boolean checkActiveBatch, LrfuEvictionQueue<CacheEntry> queue) {
+		if (queue == evictionQueue) {
+			maxToFree = Math.min(maxToFree, this.maxProcessingBytes);
+		}
+		long freed = 0;
+		while (freed <= maxToFree && (
+				!checkActiveBatch //age out 
+				|| queue == evictionQueue && activeBatchBytes.get() + overheadBytes.get() + this.maxReserveBytes/2 > reserveBatchBytes.get() //nominal cleaning criterion 
+				|| queue != evictionQueue && activeBatchBytes.get() > 0)) { //assume that basically all initial batches will need to be written out at some point
+			CacheEntry ce = queue.firstEntry(checkActiveBatch);
 			if (ce == null) {
 				break;
 			}
 			synchronized (ce) {
 				if (!memoryEntries.containsKey(ce.getId())) {
+					checkActiveBatch = true;
 					continue; //not currently a valid eviction
 				}
 			}
+			if (!checkActiveBatch) {
+				long lastAccess = ce.getKey().getLastAccess();
+				long currentTime = readAttempts.get();
+				if (currentTime - lastAccess < MAX_READ_AGE) {
+					checkActiveBatch = true;
+					continue;
+				}
+			}
 			boolean evicted = true;
 			try {
 				evicted = evict(ce);
 			} catch (Throwable e) {
-				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ ce.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
 			} finally {
 				synchronized (ce) {
 					if (evicted && memoryEntries.remove(ce.getId()) != null) {
@@ -719,14 +869,13 @@
 							freed += ce.getSizeEstimate();
 						}
 						activeBatchBytes.addAndGet(-ce.getSizeEstimate());
-						evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
-						if (!checkActiveBatch) {
-							break;
-						}
+						queue.remove(ce); //ensures that an intervening get will still be cleaned
+						checkActiveBatch = true;
 					}
 				}
 			}
 		}
+		return freed;
 	}
 
 	boolean evict(CacheEntry ce) throws Exception {
@@ -762,14 +911,13 @@
 		int sizeEstimate = ce.getSizeEstimate()/2;
 		BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, sizeEstimate);
 		softCache.put(ce.getId(), ref);
-		maxReserveBytes.addAndGet(- sizeEstimate);
-		reserveBatchBytes.addAndGet(- sizeEstimate);
+		overheadBytes.addAndGet(sizeEstimate);
 	}
 	
 	/**
 	 * Get a CacheEntry without hitting storage
 	 */
-	CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
+	CacheEntry fastGet(Long batch, Boolean prefersMemory, boolean retain) {
 		CacheEntry ce = null;
 		if (retain) {
 			ce = memoryEntries.get(batch);
@@ -782,10 +930,17 @@
 					//there is a minute chance the batch was evicted
 					//this call ensures that we won't leak
 					if (memoryEntries.containsKey(batch)) {
-						evictionQueue.touch(ce);
+						if (ce.isPersistent()) {
+							evictionQueue.touch(ce);
+						} else {
+							initialEvictionQueue.touch(ce);
+						}
 					}
 				} else {
 					evictionQueue.remove(ce);
+					if (!ce.isPersistent()) {
+						initialEvictionQueue.remove(ce);
+					}
 				}
 			}
 			if (!retain) {
@@ -793,7 +948,7 @@
 			}
 			return ce;
 		}
-		if (prefersMemory) {
+		if (prefersMemory == null || prefersMemory) {
 			BatchSoftReference bsr = softCache.remove(batch);
 			if (bsr != null) {
 				ce = bsr.get();
@@ -801,7 +956,8 @@
 					clearSoftReference(bsr);
 				}
 			}
-		} else if (useWeakReferences) {
+		}
+		if (ce == null && (prefersMemory == null || !prefersMemory) && useWeakReferences) {
 			ce = weakReferenceCache.getByHash(batch);
 			if (ce == null || !ce.getId().equals(batch)) {
 				return null;
@@ -846,11 +1002,11 @@
 	}
 	
 	void addMemoryEntry(CacheEntry ce, boolean initial) {
-		persistBatchReferences();
+		persistBatchReferences(ce.getSizeEstimate());
 		synchronized (ce) {
 			boolean added = memoryEntries.put(ce.getId(), ce) == null;
 			if (initial) {
-				evictionQueue.add(ce);
+				initialEvictionQueue.add(ce);
 			} else if (added) {
 				evictionQueue.recordAccess(ce);
 				evictionQueue.add(ce);
@@ -861,15 +1017,16 @@
 		activeBatchBytes.getAndAdd(ce.getSizeEstimate());
 	}
 	
-	void removeCacheGroup(Long id, boolean prefersMemory) {
+	void removeCacheGroup(Long id, Boolean prefersMemory) {
 		cleanSoftReferences();
 		Collection<Long> vals = cache.removeCacheGroup(id);
 		long overhead = vals.size() * BATCH_OVERHEAD;
-		maxReserveBytes.addAndGet(overhead);
-		reserveBatchBytes.addAndGet(overhead);
-		for (Long val : vals) {
-			//TODO: we will unnecessarily call remove on the cache, but that should be low cost
-			fastGet(val, prefersMemory, false);
+		overheadBytes.addAndGet(-overhead);
+		if (!vals.isEmpty()) {
+			for (Long val : vals) {
+				//TODO: we will unnecessarily call remove on the cache, but that should be low cost
+				fastGet(val, prefersMemory, false);
+			}
 		}
 	}
 	
@@ -892,8 +1049,8 @@
 	private int[] getSizeEstimates(List<? extends Expression> elements) {
 		int total = 0;
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
-		for (Expression element : elements) {
-			Class<?> type = element.getType();
+		for (int i = elements.size() - 1; i >= 0; i--) {
+			Class<?> type = elements.get(i).getType();
 			total += SizeUtility.getSize(isValueCacheEnabled, type);
 		}
 		//assume 64-bit
@@ -932,6 +1089,11 @@
 	}
 
 	public void shutdown() {
+		this.cache = null;
+		this.memoryEntries.clear();
+		this.evictionQueue.getEvictionQueue().clear();
+		this.initialEvictionQueue.getEvictionQueue().clear();
+		this.cleaner.cancel();
 	}
 
 	@Override
@@ -970,14 +1132,14 @@
 	
 	public void setUseWeakReferences(boolean useWeakReferences) {
 		this.useWeakReferences = useWeakReferences;
-	}
+	}	
 	
 	public void setInlineLobs(boolean inlineLobs) {
 		this.inlineLobs = inlineLobs;
 	}
 
 	public int getMaxReserveKB() {
-		return (int)maxReserveBytes.get()>>10;
+		return (int)maxReserveBytes>>10;
 	}
 	
 	public void setCache(Cache cache) {
@@ -988,10 +1150,22 @@
 		return memoryEntries.size();
 	}
 	
+	public long getActiveBatchBytes() {
+		return activeBatchBytes.get();
+	}
+	
+	public long getReferenceHits() {
+		return referenceHit.get();
+	}
+	
 	@Override
 	public Streamable<?> persistLob(Streamable<?> lob, FileStore store,
 			byte[] bytes) throws TeiidComponentException {
 		return LobManager.persistLob(lob, store, bytes, inlineLobs, DataTypeManager.MAX_LOB_MEMORY_BYTES);
 	}
+
+	public void invalidCacheGroup(Long gid) {
+		removeCacheGroup(gid, null);
+	}
 	
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -52,7 +52,7 @@
 	}
 
     public void write(byte b[], int off, int len) throws IOException {
-    	while (true) {
+    	while (len > 0) {
         	ensureBuffer();
     		int toCopy = Math.min(buf.remaining(), len);
     		buf.put(b, off, toCopy);

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.common.buffer.BaseCacheEntry;
@@ -48,6 +49,7 @@
 	protected AtomicLong clock;
 	protected long maxInterval;
 	protected long halfLife;
+	private AtomicInteger size = new AtomicInteger();
 	
 	public LrfuEvictionQueue(AtomicLong clock) {
 		this.clock = clock;
@@ -55,11 +57,19 @@
 	}
 
 	public boolean remove(V value) {
-		return evictionQueue.remove(value.getKey()) != null;
+		if (evictionQueue.remove(value.getKey()) != null) {
+			size.addAndGet(-1);
+			return true;
+		}
+		return false;
 	}
 	
 	public boolean add(V value) {
-		return evictionQueue.put(value.getKey(), value) == null;
+		if (evictionQueue.put(value.getKey(), value) == null) {
+			size.addAndGet(1);
+			return true;
+		}
+		return false;
 	}
 	
 	public void touch(V value) {
@@ -80,6 +90,9 @@
 		Map.Entry<CacheKey, V> entry = null;
 		if (poll) {
 			entry = evictionQueue.pollFirstEntry();
+			if (entry != null) {
+				size.addAndGet(-1);
+			}
 		} else {
 			entry = evictionQueue.firstEntry();
 		}
@@ -130,4 +143,8 @@
 		this.maxInterval = 62*this.halfLife;
 	}
 	
+	public int getSize() {
+		return size.get();
+	}
+	
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -22,6 +22,11 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -30,8 +35,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.teiid.core.types.BaseLob;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.types.Streamable;
 
 
 /**
@@ -42,6 +53,26 @@
  * Actual object allocation efficiency can be quite poor.  
  */
 public final class SizeUtility {
+	private static final int UNKNOWN_SIZE_BYTES = 512;
+
+	private static final class DummyOutputStream extends OutputStream {
+		int bytes;
+
+		@Override
+		public void write(int arg0) throws IOException {
+			bytes++;
+		}
+		
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			bytes+=len;
+		}
+		
+		public int getBytes() {
+			return bytes;
+		}
+	}
+
 	public static final int REFERENCE_SIZE = 8;
 	
 	private static Map<Class<?>, int[]> SIZE_ESTIMATES = new HashMap<Class<?>, int[]>(128);
@@ -73,6 +104,13 @@
 	private long bigDecimalEstimate;
 	private Class<?>[] types;
 	
+	private static class ClassStats {
+		AtomicInteger samples = new AtomicInteger();
+		volatile int averageSize = UNKNOWN_SIZE_BYTES;
+	}
+	
+	private static ConcurrentHashMap<String, ClassStats> objectEstimates = new ConcurrentHashMap<String, ClassStats>();
+	
 	public SizeUtility(Class<?>[] types) {
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
 		bigIntegerEstimate = getSize(isValueCacheEnabled, DataTypeManager.DefaultDataClasses.BIG_INTEGER);
@@ -109,7 +147,7 @@
 			Class<?> type) {
     	int[] vals = SIZE_ESTIMATES.get(type);
     	if (vals == null) {
-			return 512; //this is is misleading for lobs
+			return UNKNOWN_SIZE_BYTES; //this is misleading for lobs
 			//most references are not actually removed from memory
     	}
     	return vals[isValueCacheEnabled?0:1];
@@ -124,7 +162,7 @@
             return 0;
         }
 
-        if(type == DataTypeManager.DefaultDataClasses.STRING) {
+        if(obj.getClass() == DataTypeManager.DefaultDataClasses.STRING) {
             int length = ((String)obj).length();
             if (length > 0) {
                 return alignMemory(40 + (2 * length));
@@ -137,9 +175,7 @@
             int bitLength = ((BigDecimal)obj).unscaledValue().bitLength();
             //TODO: this does not account for the possibility of a cached string
             long result = 88 + alignMemory(4 + (bitLength >> 3));
-            if (updateEstimate) {
-            	bigDecimalEstimate = (bigDecimalEstimate + result)/2;
-            }
+        	bigDecimalEstimate = (bigDecimalEstimate + result)/2;
             return result;
         } else if(type == DataTypeManager.DefaultDataClasses.BIG_INTEGER) {
         	if (!updateEstimate) {
@@ -147,15 +183,13 @@
         	}
             int bitLength = ((BigInteger)obj).bitLength();
             long result = 40 + alignMemory(4 + (bitLength >> 3));
-            if (updateEstimate) {
-            	bigIntegerEstimate = (bigIntegerEstimate + result)/2;
-            }
+        	bigIntegerEstimate = (bigIntegerEstimate + result)/2;
             return result;
         } else if(obj instanceof Iterable<?>) {
         	Iterable<?> i = (Iterable<?>)obj;
         	long total = 16;
         	for (Object object : i) {
-				total += getSize(object, DataTypeManager.determineDataTypeClass(object), true, false) + REFERENCE_SIZE;
+				total += getSize(object, DataTypeManager.determineDataTypeClass(object), updateEstimate, false) + REFERENCE_SIZE;
 			}
         	return total;
         } else if(obj.getClass().isArray()) {
@@ -164,7 +198,7 @@
 	            Object[] rows = (Object[]) obj;
 	            long total = 16 + alignMemory(rows.length * REFERENCE_SIZE); // Array overhead
 	            for(int i=0; i<rows.length; i++) {
-	                total += getSize(rows[i], DataTypeManager.determineDataTypeClass(rows[i]), true, false);
+	                total += getSize(rows[i], DataTypeManager.determineDataTypeClass(rows[i]), updateEstimate, false);
 	            }
 	            return total;
         	}
@@ -180,7 +214,67 @@
         		primitiveSize = 4;
         	}
         	return alignMemory(length * primitiveSize) + 16;
-        }        	
+        } else if (obj instanceof Streamable<?>) {
+			try {
+				Streamable<?> s = (Streamable)obj;
+				Object o = s.getReference();
+				if (o instanceof BaseLob) {
+					InputStreamFactory isf = ((BaseLob)o).getStreamFactory();
+					if (isf.getStorageMode() == StorageMode.MEMORY) {
+		    			long length = isf.getLength();
+		    			if (length >= 0) {
+		    				return 40 + alignMemory(length);
+		    			}
+					} else if (isf.getStorageMode() == StorageMode.PERSISTENT) {
+						long length = isf.getLength();
+	    				return 40 + alignMemory(Math.min(DataTypeManager.MAX_LOB_MEMORY_BYTES, length));
+					}
+	    		}
+			} catch (Exception e) {
+			}
+        } else if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
+        	if (obj.getClass() != DataTypeManager.DefaultDataClasses.OBJECT && (SIZE_ESTIMATES.containsKey(obj.getClass()) || VARIABLE_SIZE_TYPES.contains(obj.getClass()))) {
+        		return getSize(obj, obj.getClass(), updateEstimate, accountForValueCache);
+        	}
+        	//assume we can get a plausible estimate from the serialized size
+        	if (obj instanceof Serializable) {
+            	ClassStats stats = objectEstimates.get(obj.getClass().getName()); //we're ignoring classloader differences here
+            	if (stats == null) {
+            		stats = new ClassStats();
+            		objectEstimates.put(obj.getClass().getName(), stats);
+            	}
+            	if (updateEstimate) {
+	            	int samples = stats.samples.getAndIncrement();
+	            	if (samples < 1000 || (samples&1023) == 1023) {
+	    	        	try {
+	    		        	DummyOutputStream os = new DummyOutputStream();
+	    		        	ObjectOutputStream oos = new ObjectOutputStream(os) {
+	    		        		@Override
+	    		        		protected void writeClassDescriptor(
+	    		        				ObjectStreamClass desc) throws IOException {
+	    		        		}
+	    		        		@Override
+	    		        		protected void writeStreamHeader()
+	    		        				throws IOException {
+	    		        		}
+	    		        	};
+	    		        	oos.writeObject(obj);
+	    		        	oos.close();
+	    		        	int result = (int)alignMemory(os.getBytes() * 3);
+	    		        	if (result > stats.averageSize) {
+	    		        		stats.averageSize = (stats.averageSize + result*2)/3;
+	    		        	} else {
+	    		        		stats.averageSize = (stats.averageSize + result)/2;
+	    		        	}
+	    		        	return result;
+	    	        	} catch (Exception e) {
+	    	        		
+	    	        	}            		
+	            	}
+            	}
+            	return stats.averageSize;
+        	}
+        }
 		return getSize(accountForValueCache, type);
     }
     

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -34,17 +34,17 @@
 import java.util.concurrent.Callable;
 
 import org.teiid.client.RequestMessage;
+import org.teiid.client.RequestMessage.ShowPlan;
 import org.teiid.client.ResultsMessage;
 import org.teiid.client.SourceWarning;
-import org.teiid.client.RequestMessage.ShowPlan;
 import org.teiid.client.lob.LobChunk;
 import org.teiid.client.metadata.ParameterInfo;
 import org.teiid.client.util.ResultsReceiver;
 import org.teiid.client.xa.XATransactionException;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
@@ -59,14 +59,14 @@
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
 import org.teiid.dqp.service.TransactionService;
-import org.teiid.dqp.service.TransactionContext.Scope;
+import org.teiid.jdbc.EnhancedTimer.Task;
 import org.teiid.jdbc.SQLStates;
-import org.teiid.jdbc.EnhancedTimer.Task;
+import org.teiid.logging.CommandLogMessage.Event;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
-import org.teiid.logging.CommandLogMessage.Event;
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.analysis.AnalysisRecord;
@@ -78,6 +78,7 @@
 import org.teiid.query.sql.lang.SPParameter;
 import org.teiid.query.sql.lang.StoredProcedure;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
+import org.teiid.query.util.CommandContext;
 
 public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
 	
@@ -372,7 +373,12 @@
 		if (!doneProducingBatches) {
 			this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() + this.processorTimeslice);
 			sendResultsIfNeeded(null);
-			this.resultsBuffer = collector.collectTuples();
+			try {
+				CommandContext.pushThreadLocalContext(this.processor.getContext());
+				this.resultsBuffer = collector.collectTuples();
+			} finally {
+				CommandContext.popThreadLocalContext();
+			}
 			if (!doneProducingBatches) {
 				doneProducingBatches();
 				//TODO: we could perform more tracking to know what source lobs are in use
@@ -435,6 +441,9 @@
 					for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
 						connectorRequest.fullyCloseSource();
 				    }
+					
+					CommandContext cc = this.processor.getContext();
+					cc.close();
 				}
 	
 				this.resultsBuffer = null;

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -107,6 +107,9 @@
 	    		if (!this.context.isNonBlocking()) {
 	    			throw e;
 	    		}
+	    		if (e == BlockedException.BLOCKED_ON_MEMORY_EXCEPTION) {
+	    			continue; //TODO: pass the commandcontext into sortutility
+	    		}
 	    	}
     		try {
                 Thread.sleep(wait);
@@ -241,6 +244,9 @@
 	    		if (!this.context.isNonBlocking()) {
 	    			throw e;
 	    		}
+	    		if (e == BlockedException.BLOCKED_ON_MEMORY_EXCEPTION) {
+	    			continue; //TODO: pass the commandcontext into sortutility
+	    		}
 	    	} catch (TeiidComponentException e) {
 	    		closeProcessing();
 	    		throw e;

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -91,7 +91,8 @@
 		                    sortDirection.add(Boolean.valueOf(OrderBy.ASC));
 		                    sortSymbols.add((SingleElementSymbol)dependentSetStates.get(i).valueExpression);
 		                }
-		                this.sortUtility = new SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
+		                this.sortUtility = new SortUtility(null, sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
+		                this.sortUtility.setWorkingBuffer(originalVs.getTupleBuffer());
 	            	}
 	            	dvs = new DependentValueSource(sortUtility.sort());
                 } else {

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -31,11 +31,11 @@
 
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
-import org.teiid.common.buffer.IndexedTupleSource;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleBuffer.TupleBufferTupleSource;
 import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.util.Assertion;
@@ -53,6 +53,7 @@
  * Implements several modes of a multi-pass sort.
  * 
  * TODO: could consider using an index for dup_removal and maintaining a separate output buffer
+ * TODO: release the tuple buffer in the last merge pass if sublists will fit in processing batch size
  */
 public class SortUtility {
 	
@@ -73,7 +74,7 @@
 	private class SortedSublist implements Comparable<SortedSublist> {
 		List<?> tuple;
 		int index;
-		IndexedTupleSource its;
+		TupleBufferTupleSource its;
 		int limit = Integer.MAX_VALUE;
 		
 		@Override
@@ -97,6 +98,7 @@
     private int schemaSize;
     private int batchSize;
 	private ListNestedSortComparator comparator;
+	private int targetRowCount;
 
     private TupleBuffer output;
     private boolean doneReading;
@@ -104,13 +106,15 @@
     private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
     private int masterSortIndex;
     
-    private int collected;
+    private int processed;
 
     // Phase constants for readability
     private static final int INITIAL_SORT = 1;
     private static final int MERGE = 2;
     private static final int DONE = 3;
-	private Collection<List<?>> workingTuples;
+	private TupleBuffer workingBuffer;
+	private long[] attempts = new long[2];
+	private boolean nonBlocking;
     
     public SortUtility(TupleSource sourceID, List<OrderByItem> items, Mode mode, BufferManager bufferMgr,
                         String groupName, List<? extends Expression> schema) {
@@ -167,6 +171,7 @@
         this.schema = schema;
         this.schemaSize = bufferManager.getSchemaSize(this.schema);
         this.batchSize = bufferManager.getProcessorBatchSize(this.schema);
+        this.targetRowCount = Math.max(bufferManager.getMaxProcessingSize()/this.schemaSize, 2)*this.batchSize;
         this.comparator = new ListNestedSortComparator(cols, sortTypes);
         int distinctIndex = cols.length - 1;
         this.comparator.setDistinctIndex(distinctIndex);
@@ -180,38 +185,57 @@
 
     public TupleBuffer sort()
         throws TeiidComponentException, TeiidProcessingException {
-
-        if(this.phase == INITIAL_SORT) {
-            initialSort();
+    	boolean success = false;
+    	try {
+	        if(this.phase == INITIAL_SORT) {
+	            initialSort(false);
+	        }
+	        
+	        if(this.phase == MERGE) {
+	            mergePhase();
+	        }
+	        success = true;
+	        if (this.output != null) {
+	        	return this.output;
+	        }
+	        return this.activeTupleBuffers.get(0);
+    	} catch (BlockedException e) {
+    		success = true;
+    		throw e;
+    	} finally {
+        	if (!success) {
+        		remove();
+        	}
         }
-        
-        if(this.phase == MERGE) {
-            mergePhase();
-        }
-        if (this.output != null) {
-        	return this.output;
-        }
-        return this.activeTupleBuffers.get(0);
     }
     
     public List<TupleBuffer> onePassSort() throws TeiidComponentException, TeiidProcessingException {
+    	boolean success = false;
     	assert this.mode != Mode.DUP_REMOVE;
-    	
-    	if(this.phase == INITIAL_SORT) {
-            initialSort();
-        }
-    	
-    	for (TupleBuffer tb : activeTupleBuffers) {
-			tb.close();
-		}
-    	
-    	return activeTupleBuffers;
+    	try {
+	    	if(this.phase == INITIAL_SORT) {
+	            initialSort(true);
+	        }
+	    	
+	    	for (TupleBuffer tb : activeTupleBuffers) {
+				tb.close();
+			}
+	    	success = true;
+	    	return activeTupleBuffers;
+    	} catch (BlockedException e) {
+    		success = true;
+    		throw e;
+    	} finally {
+    		if (!success) {
+    			remove();
+    		}
+    	}
     }
-
+    
 	private TupleBuffer createTupleBuffer() throws TeiidComponentException {
 		TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
 		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer ", tb.getId()); //$NON-NLS-1$
+			LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer", tb); //$NON-NLS-1$
 		}
 		tb.setForwardOnly(true);
 		return tb;
@@ -220,30 +244,16 @@
 	/**
 	 * creates sorted sublists stored in tuplebuffers
 	 */
-    protected void initialSort() throws TeiidComponentException, TeiidProcessingException {
-    	while(!doneReading) {
-    		if (workingTuples == null) {
-	            if (mode == Mode.SORT) {
-	            	workingTuples = new ArrayList<List<?>>();
-	            } else {
-	            	workingTuples = new TreeSet<List<?>>(comparator);
-	            }
-    		}
+    protected void initialSort(boolean onePass) throws TeiidComponentException, TeiidProcessingException {
+    	outer: while (!doneReading) {
     		
-            int totalReservedBuffers = 0;
-            try {
-	            int maxRows = this.batchSize;
-		        while(!doneReading) {
-		        	//attempt to reserve more working memory if there are additional rows available before blocking
-		        	if (workingTuples.size() >= maxRows) {
-	        			int reserved = bufferManager.reserveBuffers(schemaSize, 
-	        					(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingSize())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
-		        		totalReservedBuffers += reserved;
-	        			if (reserved != schemaSize) {
-		        			break;
-		        		} 
-		        		maxRows += this.batchSize;	
-		        	}
+    		if (this.source != null) {
+	    		//sub-phase 1 - build up a working buffer of tuples
+	    		if (this.workingBuffer == null) {
+	    			this.workingBuffer = createTupleBuffer();
+	    		}
+	    		
+	    		while (!doneReading) {
 		            try {
 		            	List<?> tuple = source.nextTuple();
 		            	
@@ -251,26 +261,96 @@
 		            		doneReading = true;
 		            		break;
 		            	}
-	                    if (workingTuples.add(tuple)) {
-	                    	this.collected++;
-	                    }
+		            	this.workingBuffer.addTuple(tuple);
 		            } catch(BlockedException e) {
-		            	if (workingTuples.size() >= this.batchSize) {
-		            		break;
+		            	/*there are three cases here
+		            	 * 1. a fully blocking sort (optionally dup removal)
+		            	 * 2. a streaming dup removal
+		            	 * 3. a one pass sort (for grace join like processing)
+		            	 */
+		            	if (!onePass && mode != Mode.DUP_REMOVE) {
+		            		throw e; //read fully before processing
 		            	}
-		            	if (mode != Mode.DUP_REMOVE  
-		            			|| (this.output != null && collected < this.output.getRowCount() * 2) 
-		            			|| (this.output == null && this.workingTuples.isEmpty() && this.activeTupleBuffers.isEmpty())) {
-	            			throw e; //block if no work can be performed
+		            	if (!onePass) {
+		            		//streaming dup remove - we have to balance latency vs. performance
+		            		//each merge phase is a full scan over the intermediate results
+		            		if (this.output != null && this.workingBuffer.getRowCount() < 2*this.processed) {
+		            			throw e;
+		            		}
+		            	} else {
+		            		//we're trying to create intermediate buffers that will comfortably be small memory sorts
+		            		if (this.workingBuffer.getRowCount() < this.targetRowCount) {
+		            			throw e;
+		            		}	
 		            	}
-		            	break;
+		            	break outer; //there's processing that we can do
 		            } 
+	    		}
+    		} else {
+    			doneReading = true;
+    		}
+        }
+    	
+		//sub-phase 2 - perform a memory sort on the workingbuffer/source
+    	int totalReservedBuffers = 0;
+        try {
+    		int maxRows = this.batchSize;
+    		Collection<List<?>> workingTuples = null;
+            boolean done = false;
+			/*
+			 * we can balance the work between the initial / multi-pass sort based upon the row count
+			 * and an updated estimate of the batch memory size 
+			 */
+			this.workingBuffer.close();
+			schemaSize = Math.max(1, this.workingBuffer.getRowSizeEstimate()*this.batchSize);
+			long memorySpaceNeeded = workingBuffer.getRowCount()*this.workingBuffer.getRowSizeEstimate();
+			if (onePass) {
+				//one pass just needs small sub-lists
+				memorySpaceNeeded = Math.min(memorySpaceNeeded, bufferManager.getMaxProcessingSize());
+			}
+			totalReservedBuffers = bufferManager.reserveBuffers(Math.min(bufferManager.getMaxProcessingSize(), (int)Math.min(memorySpaceNeeded, Integer.MAX_VALUE)), BufferReserveMode.FORCE);
+			if (totalReservedBuffers != memorySpaceNeeded) {
+				int processingSublists = Math.max(2, bufferManager.getMaxProcessingSize()/schemaSize);
+				int desiredSpace = (int)Math.min(Integer.MAX_VALUE, (workingBuffer.getRowCount()/processingSublists + (workingBuffer.getRowCount()%processingSublists))*(long)this.workingBuffer.getRowSizeEstimate());
+				if (desiredSpace > totalReservedBuffers) {
+					totalReservedBuffers += bufferManager.reserveBuffers(desiredSpace - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+					//TODO: wait to force 2/3 pass processing
+				} else if (memorySpaceNeeded <= Integer.MAX_VALUE) {
+					totalReservedBuffers += bufferManager.reserveBuffers((int)memorySpaceNeeded - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+				}
+				if (totalReservedBuffers > schemaSize) {
+					int additional = totalReservedBuffers%schemaSize;
+					totalReservedBuffers-=additional;
+					//release any excess
+		            bufferManager.releaseBuffers(additional);
+				}
+			}
+			TupleBufferTupleSource ts = workingBuffer.createIndexedTupleSource(source != null);
+			//ts.setReverse(workingBuffer.getRowCount() > this.batchSize);
+			processed+=this.workingBuffer.getRowCount();
+			maxRows = Math.max(1, (totalReservedBuffers/schemaSize))*batchSize;
+            if (mode == Mode.SORT) {
+            	workingTuples = new ArrayList<List<?>>();
+            } else {
+            	workingTuples = new TreeSet<List<?>>(comparator);
+            }
+            outer: while (!done) {
+                while(!done) {
+		        	if (workingTuples.size() >= maxRows) {
+	        			break;
+		        	}
+	            	List<?> tuple = ts.nextTuple();
+	            	
+	            	if (tuple == null) {
+	            		done = true;
+	            		if(workingTuples.isEmpty()) {
+				        	break outer;
+				        }
+	            		break;
+	            	}
+                    workingTuples.add(tuple);
 		        } 
 		
-		        if(workingTuples.isEmpty()) {
-		        	break;
-		        }
-			
 		        TupleBuffer sublist = createTupleBuffer();
 		        activeTupleBuffers.add(sublist);
 		        if (this.mode == Mode.SORT) {
@@ -280,40 +360,79 @@
 		        for (List<?> list : workingTuples) {
 					sublist.addTuple(list);
 				}
-		        workingTuples = null;
-	            
+		        workingTuples.clear();
 		        sublist.saveBatch();
-            } finally {
-        		bufferManager.releaseBuffers(totalReservedBuffers);
             }
+        } catch (BlockedException e) {
+        	Assertion.failed("should not block during memory sublist sorting"); //$NON-NLS-1$
+        } finally {
+    		bufferManager.releaseBuffers(totalReservedBuffers);
+    		if (this.workingBuffer != null) {
+    			if (this.source != null) {
+    				this.workingBuffer.remove();
+    			}
+        		this.workingBuffer = null;
+    		}
         }
     	
     	if (this.activeTupleBuffers.isEmpty()) {
             activeTupleBuffers.add(createTupleBuffer());
         }  
-    	this.collected = 0;
         this.phase = MERGE;
     }
 
+    public void setWorkingBuffer(TupleBuffer workingBuffer) {
+		this.workingBuffer = workingBuffer;
+	}
+    
     protected void mergePhase() throws TeiidComponentException, TeiidProcessingException {
-    	while(this.activeTupleBuffers.size() > 1) {    		
-    		ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
-            
-            TupleBuffer merged = createTupleBuffer();
+        long desiredSpace = activeTupleBuffers.size() * (long)schemaSize;
+        int toForce = (int)Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
+        int reserved = 0;
+        
+        if (desiredSpace > toForce) {
+        	try {
+	        	int subLists = Math.max(2, this.bufferManager.getMaxProcessingSize()/schemaSize);
+	        	int twoPass = subLists * subLists;
+	        	if (twoPass < activeTupleBuffers.size()) {
+	        		//wait for 2-pass
+	    			int needed = (int)Math.ceil(Math.pow(activeTupleBuffers.size(), .5));
+	    	        reserved += bufferManager.reserveBuffersBlocking(needed * schemaSize - toForce, attempts, false);
+	        		if (reserved == 0 && twoPass*subLists < activeTupleBuffers.size()) {
+	        			//force 3-pass
+	        			needed = (int)Math.ceil(Math.pow(activeTupleBuffers.size(), 1/3d));
+	        	        reserved += bufferManager.reserveBuffersBlocking(needed * schemaSize - toForce, attempts, true);
+	        		}
+	        	} else if (desiredSpace < Integer.MAX_VALUE) {
+	        		//wait for 1-pass
+	        		reserved += bufferManager.reserveBuffersBlocking((int)desiredSpace - toForce, attempts, false);
+	        	}
+        	} catch (BlockedException be) {
+        		if (!nonBlocking) {
+        			throw be;
+        		}
+        	}
+        }
+        int total = reserved + toForce;
+        if (total > schemaSize) {
+            toForce -= total % schemaSize;
+        }
+        reserved += bufferManager.reserveBuffers(toForce, BufferReserveMode.FORCE);
+        
+        try {
+        	while(this.activeTupleBuffers.size() > 1) {    		
+	    		ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
+	            
+	            TupleBuffer merged = createTupleBuffer();
 
-            int desiredSpace = activeTupleBuffers.size() * schemaSize;
-            int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
-            bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
-            if (desiredSpace > reserved) {
-            	reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
-            }
-            int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
-            //release any partial excess
-            int release = reserved % schemaSize > 0 ? 1 : 0;
-            bufferManager.releaseBuffers(release);
-            reserved -= release;
-            try {
-	        	if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+	            desiredSpace = activeTupleBuffers.size() * (long)schemaSize;
+	            if (desiredSpace < reserved) {
+	            	bufferManager.releaseBuffers(reserved - (int)desiredSpace);
+	            	reserved = (int)desiredSpace;
+	            }
+	            int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
+	            
+            	if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_DQP, MessageLevel.TRACE)) {
 	            	LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
 	            }
 	        	// initialize the sublists with the min value
@@ -321,6 +440,7 @@
 	             	TupleBuffer activeID = activeTupleBuffers.get(i);
 	             	SortedSublist sortedSublist = new SortedSublist();
 	            	sortedSublist.its = activeID.createIndexedTupleSource();
+	            	sortedSublist.its.setNoBlocking(true);
 	            	sortedSublist.index = i;
 	            	if (activeID == output) {
 	            		sortedSublist.limit = output.getRowCount();
@@ -351,9 +471,9 @@
 	            if (masterSortIndex < 0) {
 	            	masterSortIndex = this.activeTupleBuffers.size() - 1;
 	            }
-            } finally {
-            	this.bufferManager.releaseBuffers(reserved);
-            }
+    		}
+        } finally {
+        	this.bufferManager.releaseBuffers(reserved);
         }
     	
         // Close sorted source (all others have been removed)
@@ -385,11 +505,7 @@
 			if (sortedSublist.limit < sortedSublist.its.getCurrentIndex()) {
 				return; //special case for still reading the output tuplebuffer
 			}
-			try {
-				sortedSublist.tuple = sortedSublist.its.nextTuple();
-	        } catch (BlockedException e) {
-	        	//intermediate sources aren't closed
-	        }  
+			sortedSublist.tuple = sortedSublist.its.nextTuple();
 	        if (sortedSublist.tuple == null) {
 	        	return; // done with this sublist
 	        }
@@ -415,5 +531,29 @@
     public boolean isDistinct() {
     	return this.comparator.isDistinct();
     }
+
+	public void remove() {
+		if (workingBuffer != null && source != null) {
+			workingBuffer.remove();
+			workingBuffer = null;
+		}
+		if (!this.activeTupleBuffers.isEmpty()) {
+			//these can be leaked with a single pass, but
+			//they should not be reused whole
+			for (int i = 0; i < this.activeTupleBuffers.size(); i++) {
+				TupleBuffer tb = this.activeTupleBuffers.get(i);
+				if (tb == output || (i == 0 && phase == DONE)) {
+					continue;
+				}
+				tb.remove();
+			}
+			this.activeTupleBuffers.clear();
+		}
+		this.output = null;
+	}
+
+	public void setNonBlocking(boolean b) {
+		this.nonBlocking = b;
+	}
     
 }

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -28,9 +28,9 @@
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.api.exception.query.FunctionExecutionException;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.function.aggregate.AggregateFunction;
@@ -134,7 +134,9 @@
 
             // Sort
             if (sortUtility == null) {
-            	sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
+            	sortUtility = new SortUtility(null, sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
+            	collectionBuffer.setForwardOnly(true);
+            	sortUtility.setWorkingBuffer(collectionBuffer);
             }
             TupleBuffer sorted = sortUtility.sort();
             sorted.setForwardOnly(true);

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -286,8 +286,6 @@
     				this.prefetch.setPosition(1);
     				this.prefetch.disableSave();
     				ts = this.prefetch;
-    			} else {
-    				ts = this.buffer.createIndexedTupleSource();
     			}
     		} else {
     			ts = new BatchIterator(this.source);
@@ -295,6 +293,9 @@
 		    this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), 
 		    		sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
 		    this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
+		    if (ts == null) {
+		    	this.sortUtility.setWorkingBuffer(this.buffer);
+		    }
 		}
     	if (sortOption == SortOption.NOT_SORTED) {
     		this.buffers = sortUtility.onePassSort();

Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -76,6 +76,13 @@
  */
 public class CommandContext implements Cloneable, org.teiid.CommandContext {
 	
+	private static ThreadLocal<LinkedList<CommandContext>> threadLocalContext = new ThreadLocal<LinkedList<CommandContext>>() {
+		@Override
+		protected LinkedList<CommandContext> initialValue() {
+			return new LinkedList<CommandContext>();
+		}
+	};
+	
 	private static class GlobalState {
 	    /** Uniquely identify the command being processed */
 	    private Object processorID;
@@ -145,6 +152,7 @@
 		private LRUCache<String, SimpleDateFormat> dateFormatCache;
 		
 		private Options options;
+		private long reservedBuffers;
 	}
 	
 	private GlobalState globalState = new GlobalState();
@@ -656,6 +664,16 @@
 		this.globalState.executor = e;
 	}
 	
+	public void close() {
+		synchronized (this.globalState) {
+			if (this.globalState.reservedBuffers > 0) {
+				long toRelease = this.globalState.reservedBuffers;
+				this.globalState.reservedBuffers = 0;
+				this.globalState.bufferManager.releaseOrphanedBuffers(toRelease);
+			}
+		}
+	}
+	
 	public static DecimalFormat getDecimalFormat(CommandContext context, String format) {
 		DecimalFormat result = null;
 		if (context != null) {
@@ -700,6 +718,22 @@
 		return this.globalState.options;
 	}
 	
+	public static CommandContext getThreadLocalContext() {
+		return threadLocalContext.get().peek();
+	}
+	
+	public static void pushThreadLocalContext(CommandContext context) {
+		threadLocalContext.get().push(context);
+	}
+	
+	public static void popThreadLocalContext() {
+		threadLocalContext.get().poll();
+	}
+
+	public long addAndGetReservedBuffers(int i) {
+		return globalState.reservedBuffers += i;
+	}
+	
 	public void setOptions(Options options) {
 		this.globalState.options = options;
 	}

Copied: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java (from rev 4560, branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java)
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java	                        (rev 0)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+public class TestExtensibleBufferedInputStream {
+	
+	@Test public void testReset() throws IOException {
+		InputStream is = new ExtensibleBufferedInputStream() {
+			boolean returned = false;
+			@Override
+			protected ByteBuffer nextBuffer() throws IOException {
+				if (returned) {
+					return null;
+				}
+				ByteBuffer result = ByteBuffer.allocate(3);
+				returned = true;
+				return result;
+			}
+		};
+		is.read();
+		is.read();
+		is.reset();
+		for (int i = 0; i < 3; i++) {
+			assertEquals(0, is.read());
+		}
+		assertEquals(-1, is.read());
+	}
+
+}

Modified: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -134,8 +134,8 @@
 	}
 	
 	@Test public void testSearch() throws TeiidComponentException, TeiidProcessingException {
-		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
-		bm.setProcessorBatchSize(1);
+		//due to buffering changes we need to hold this in memory directly rather than serialize it out as that will lead to GC overhead errors
+		BufferManagerImpl bm = BufferManagerFactory.getTestBufferManager(Integer.MAX_VALUE, 1);
 		
 		ElementSymbol e1 = new ElementSymbol("x");
 		e1.setType(Integer.class);

Modified: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -31,6 +31,7 @@
 
 import org.junit.Test;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.common.buffer.TupleBuffer.TupleBufferTupleSource;
 import org.teiid.core.types.ClobType;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.query.sql.symbol.ElementSymbol;
@@ -72,4 +73,19 @@
 		assertNotNull(tb.getLobReference(c.getReferenceStreamId()));
 	}
 	
+	@Test public void testReverseIteration() throws Exception {
+		ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
+		x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+		List<ElementSymbol> schema = Arrays.asList(x);
+		TupleBuffer tb = BufferManagerFactory.getStandaloneBufferManager().createTupleBuffer(schema, "x", TupleSourceType.PROCESSOR); //$NON-NLS-1$ 
+		tb.addTuple(Arrays.asList(1));
+		tb.addTuple(Arrays.asList(2));
+		TupleBufferTupleSource tbts = tb.createIndexedTupleSource();
+		tbts.setReverse(true);
+		assertTrue(tbts.hasNext());
+		assertEquals(2, tbts.nextTuple().get(0));
+		assertEquals(1, tbts.nextTuple().get(0));
+		assertFalse(tbts.hasNext());
+	}
+	
 }

Added: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java	                        (rev 0)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+
+public class TestBufferManagerImpl {
+	
+    @Test public void testReserve() throws Exception {
+        BufferManagerImpl bufferManager = new BufferManagerImpl();
+        bufferManager.setCache(new MemoryStorageManager());
+        bufferManager.setMaxProcessingKB(1024);
+        bufferManager.setMaxReserveKB(1024);
+        bufferManager.initialize();
+        bufferManager.setNominalProcessingMemoryMax(512000);
+        
+        //restricted by nominal max
+        assertEquals(512000, bufferManager.reserveBuffers(1024000, BufferReserveMode.NO_WAIT));
+		//forced
+        assertEquals(1024000, bufferManager.reserveBuffersBlocking(1024000, new long[] {0,0}, true));
+        
+        //not forced, so we get nothing
+        assertEquals(0, bufferManager.reserveBuffersBlocking(1024000, new long[] {0,0}, false));
+        
+        bufferManager.releaseBuffers(512000);
+        //the difference between 1mb and 1000k
+        assertEquals(24576, bufferManager.reserveBuffers(1024000, BufferReserveMode.NO_WAIT));
+    }
+
+}

Modified: branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
===================================================================
--- branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -116,10 +116,11 @@
 		execute("select * from vdbresources",new Object[] {}); //$NON-NLS-1$
 		TestMMDatabaseMetaData.compareResultSet(this.internalResultSet);
 	}
-
+/**
 	@Test public void testColumns() throws Exception {
-		checkResult("testColumns", "select* from SYS.Columns order by Name"); //$NON-NLS-1$ //$NON-NLS-2$
+		checkResult("testColumns", "select* from SYS.Columns order by Name, uid"); //$NON-NLS-1$ //$NON-NLS-2$
 	}
+	*/
 
 	@Test public void testTableType() throws Exception {
 

Modified: branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
===================================================================
--- branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java	2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java	2013-04-17 18:59:47 UTC (rev 4561)
@@ -34,15 +34,18 @@
 import java.sql.Time;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamWriter;
 
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.teiid.api.exception.query.QueryParserException;
@@ -67,14 +70,7 @@
 import org.teiid.query.processor.ProcessorDataManager;
 import org.teiid.query.processor.ProcessorPlan;
 import org.teiid.query.processor.TestTextTable;
-import org.teiid.query.processor.relational.BlockingFakeRelationalNode;
-import org.teiid.query.processor.relational.EnhancedSortMergeJoinStrategy;
-import org.teiid.query.processor.relational.FakeRelationalNode;
-import org.teiid.query.processor.relational.JoinNode;
-import org.teiid.query.processor.relational.JoinStrategy;
-import org.teiid.query.processor.relational.MergeJoinStrategy;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.SortNode;
+import org.teiid.query.processor.relational.*;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
 import org.teiid.query.processor.relational.SortUtility.Mode;
 import org.teiid.query.sql.lang.Command;
@@ -89,7 +85,10 @@
 @SuppressWarnings("nls")
 public class TestEnginePerformance {
 	
+	private static boolean debug = false;
+	
 	private static BufferManagerImpl bm;
+	private static BufferFrontedFileStoreCache cache;
 	private static ExecutorService es;
 	private static Random r = new Random(0);
 	
@@ -146,9 +145,9 @@
 				
 			});
 		}
-		es.invokeAll(tasks);
-		for (Callable<Void> callable : tasks) {
-			callable.call();
+		List<Future<Void>> result = es.invokeAll(tasks);
+		for (Future<Void> future : result) {
+			future.get();
 		}
 	}
 	
@@ -198,7 +197,7 @@
 		for (int i = 0; i < rowCount; i++) {
 			data[i] = Arrays.asList(i, String.valueOf(i));
 		}
-		//Collections.shuffle(Arrays.asList(data), r);
+		Collections.shuffle(Arrays.asList(data), r);
 		return data;
 	}
 	
@@ -267,11 +266,11 @@
 		bm = new BufferManagerImpl();
 
 		bm.setMaxProcessingKB(1<<12);
-		bm.setMaxReserveKB((1<<19)-(1<<17));
+		bm.setMaxReserveKB((1<<18)-(1<<16));
 		bm.setMaxActivePlans(20);
 		
-		BufferFrontedFileStoreCache cache = new BufferFrontedFileStoreCache();
-		cache.setMemoryBufferSpace(1<<27);
+		cache = new BufferFrontedFileStoreCache();
+		cache.setMemoryBufferSpace(1<<26);
 		FileStorageManager fsm = new FileStorageManager();
 		fsm.setStorageDirectory(UnitTestUtil.getTestScratchPath() + "/data");
 		cache.setStorageManager(fsm);
@@ -282,6 +281,12 @@
 		es = Executors.newCachedThreadPool();
 	}
 	
+	@After public void tearDown() throws Exception { // JUnit @After hook for this test class
+		if (debug) { // statistics are printed only when the static debug flag is set
+			showStats();
+		}
+	}
+	
 	private void helpTestXMLTable(int iterations, int threadCount, String file, int expectedRowCount) throws QueryParserException,
 		TeiidException, InterruptedException, Exception {
 		String sql = "select * from xmltable('/root/child' passing xmlparse(document cast(? as clob) wellformed) columns x integer path '@id', y long path 'gc2') as x"; //$NON-NLS-1$
@@ -455,6 +460,97 @@
 		});
 	}
 	
+	/**
+	 * Builds a task that sorts generated rows of 50 string columns on the first column and hands it
+	 * to runTask with the given iteration and thread counts; rows is the total row count produced.
+	 */
+	private void helpTestLargeSort(int iterations, int threads, final int rows) throws InterruptedException, Exception {
+		final List<ElementSymbol> elems = new ArrayList<ElementSymbol>();
+		final int cols = 50;
+		for (int i = 0; i < cols; i++) {
+			ElementSymbol elem1 = new ElementSymbol("e" + i);
+			elem1.setType(DataTypeManager.DefaultDataClasses.STRING);
+			elems.add(elem1);
+		}
+		final List<ElementSymbol> sortElements = Arrays.asList(elems.get(0)); // sort on the first column only
+		final Task task = new Task() {
+			@Override
+			public Void call() throws Exception {
+				CommandContext context = new CommandContext ("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
+				SortNode sortNode = new SortNode(1);
+				sortNode.setSortElements(new OrderBy(sortElements).getOrderByItems());
+				sortNode.setMode(Mode.SORT);
+				sortNode.setElements(elems);
+				RelationalNode rn = new RelationalNode(2) { // synthetic child node that generates the input batches
+					int blockingPeriod = 3;
+					int count = 0;
+					int batches = 0;
+					@Override
+					protected TupleBatch nextBatchDirect() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+						if (count++%blockingPeriod==0) { // every third call, starting with the first, blocks
+							throw BlockedException.INSTANCE;
+						}
+						int batchSize = this.getBatchSize();
+						int batchRows = batchSize;
+						boolean done = false;
+						int start = batches++ * batchSize;
+						if (start + batchSize >= rows) { // last batch: emit only the remaining rows
+							done = true;
+							batchRows = rows - start;
+						}
+						ArrayList<List<?>> batch = new ArrayList<List<?>>(batchRows);
+						for (int i = 0; i < batchRows; i++) { // NOTE(review): i is batch-local, so each batch repeats the same values - confirm intended
+							ArrayList<Object> row = new ArrayList<Object>();
+							for (int j = 0; j < cols; j++) {
+								if (j == 0) {
+									row.add(String.valueOf((i * 279470273L) % 4294967291L)); // long math: the int product overflowed for i >= 8
+								} else {
+									row.add(i + "abcdefghijklmnop" + j);
+								}
+							}
+							batch.add(row);
+						}
+						TupleBatch result = new TupleBatch(start+1, batch);
+						if (done) {
+							result.setTerminationFlag(true);
+						}
+						return result;
+					}
+					
+					@Override
+					public Object clone() {
+						return null;
+					}
+				};
+				rn.setElements(elems);
+				sortNode.addChild(rn);
+				sortNode.initialize(context, bm, null);
+				rn.initialize(context, bm, null);
+				process(sortNode, rows);
+				return null;
+			}
+		};
+		runTask(iterations, threads, task);
+	}
+	
+	@Test public void runWideSort_1_100000() throws Exception {
+		helpTestLargeSort(4, 1, 100000); // iterations=4, threads=1, rows=100000
+	}
+	
+	@Test public void runWideSort_4_100000() throws Exception {
+		helpTestLargeSort(2, 4, 100000); // iterations=2, threads=4, rows=100000
+	}
+
+	private static void showStats() { // prints counters to System.out, one unlabeled value per line, in the order below
+		System.out.println(bm.getBatchesAdded()); // buffer manager counters
+		System.out.println(bm.getReferenceHits());
+		System.out.println(bm.getReadAttempts());
+		System.out.println(bm.getReadCount());
+		System.out.println(bm.getWriteCount());
+		System.out.println(cache.getStorageReads()); // cache storage counters
+		System.out.println(cache.getStorageWrites());
+	}
+
 	/**
 	 * Generates a 5 MB document
 	 */



More information about the teiid-commits mailing list