[teiid-commits] teiid SVN: r1833 - in trunk: client/src/main/java/com/metamatrix/common/batch and 16 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Feb 16 16:04:31 EST 2010


Author: shawkins
Date: 2010-02-16 16:04:26 -0500 (Tue, 16 Feb 2010)
New Revision: 1833

Modified:
   trunk/build/kit-runtime/deploy.properties
   trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
   trunk/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
   trunk/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java
   trunk/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java
Log:
TEIID-913 changes related to buffering and performance.  removing exceptions from tuplesource close, tweaking to more reasonable defaults, updated buffering to also consider column width, refined the performance of value caching, and allowed partitioning to work over larger sets

Modified: trunk/build/kit-runtime/deploy.properties
===================================================================
--- trunk/build/kit-runtime/deploy.properties	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/build/kit-runtime/deploy.properties	2010-02-16 21:04:26 UTC (rev 1833)
@@ -28,37 +28,42 @@
 # Processor settings
 # 
 
-#Process pool maximum thread count. (default 64)
-process.maxThreads=64
+#Process pool maximum thread count. (default 16) Increase this value if your load includes a large number of XQueries
+#or if the system's available processors is larger than 8.
+process.maxThreads=16
 
 #Query processor time slice, in milliseconds. (default 2000)
 process.timeSliceInMilli=2000
 
-#Plan debug messages allowed. see option debug.
-process.optionDebugAllowed=true
-
 #Maximum allowed fetch size, set via JDBC. User requested value ignored above this value. (default 20480)
 process.maxRowsFetchSize=20480
 
-# The max lob chunk size transferred each time when processing blobs, clobs(100KB default) 
+# The max lob chunk size in KB transferred each time when processing blobs, clobs(100KB default) 
 process.lobChunkSizeInKB=100
 
 #
 # BufferManager Settings
 #
 
-#The max size of a batch sent between connector and query service. Should be even multiple of processorBatchSize. (default 2048)
-dqp.buffer.connectorBatchSize=2048
+#The max row count of a batch from a connector. Should be an even multiple of processorBatchSize. (default 1024)
+dqp.buffer.connectorBatchSize=1024
 
-#The max size of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 1024)
-dqp.buffer.processorBatchSize=1024
+#The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 512)
+dqp.buffer.processorBatchSize=512
 
 #Defines whether to use disk buffering or not. (default true)
 dqp.buffer.useDisk=true
 
-#The number of batches to actively hold in the BufferManager
-org.teiid.buffer.maxReserveBatches=64
+#The number of batch columns to allow in memory (default 16384).  
+#This value should be set lower or higher depending on the available memory to Teiid in the VM.  
+#16384 is considered a good default for a dedicated 32-bit VM running Teiid with a 1 gig heap.
+org.teiid.buffer.maxReserveBatchColumns=16384
 
+#The number of batch columns guaranteed to a processing operation.  Set this value lower if the workload typically
+#processes larger numbers of concurrent queries with large intermediate results from operations such as sorting, 
+#grouping, etc. (default 128)
+org.teiid.buffer.maxProcessingBatchesColumns=128
+
 #
 # Cache Settings
 #

Modified: trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -322,10 +322,6 @@
             int b;
             boolean readingShort;
             int length = in.readInt();
-            /* Although using a StringBuffer and doing a toString() to get the String value reuses
-             * the StringBuffer's internal char[], the StringBuffer.append() calls are all synchronized,
-             * and likely too costly compared to simply copying the array during derialization.
-             */
             char[] chars = new char[length];
             readingShort = true;
             for (int i = 0; i < length; i++) {

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -58,6 +58,7 @@
 		private final Socket socket;
 		private ObjectOutputStream outputStream;
 		private ObjectInputStream inputStream;
+		private Object readLock = new Object();
 
 		private OioObjectChannel(Socket socket) throws IOException {
 			log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
@@ -118,13 +119,15 @@
 		//## JDBC4.0-end ##
 		public Object read() throws IOException, ClassNotFoundException {
 			log.finer("reading message from socket"); //$NON-NLS-1$
-			try {
-				return inputStream.readObject();
-			} catch (SocketTimeoutException e) {
-				throw e;
-	        } catch (IOException e) {
-	            close();
-	            throw e;
+			synchronized (readLock) {
+				try {
+					return inputStream.readObject();
+				} catch (SocketTimeoutException e) {
+					throw e;
+		        } catch (IOException e) {
+		            close();
+		            throw e;
+				}
 			}
 		}
 

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -70,19 +70,18 @@
 public class SocketServerInstanceImpl implements SocketServerInstance {
 	
 	static final int HANDSHAKE_RETRIES = 10;
+    private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
 
 	private AtomicInteger MESSAGE_ID = new AtomicInteger();
+    private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
 
 	private HostInfo hostInfo;
 	private boolean ssl;
-    private ObjectChannel socketChannel;
-    private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
     private long synchTimeout;
 
+    private ObjectChannel socketChannel;
     private Cryptor cryptor;
     
-    private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
-    
     private boolean hasReader;
     
     public SocketServerInstanceImpl() {
@@ -95,7 +94,7 @@
         this.synchTimeout = synchTimeout;
     }
     
-    public void connect(ObjectChannelFactory channelFactory) throws CommunicationException, IOException {
+    public synchronized void connect(ObjectChannelFactory channelFactory) throws CommunicationException, IOException {
         InetSocketAddress address = new InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber());
         this.socketChannel = channelFactory.createObjectChannel(address, ssl);
         try {
@@ -249,6 +248,45 @@
     public Cryptor getCryptor() {
         return this.cryptor;
     }
+    
+    void read(long timeout, TimeUnit unit, ResultsFuture<?> future) throws TimeoutException, InterruptedException {
+    	long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+		long start = System.currentTimeMillis();
+		while (!future.isDone()) {
+			boolean reading = false;
+			synchronized (this) {
+				if (!hasReader) {
+					hasReader = true;
+					reading = true;
+				} else if (!future.isDone()) {
+					this.wait(Math.max(1, timeoutMillis));
+				}
+			} 
+			if (reading) {
+				try {
+					if (!future.isDone()) {
+						receivedMessage(socketChannel.read());
+					}
+				} catch (SocketTimeoutException e) {
+				} catch (Exception e) {
+					exceptionOccurred(e);
+				} finally {
+					synchronized (this) {
+						hasReader = false;
+						this.notifyAll();
+					}
+				}
+			}
+			if (!future.isDone()) {
+				long now = System.currentTimeMillis();
+				timeoutMillis -= now - start;
+				start = now;
+				if (timeoutMillis <= 0) {
+					throw new TimeoutException();
+				}
+			}
+		}
+    }
 
 	@SuppressWarnings("unchecked")
 	//## JDBC4.0-begin ##
@@ -315,42 +353,7 @@
 					public Object get(long timeout, TimeUnit unit)
 							throws InterruptedException, ExecutionException,
 							TimeoutException {
-						long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
-						long start = System.currentTimeMillis();
-						while (!isDone()) {
-							boolean reading = false;
-							synchronized (SocketServerInstanceImpl.this) {
-								if (!hasReader) {
-									hasReader = true;
-									reading = true;
-								} else if (!isDone()) {
-									SocketServerInstanceImpl.this.wait(Math.max(1, timeoutMillis));
-								}
-							} 
-							if (reading) {
-								try {
-									if (!isDone()) {
-										receivedMessage(socketChannel.read());
-									}
-								} catch (SocketTimeoutException e) {
-								} catch (Exception e) {
-									exceptionOccurred(e);
-								} finally {
-									synchronized (SocketServerInstanceImpl.this) {
-										hasReader = false;
-										SocketServerInstanceImpl.this.notifyAll();
-									}
-								}
-							}
-							if (!isDone()) {
-								long now = System.currentTimeMillis();
-								timeoutMillis -= now - start;
-								start = now;
-								if (timeoutMillis <= 0) {
-									throw new TimeoutException();
-								}
-							}
-						}
+						read(timeout, unit, this);
 						return super.get(timeout, unit);
 					}
 				};

Modified: trunk/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -32,13 +32,15 @@
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
 
+import com.metamatrix.common.types.Streamable;
+
 public class ReaderInputStream extends InputStream {
 	
-	private static final int DEFAULT_BUFFER_SIZE = 100 * 1024;
+	private static final int DEFAULT_BUFFER_SIZE = Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
 	
 	private final Reader reader;
 	private final Charset charSet;
-	private final int bufferSize;
+	private char[] charBuffer;
 	
 	private boolean hasMore = true;
 	private ByteBuffer currentBuffer;
@@ -52,7 +54,7 @@
 	public ReaderInputStream(Reader reader, Charset charSet, int bufferSize) {
 		this.reader = reader;
 		this.charSet = charSet;
-		this.bufferSize = bufferSize;
+		this.charBuffer = new char[bufferSize];
 		if (charSet.displayName().equalsIgnoreCase("UTF-16")) { //$NON-NLS-1$
 			prefixBytes = 2;
 		}
@@ -64,18 +66,12 @@
 			if (!hasMore) {
 				return -1;
 			}
-			char[] charBuffer = new char[bufferSize];
 			int charsRead = reader.read(charBuffer);
 			if (charsRead == -1) {
 	            hasMore = false;
 				return -1;
 			}
-			if (charsRead != charBuffer.length) {
-				char[] buf = new char[charsRead];
-	            System.arraycopy(charBuffer, 0, buf, 0, charsRead);
-	            charBuffer = buf;
-			}
-			currentBuffer = charSet.encode(CharBuffer.wrap(charBuffer));
+			currentBuffer = charSet.encode(CharBuffer.wrap(charBuffer, 0, charsRead));
 			if (!needsPrefix) {
 				currentBuffer.position(prefixBytes);
 			}

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -34,6 +34,7 @@
 /*## JDBC3.0-JDK1.5-begin ##
 import com.metamatrix.core.jdbc.SQLXML; 
 ## JDBC3.0-JDK1.5-end ##*/
+import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,7 +44,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.WeakHashMap;
 
 import javax.xml.transform.Source;
 
@@ -68,6 +68,7 @@
 import com.metamatrix.core.ErrorMessageKeys;
 import com.metamatrix.core.MetaMatrixRuntimeException;
 import com.metamatrix.core.util.ArgCheck;
+import com.metamatrix.core.util.HashCodeUtil;
 
 /**
  * <p>
@@ -85,11 +86,57 @@
  */
 public class DataTypeManager {
 	
-	private static final int MAX_VALUE_MAP_SIZE = 10000;
-	private static boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", true); //$NON-NLS-1$
+	private static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", true); //$NON-NLS-1$
 	
-	private static Map<Class<?>, Map<Object, WeakReference<Object>>> valueMaps = new HashMap<Class<?>, Map<Object, WeakReference<Object>>>(); 
+	private static boolean valueCacheEnabled;
+	
+	private interface ValueCache<T> {
+		T getValue(T value);
+	}
+	
+	private static class HashedValueCache<T> implements ValueCache<T> {
+		
+		final Object[] cache;
+		final boolean weak = false;
+		
+		HashedValueCache(int size) {
+			cache = new Object[1 << size];
+		}
+				
+		@SuppressWarnings("unchecked")
+		public T getValue(T value) {
+			int index = hash(primaryHash(value)) & (cache.length - 1);
+	    	Object canonicalValue = get(index);
+	    	if (value.equals(canonicalValue)) {
+	    		return (T)canonicalValue;
+	    	} 
+	    	set(index, value);
+	    	return value;
+		}
+		
+		protected Object get(int index) {
+			return cache[index];
+		}
+		
+		protected void set(int index, T value) {
+			cache[index] = value;
+		}
+		
+		protected int primaryHash(T value) {
+			return value.hashCode();
+		}
 
+		/*
+		 * The same power of 2 hash bucketing from the Java HashMap  
+		 */
+		final static int hash(int h) {
+			h ^= (h >>> 20) ^ (h >>> 12);
+	        return h ^= (h >>> 7) ^ (h >>> 4);
+		}
+	}
+	
+	private static Map<Class<?>, ValueCache<?>> valueMaps = new HashMap<Class<?>, ValueCache<?>>(128); 
+
 	public static final int MAX_STRING_LENGTH = 4000;
 
 	public static final class DefaultDataTypes {
@@ -152,7 +199,7 @@
 	 * Doubly-nested map of String srcType --> Map of String targetType -->
 	 * Transform
 	 */
-	private static Map<String, Map<String, Transform>> transforms = new HashMap<String, Map<String, Transform>>();
+	private static Map<String, Map<String, Transform>> transforms = new HashMap<String, Map<String, Transform>>(128);
 
 	/** Utility to easily get Transform given srcType and targetType */
 	private static Transform getTransformFromMaps(String srcType,
@@ -165,10 +212,10 @@
 	}
 
 	/** Base data type names and classes, Type name --> Type class */
-	private static Map<String, Class> dataTypeNames = new LinkedHashMap<String, Class>();
+	private static Map<String, Class> dataTypeNames = new LinkedHashMap<String, Class>(128);
 
 	/** Base data type names and classes, Type class --> Type name */
-	private static Map<Class, String> dataTypeClasses = new LinkedHashMap<Class, String>();
+	private static Map<Class, String> dataTypeClasses = new LinkedHashMap<Class, String>(128);
 
 	private static Set<String> DATA_TYPE_NAMES = Collections.unmodifiableSet(dataTypeNames.keySet());
 
@@ -426,38 +473,94 @@
 	 */
 	static void loadDataTypes() {
 		DataTypeManager.addDataType(DefaultDataTypes.BOOLEAN, DefaultDataClasses.BOOLEAN);
-		valueMaps.put(DefaultDataClasses.BOOLEAN, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BYTE, DefaultDataClasses.BYTE);
-		valueMaps.put(DefaultDataClasses.BYTE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.SHORT,	DefaultDataClasses.SHORT);
-		valueMaps.put(DefaultDataClasses.SHORT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.CHAR, DefaultDataClasses.CHAR);
-		valueMaps.put(DefaultDataClasses.CHAR, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.INTEGER, DefaultDataClasses.INTEGER);
-		valueMaps.put(DefaultDataClasses.INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.LONG, DefaultDataClasses.LONG);
-		valueMaps.put(DefaultDataClasses.LONG, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BIG_INTEGER, DefaultDataClasses.BIG_INTEGER);
-		valueMaps.put(DefaultDataClasses.BIG_INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.FLOAT, DefaultDataClasses.FLOAT);
-		valueMaps.put(DefaultDataClasses.FLOAT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.DOUBLE, DefaultDataClasses.DOUBLE);
-		valueMaps.put(DefaultDataClasses.DOUBLE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BIG_DECIMAL, DefaultDataClasses.BIG_DECIMAL);
-		valueMaps.put(DefaultDataClasses.BIG_DECIMAL, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.DATE, DefaultDataClasses.DATE);
-		valueMaps.put(DefaultDataClasses.DATE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.TIME, DefaultDataClasses.TIME);
-		valueMaps.put(DefaultDataClasses.TIME, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.TIMESTAMP, DefaultDataClasses.TIMESTAMP);
-		valueMaps.put(DefaultDataClasses.TIMESTAMP, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.STRING, DefaultDataClasses.STRING);
-		valueMaps.put(DefaultDataClasses.STRING, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.CLOB, DefaultDataClasses.CLOB);
 		DataTypeManager.addDataType(DefaultDataTypes.XML, DefaultDataClasses.XML);
 		DataTypeManager.addDataType(DefaultDataTypes.OBJECT, DefaultDataClasses.OBJECT);
 		DataTypeManager.addDataType(DefaultDataTypes.NULL, DefaultDataClasses.NULL);
 		DataTypeManager.addDataType(DefaultDataTypes.BLOB, DefaultDataClasses.BLOB);
+		
+		if (USE_VALUE_CACHE) {
+			valueMaps.put(DefaultDataClasses.BOOLEAN, new ValueCache<Boolean>() {
+				@Override
+				public Boolean getValue(Boolean value) {
+					return Boolean.valueOf(value);
+				}
+			});
+			valueMaps.put(DefaultDataClasses.BYTE, new ValueCache<Byte>() {
+				@Override
+				public Byte getValue(Byte value) {
+					return Byte.valueOf(value);
+				}
+			});
+			valueMaps.put(DefaultDataClasses.SHORT, new HashedValueCache<Short>(13));
+			valueMaps.put(DefaultDataClasses.CHAR, new HashedValueCache<Character>(13));
+			valueMaps.put(DefaultDataClasses.INTEGER, new HashedValueCache<Integer>(14));
+			valueMaps.put(DefaultDataClasses.LONG, new HashedValueCache<Long>(14));
+			valueMaps.put(DefaultDataClasses.BIG_INTEGER, new HashedValueCache<BigInteger>(14));
+			valueMaps.put(DefaultDataClasses.FLOAT, new HashedValueCache<Float>(14));
+			valueMaps.put(DefaultDataClasses.DOUBLE, new HashedValueCache<Double>(14));
+			valueMaps.put(DefaultDataClasses.DATE, new HashedValueCache<Date>(14));
+			valueMaps.put(DefaultDataClasses.TIME, new HashedValueCache<Time>(14));
+			valueMaps.put(DefaultDataClasses.TIMESTAMP, new HashedValueCache<Timestamp>(14));
+			valueMaps.put(DefaultDataClasses.BIG_DECIMAL, new HashedValueCache<BigDecimal>(15) {
+				@Override
+				protected Object get(int index) {
+					WeakReference<?> ref = (WeakReference<?>) cache[index];
+					if (ref != null) {
+						return ref.get();
+					}
+					return null;
+				}
+				
+				@Override
+				protected void set(int index, BigDecimal value) {
+					cache[index] = new WeakReference<BigDecimal>(value);
+				}
+			});
+			valueMaps.put(DefaultDataClasses.STRING, new HashedValueCache<String>(15) {
+				HashedValueCache<String> smallCache = new HashedValueCache<String>(13);
+				
+				@Override
+				public String getValue(String value) {
+					if (value.length() < 14) {
+						return smallCache.getValue(value);
+					}
+					return super.getValue(value);
+				}
+				
+				@Override
+				protected Object get(int index) {
+					WeakReference<?> ref = (WeakReference<?>) cache[index];
+					if (ref != null) {
+						return ref.get();
+					}
+					return null;
+				}
+				
+				@Override
+				protected void set(int index, String value) {
+					cache[index] = new WeakReference<Object>(value);
+				}
+				
+				@Override
+				protected int primaryHash(String value) {
+					return HashCodeUtil.expHashCode(value);
+				}
+			});
+		}
 	}
 
 	/**
@@ -723,27 +826,26 @@
     	sourceConverters.put(sourceClass, transform);
     }
     
+    public static void setValueCacheEnabled(boolean enabled) {
+    	valueCacheEnabled = enabled;
+    }
+    
+    public static boolean isValueCacheEnabled() {
+    	return valueCacheEnabled;
+    }
+    
     @SuppressWarnings("unchecked")
 	public static <T> T getCanonicalValue(T value) {
-    	if (USE_VALUE_CACHE) {
+    	if (USE_VALUE_CACHE && valueCacheEnabled) {
     		if (value == null) {
     			return null;
     		}
-	    	Map<Object, WeakReference<Object>> valueMap = valueMaps.get(value.getClass());
-	    	if (valueMap == null) {
+    		//TODO: this initial lookup is inefficient, since there are likely collisions
+	    	ValueCache valueCache = valueMaps.get(value.getClass());
+	    	if (valueCache == null) {
 	    		return value;
 	    	}
-			WeakReference<Object> valueReference = valueMap.get(value);
-			Object canonicalValue = null;
-			if (valueReference != null) {
-				canonicalValue = valueReference.get();
-			}
-			if (canonicalValue != null) {
-				return (T)canonicalValue;
-			}
-			if (valueMap.size() <= MAX_VALUE_MAP_SIZE) {
-				valueMap.put(value, new WeakReference<Object>(value));
-			}
+	    	return (T)valueCache.getValue(value);
     	}
 		return value;
     }

Modified: trunk/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -24,6 +24,7 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.RandomAccess;
 
 /**
  * <P>This class provides utility functions for generating good
@@ -105,9 +106,7 @@
 	 * Compute a hash code on a large collection by walking the list
 	 * and combining the hash code at every exponential index:
 	 * 1, 2, 4, 8, ...  This has been shown to give a good hash
-	 * for good time complexity.  This uses an iterator to walk
-	 * the collection and pull the necessary hash code values.
-	 * Slower than a List or array but faster than getting EVERY value.	 
+	 * for good time complexity. 	 
 	 */
 	public static final int expHashCode(int previous, List x) {
 		if(x == null || x.size() == 0) {
@@ -115,19 +114,42 @@
 		}
 		int size = x.size();				// size of collection
 		int hc = (PRIME*previous) + size;	// hash code so far
-		int skip = 0;						// skip between samples
-		int total = 0;						// collection examined already
-		Iterator iter = x.iterator();		// collection iterator
-		Object obj = iter.next();			// last iterated object, primed at first
-		while(total < size) {
-			for(int i=0; i<skip; i++) {		// skip to next sample
-				obj = iter.next();
+		if (x instanceof RandomAccess) {
+			int index = 1;
+			int xlen = x.size()+1;	// switch to 1-based
+			while(index < xlen) {
+				hc = hashCode(hc, x.get(index-1));
+				index = index << 1;		// left shift by 1 to double
 			}
-			hc = hashCode(hc, obj);			// add sample to hashcode
-			skip = (skip == 0) ? 1 : skip << 1;		// left shift by 1 to double
-			total += skip;					// update total
+		} else {
+			int skip = 0;						// skip between samples
+			int total = 0;						// collection examined already
+			Iterator iter = x.iterator();		// collection iterator
+			Object obj = iter.next();			// last iterated object, primed at first
+			while(total < size) {
+				for(int i=0; i<skip; i++) {		// skip to next sample
+					obj = iter.next();
+				}
+				hc = hashCode(hc, obj);			// add sample to hashcode
+				skip = (skip == 0) ? 1 : skip << 1;		// left shift by 1 to double
+				total += skip;					// update total
+			}
 		}
 		return hc;
 	}
+	
+	public static final int expHashCode(String x) {
+		if(x == null) {
+			return 0;
+		}
+		int hc = x.length();
+		int index = 1;
+		int xlen = x.length()+1;	// switch to 1-based
+		while(index < xlen) {
+			hc = PRIME * hc + x.charAt(index-1);
+			index = index << 1;		// left shift by 1 to double
+		}
+		return hc;
+	}
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -47,17 +47,22 @@
 		 */
 		FINAL
 	}
+	
+	public enum BufferReserveMode {
+		WAIT,
+		FORCE,
+		NO_WAIT
+	}
 
-	public static int DEFAULT_CONNECTOR_BATCH_SIZE = 2048;
-	public static int DEFAULT_PROCESSOR_BATCH_SIZE = 1024;
-	public static int DEFAULT_MAX_PROCESSING_BATCHES = 8;
+	public static int DEFAULT_CONNECTOR_BATCH_SIZE = 1024;
+	public static int DEFAULT_PROCESSOR_BATCH_SIZE = 512;
+	public static int DEFAULT_MAX_PROCESSING_BATCHES = 128;
 	
 	/**
-	 * The BufferManager may maintain at least this many batch references in memory.
-	 * 
-	 * Up to 2x this value may be held by soft references.
+	 * This is the maximum number of batch columns used for processing.
+	 * See {@link #reserveBuffers(int, BufferReserveMode)}
 	 */
-	public static int DEFAULT_RESERVE_BUFFERS = 64;
+	public static int DEFAULT_RESERVE_BUFFERS = 16384;
 	
     /**
      * Get the batch size to use during query processing.  
@@ -79,7 +84,7 @@
 	 * across even a blocked exception.
 	 * @return
 	 */
-    int getMaxProcessingBatches();
+    int getMaxProcessingBatchColumns();
     
     /**
      * Creates a new {@link FileStore}.  See {@link FileStore#setCleanupReference(Object)} to
@@ -90,14 +95,12 @@
     FileStore createFileStore(String name);
     
     /**
-     * Reserve up to count buffers for use.  Wait will cause the process to block until
-     * all of the requested or half of the total buffers are available.
+     * Reserve up to count buffers for use.
      * @param count
-     * @param wait
+     * @param mode
      * @return
-     * @throws MetaMatrixComponentException
      */
-    int reserveBuffers(int count, boolean wait) throws MetaMatrixComponentException;
+    int reserveBuffers(int count, BufferReserveMode mode);
     
     /**
      * Releases the buffers reserved by a call to {@link BufferManager#reserveBuffers(int, boolean)}
@@ -105,4 +108,9 @@
      */
     void releaseBuffers(int count);
     
+    /**
+     * Get the size estimate for the given schema.
+     */
+    int getSchemaSize(List elements);
+    
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -22,6 +22,7 @@
 
 package com.metamatrix.common.buffer;
 
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,10 +33,13 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.Streamable;
+import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.query.sql.symbol.Expression;
 
 public class TupleBuffer {
@@ -75,15 +79,16 @@
 		private List<?> getCurrentTuple() throws MetaMatrixComponentException,
 				BlockedException {
 			if (currentRow <= rowCount) {
-				if (forwardOnly) {
-					if (batch == null || currentRow > batch.getEndRow()) {
+				//if (forwardOnly) {
+					if (batch == null || !batch.containsRow(currentRow)) {
 						batch = getBatch(currentRow);
 					}
 					return batch.getTuple(currentRow);
-				} 
+				//} 
 				//TODO: determine if we should directly hold a soft reference here
-				return getRow(currentRow);
+				//return getRow(currentRow);
 			}
+			batch = null;
 			if(isFinal) {
 	            return null;
 	        } 
@@ -91,8 +96,7 @@
 		}
 
 	    @Override
-	    public void closeSource()
-	    throws MetaMatrixComponentException{
+	    public void closeSource() {
 	    	batch = null;
 	        mark = 1;
 	        reset();
@@ -210,21 +214,26 @@
 	 * @throws MetaMatrixComponentException
 	 */
 	public void addTupleBatch(TupleBatch batch, boolean save) throws MetaMatrixComponentException {
-		Assertion.assertTrue(this.rowCount < batch.getBeginRow());
-		if (this.rowCount != batch.getBeginRow() - 1) {
-			saveBatch(false, true);
-			this.rowCount = batch.getBeginRow() - 1;
-		} 
+		setRowCount(batch.getBeginRow() - 1); 
 		if (save) {
 			for (List<?> tuple : batch.getAllTuples()) {
 				addTuple(tuple);
 			}
 		}
 	}
+
+	public void setRowCount(int rowCount)
+			throws MetaMatrixComponentException {
+		assert this.rowCount <= rowCount;
+		if (this.rowCount != rowCount) {
+			saveBatch(false, true);
+			this.rowCount = rowCount;
+		}
+	}
 	
 	public void purge() {
 		if (this.batchBuffer != null) {
-			this.batchBuffer = null;
+			this.batchBuffer.clear();
 		}
 		for (BatchManager.ManagedBatch batch : this.batches.values()) {
 			batch.remove();
@@ -260,14 +269,6 @@
 		this.isFinal = true;
 	}
 	
-	List<?> getRow(int row) throws MetaMatrixComponentException {
-		if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
-			return this.batchBuffer.get(row - rowCount + this.batchBuffer.size() - 1);
-		}
-		TupleBatch batch = getBatch(row);
-		return batch.getTuple(row);
-	}
-
 	/**
 	 * Get the batch containing the given row.
 	 * NOTE: the returned batch may be empty or may begin with a row other
@@ -311,8 +312,13 @@
 	
 	public void remove() {
 		if (!removed) {
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+	            LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
+	        }
+			this.batchBuffer = null;
+			purge();
 			this.manager.remove();
-			purge();
+			removed = true;
 		}
 	}
 	

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -54,12 +54,8 @@
 	
     /**
      * Closes the Tuple Source.  
-     * @throws MetaMatrixComponentException indicating a non-business
-     * exception such as a communication exception, or other such
-     * nondeterministic exception
      */    
-	void closeSource()
-		throws MetaMatrixComponentException;
+	void closeSource();
 	
 	/**
 	 * Returns an estimate of the number of rows that can be read without blocking.

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -27,17 +27,14 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.ref.Reference;
-import java.lang.ref.SoftReference;
 import java.lang.ref.WeakReference;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -81,6 +78,10 @@
  * with a simple LRU.
  * 
  * TODO: allow for cached stores to use lru - (result set/mat view)
+ * TODO: account for row/content based sizing (difficult given value sharing)
+ * TODO: account for memory based lobs (it would be nice if the approximate buffer size matched at 100kB)
+ * TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
+ *       - this is not necessary for already persistent batches, since we hold a weak reference
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 	
@@ -96,10 +97,7 @@
 		ManagedBatchImpl removeBatch(int row) {
 			ManagedBatchImpl result = batches.remove(row);
 			if (result != null) {
-				activeBatchCount--;
-				if (toPersistCount > 0) {
-					toPersistCount--;
-				}
+				activeBatchColumnCount -= result.columnCount;
 			}
 			return result;
 		}
@@ -111,23 +109,30 @@
 		
 		private long offset = -1;
 		private boolean persistent;
-		private volatile TupleBatch pBatch;
-		private Reference<TupleBatch> batchReference;
+		private volatile TupleBatch activeBatch;
+		private volatile Reference<TupleBatch> batchReference;
 		private int beginRow;
+		private int columnCount;
 		
-		public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws MetaMatrixComponentException {
+		public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) {
             LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
 			this.id = id;
 			this.store = store;
-			this.pBatch = batch;
+			this.activeBatch = batch;
 			this.beginRow = batch.getBeginRow();
-			addToCache(false);
-			persistBatchReferences();
+			List[] allTuples = batch.getAllTuples();
+			if (allTuples.length > 0) {
+				columnCount = allTuples[0].size();
+			}
 		}
 
 		private void addToCache(boolean update) {
 			synchronized (activeBatches) {
-				activeBatchCount++;
+				TupleBatch batch = this.activeBatch;
+				if (batch == null) {
+					return; //already removed
+				}
+				activeBatchColumnCount += columnCount;
 				TupleBufferInfo tbi = null;
 				if (update) {
 					tbi = activeBatches.remove(this.id);
@@ -164,35 +169,34 @@
 					}
 				}
 			}
+			persistBatchReferences();
 			synchronized (this) {
-				if (this.batchReference != null && this.pBatch == null) {
-					TupleBatch result = this.batchReference.get();
-					if (result != null) {
-						if (!cache) {
-							softCache.remove(this);
-							this.batchReference.clear();
+				TupleBatch batch = this.activeBatch;
+				if (batch != null){
+					return batch;
+				}
+				Reference<TupleBatch> ref = this.batchReference;
+				this.batchReference = null;
+				if (ref != null) {
+					batch = ref.get();
+					if (batch != null) {
+						if (cache) {
+							this.activeBatch = batch;
+				        	addToCache(true);
 						} 
 						referenceHit.getAndIncrement();
-						return result;
+						return batch;
 					}
 				}
-
-				TupleBatch batch = this.pBatch;
-				if (batch != null){
-					return batch;
-				}
-			}
-			persistBatchReferences();
-            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
-			synchronized (this) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
 				try {
 		            ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
-		            TupleBatch batch = new TupleBatch();
+		            batch = new TupleBatch();
 		            batch.setDataTypes(types);
 		            batch.readExternal(ois);
 			        batch.setDataTypes(null);
 			        if (cache) {
-			        	this.pBatch = batch;
+			        	this.activeBatch = batch;
 			        	addToCache(true);
 			        }
 					return batch;
@@ -204,71 +208,9 @@
 			}
 		}
 
-		public void persistBatchReferences() throws MetaMatrixComponentException {
-			ManagedBatchImpl mb = null;
-			boolean createSoft = false;
-			/*
-			 * If we are over our limit, collect half of the batches. 
-			 */
-			synchronized (activeBatches) {
-				if (activeBatchCount > reserveBatches && toPersistCount == 0) {
-					toPersistCount = activeBatchCount / 2;
-				}
-			}
-			while (true) {
-				synchronized (activeBatches) {
-					if (activeBatchCount == 0 || toPersistCount == 0) {
-						toPersistCount = 0;
-						break;
-					}
-					Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
-					TupleBufferInfo tbi = iter.next();
-					Map.Entry<Integer, ManagedBatchImpl> entry = null;
-					if (tbi.lastUsed != null) {
-						entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
-					}
-					if (entry == null) {
-						entry = tbi.batches.pollLastEntry();
-					} else {
-						createSoft = true;
-						tbi.batches.remove(entry.getKey());
-					}
-					if (tbi.batches.isEmpty()) {
-						iter.remove();
-					}
-					activeBatchCount--;
-					toPersistCount--;
-					mb = entry.getValue();
-				}
-				persist(createSoft, mb);
-			}
-			synchronized (softCache) {
-				if (softCache.size() > reserveBatches) {
-					Iterator<ManagedBatchImpl> iter = softCache.iterator();
-					mb = iter.next();
-					iter.remove();
-				}
-			}
-			persist(false, mb);
-		}
-
-		private void persist(boolean createSoft, ManagedBatchImpl mb)
-				throws MetaMatrixComponentException {
+		public synchronized void persist() throws MetaMatrixComponentException {
 			try {
-				if (mb != null) {
-					mb.persist(createSoft);
-				}
-			} catch (MetaMatrixComponentException e) {
-				if (mb == this) {
-					throw e;
-				}
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
-			}
-		}
-
-		public synchronized void persist(boolean createSoft) throws MetaMatrixComponentException {
-			try {
-				TupleBatch batch = pBatch;
+				TupleBatch batch = activeBatch;
 				if (batch != null) {
 					if (!persistent) {
 						LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Writing batch to disk", writeCount.incrementAndGet()); //$NON-NLS-1$
@@ -282,18 +224,13 @@
 				            fsos.flushBuffer();
 						}
 					}
-					if (createSoft) {
-						this.batchReference = new SoftReference<TupleBatch>(batch);
-						softCache.add(this);
-					} else {
-						this.batchReference = new WeakReference<TupleBatch>(batch);
-					}
+					this.batchReference = new WeakReference<TupleBatch>(batch);
 				}
 			} catch (IOException e) {
 				throw new MetaMatrixComponentException(e);
 			} finally {
 				persistent = true;
-				pBatch = null;
+				activeBatch = null;
 			}
 		}
 
@@ -304,16 +241,13 @@
 					activeBatches.remove(this.id);
 				}
 			}
-			softCache.remove(this);
-			pBatch = null;
-			if (batchReference != null) {
-				batchReference.clear();
-			}
+			activeBatch = null;
+			batchReference = null;
 		}
 		
 		@Override
 		public String toString() {
-			return "ManagedBatch " + id + " " + pBatch; //$NON-NLS-1$ //$NON-NLS-2$
+			return "ManagedBatch " + id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$
 		}
 	}
 	
@@ -321,16 +255,14 @@
     private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
     private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
-    private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
-    private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+    private int maxReserveBatchColumns = BufferManager.DEFAULT_RESERVE_BUFFERS;
+    private volatile int reserveBatchColumns = BufferManager.DEFAULT_RESERVE_BUFFERS;
     
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
     
-    private int toPersistCount = 0;
-    private int activeBatchCount = 0;
+    private volatile int activeBatchColumnCount = 0;
     private Map<String, TupleBufferInfo> activeBatches = new LinkedHashMap<String, TupleBufferInfo>();
-	private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
     
     private StorageManager diskMgr;
 
@@ -341,18 +273,20 @@
 	private AtomicInteger readAttempts = new AtomicInteger();
 	private AtomicInteger referenceHit = new AtomicInteger();
 	
-    public int getMaxProcessingBatches() {
+	@Override
+    public int getMaxProcessingBatchColumns() {
 		return maxProcessingBatches;
 	}
     
-    public void setMaxProcessingBatches(int maxProcessingBatches) {
-		this.maxProcessingBatches = Math.max(2, maxProcessingBatches);
+    public void setMaxProcessingBatchColumns(int maxProcessingBatches) {
+		this.maxProcessingBatches = Math.max(0, maxProcessingBatches);
 	}
 
     /**
      * Get processor batch size
      * @return Number of rows in a processor batch
      */
+    @Override
     public int getProcessorBatchSize() {        
         return this.processorBatchSize;
     }
@@ -361,6 +295,7 @@
      * Get connector batch size
      * @return Number of rows in a connector batch
      */
+    @Override
     public int getConnectorBatchSize() {
         return this.connectorBatchSize;
     }
@@ -402,7 +337,10 @@
     				this.store = createFileStore(newID);
     				this.store.setCleanupReference(this);
     			}
-    			return new ManagedBatchImpl(newID, store, batch);
+    			ManagedBatchImpl mbi = new ManagedBatchImpl(newID, store, batch);
+    			mbi.addToCache(false);
+    			persistBatchReferences();
+    			return mbi;
     		}
 
     		@Override
@@ -451,9 +389,12 @@
     
     @Override
     public void releaseBuffers(int count) {
+    	if (count < 1) {
+    		return;
+    	}
     	lock.lock();
     	try {
-	    	this.reserveBatches += count;
+	    	this.reserveBatchColumns += count;
 	    	batchesFreed.signalAll();
     	} finally {
     		lock.unlock();
@@ -461,31 +402,83 @@
     }	
     
     @Override
-    public int reserveBuffers(int count, boolean wait) throws MetaMatrixComponentException {
+    public int reserveBuffers(int count, BufferReserveMode mode) {
     	lock.lock();
 	    try {
-	    	while (wait && count > this.reserveBatches && this.reserveBatches < this.maxReserveBatches / 2) {
-	    		try {
-					batchesFreed.await();
-				} catch (InterruptedException e) {
-					throw new MetaMatrixComponentException(e);
-				}
-	    	}	
-	    	this.reserveBatches -= count;
-	    	if (this.reserveBatches >= 0) {
+	    	if (mode == BufferReserveMode.WAIT) {
+		    	int waitCount = 0;
+		    	while (count - waitCount > this.reserveBatchColumns) {
+		    		try {
+						batchesFreed.await(100, TimeUnit.MILLISECONDS);
+					} catch (InterruptedException e) {
+						throw new MetaMatrixRuntimeException(e);
+					}
+					waitCount++;
+		    	}	
+	    	}
+	    	if (this.reserveBatchColumns >= count || mode == BufferReserveMode.FORCE) {
+		    	this.reserveBatchColumns -= count;
 	    		return count;
 	    	}
-	    	int result = count + this.reserveBatches;
-	    	this.reserveBatches = 0;
+	    	int result = Math.max(0, this.reserveBatchColumns);
+	    	this.reserveBatchColumns -= result;
 	    	return result;
 	    } finally {
     		lock.unlock();
+    		persistBatchReferences();
     	}
     }
     
-    public void setMaxReserveBatches(int maxReserveBatches) {
-		this.maxReserveBatches = maxReserveBatches;
+	void persistBatchReferences() {
+		if (activeBatchColumnCount == 0 || activeBatchColumnCount <= reserveBatchColumns) {
+			int memoryCount = activeBatchColumnCount + maxReserveBatchColumns - reserveBatchColumns;
+			if (DataTypeManager.isValueCacheEnabled()) {
+				if (memoryCount < maxReserveBatchColumns / 8) {
+					DataTypeManager.setValueCacheEnabled(false);
+				}
+			} else if (memoryCount > maxReserveBatchColumns / 4) {
+				DataTypeManager.setValueCacheEnabled(true);
+			}
+			return;
+		}
+		while (true) {
+			ManagedBatchImpl mb = null;
+			synchronized (activeBatches) {
+				if (activeBatchColumnCount == 0 || activeBatchColumnCount * 5 < reserveBatchColumns * 4) {
+					break;
+				}
+				Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
+				TupleBufferInfo tbi = iter.next();
+				Map.Entry<Integer, ManagedBatchImpl> entry = null;
+				if (tbi.lastUsed != null) {
+					entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
+				}
+				if (entry == null) {
+					entry = tbi.batches.lastEntry();
+				} 
+				tbi.removeBatch(entry.getKey());
+				if (tbi.batches.isEmpty()) {
+					iter.remove();
+				}
+				mb = entry.getValue();
+			}
+			try {
+				mb.persist();
+			} catch (MetaMatrixComponentException e) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+			}
+		}
 	}
+	
+	@Override
+	public int getSchemaSize(List elements) {
+		return elements.size();
+	}
+	
+    public void setMaxReserveBatchColumns(int maxReserve) {
+    	this.maxReserveBatchColumns = maxReserve;
+		this.reserveBatchColumns = maxReserve;
+	}
 
 	public void shutdown() {
 	}

Modified: trunk/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -39,7 +39,6 @@
 import com.metamatrix.api.exception.query.ExpressionEvaluationException;
 import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.types.Sequencable;
-import com.metamatrix.core.util.ArgCheck;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.core.util.EquivalenceUtil;
 import com.metamatrix.query.QueryPlugin;
@@ -225,8 +224,11 @@
 	}
 
     private final int compareValues(Object leftValue, Object rightValue) {
-    	ArgCheck.isInstanceOf(Comparable.class, leftValue);
-    	ArgCheck.isInstanceOf(Comparable.class, rightValue);
+    	assert leftValue instanceof Comparable<?>;
+    	assert rightValue instanceof Comparable<?>;
+    	if (leftValue == rightValue) {
+    		return 0;
+    	}
         return ((Comparable)leftValue).compareTo(rightValue);
     }
 

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -79,13 +79,11 @@
             if (joinNode.getProperty(NodeConstants.Info.JOIN_TYPE) == JoinType.JOIN_INNER && context != null) {
             	float leftCost = NewCalculateCostUtil.computeCostForTree(joinNode.getFirstChild(), metadata);
             	float rightCost = NewCalculateCostUtil.computeCostForTree(joinNode.getLastChild(), metadata);
-            	boolean leftSmall = leftCost < context.getProcessorBatchSize() / 4;
-            	boolean rightSmall = rightCost < context.getProcessorBatchSize() / 4;
-            	boolean leftLarge = leftCost > context.getProcessorBatchSize();
-            	boolean rightLarge = rightCost > context.getProcessorBatchSize();
-            	if (leftLarge || rightLarge) {
-	                pushLeft = leftCost == NewCalculateCostUtil.UNKNOWN_VALUE || leftSmall || rightLarge;
-	                pushRight = rightCost == NewCalculateCostUtil.UNKNOWN_VALUE || rightSmall || leftLarge || joinNode.getProperty(NodeConstants.Info.DEPENDENT_VALUE_SOURCE) != null;
+            	if (leftCost != NewCalculateCostUtil.UNKNOWN_VALUE && rightCost != NewCalculateCostUtil.UNKNOWN_VALUE 
+            			&& (leftCost > context.getProcessorBatchSize() || rightCost > context.getProcessorBatchSize())) {
+            		//we use a larger constant here to ensure that we don't unwisely prevent pushdown
+            		pushLeft = leftCost < context.getProcessorBatchSize() || leftCost / rightCost < 16;
+            		pushRight = rightCost < context.getProcessorBatchSize() || rightCost / leftCost < 16 || joinNode.getProperty(NodeConstants.Info.DEPENDENT_VALUE_SOURCE) != null;
             	}
             }            
 

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -95,7 +95,7 @@
         	add = this.batchHandler.batchProduced(batch);
         }
     	// Add batch
-        if(batch.getRowCount() > 0) {
+        if(batch.getRowCount() > 0 || batch.getTerminationFlag()) {
         	buffer.addTupleBatch(batch, add);
         }
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -103,7 +103,7 @@
 	}
     
     @Override
-    public void closeSource() throws MetaMatrixComponentException {
+    public void closeSource() {
     	if (this.buffer != null) {
     		this.buffer.remove();
     		this.buffer = null;
@@ -124,7 +124,8 @@
         List result = currentTuple;
         currentTuple = null;
         if (mark && saveOnMark && this.currentRow - 1 > this.buffer.getRowCount()) {
-        	this.buffer.addTupleBatch(new TupleBatch(this.currentRow - 1, new List[] {result}), true);
+        	this.buffer.setRowCount(this.currentRow - 2);
+        	this.buffer.addTuple(result);
         	this.bufferedIndex = this.currentRow - 1;
         }
         return result;

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -30,6 +30,7 @@
 import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -57,6 +58,7 @@
 	private BufferManager bufferMgr;
 	private ProcessorPlan processPlan;
     private boolean initialized = false;
+    private int reserved;
     /** Flag that marks whether the request has been canceled. */
     private volatile boolean requestCanceled = false;
     private static final int DEFAULT_WAIT = 50;       
@@ -125,10 +127,12 @@
 	    try {
 	    	// initialize if necessary
 			if(! initialized) {
+				if (reserved == 0) {
+					reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
+				}
 				// Open the top node for reading
 				processPlan.open();
-	
-	            initialized = true;
+				initialized = true;
 			}
 	
 			long currentTime = System.currentTimeMillis();
@@ -155,11 +159,7 @@
 	    } catch (BlockedException e) {
 	    	throw e;
 	    } catch (MetaMatrixException e) {
-	    	try {
-	    		closeProcessing();
-	    	} catch (MetaMatrixException e1){
-	    		LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor"); //$NON-NLS-1$
-	    	}
+    		closeProcessing();
 	    	if (e instanceof MetaMatrixProcessingException) {
 	    		throw (MetaMatrixProcessingException)e;
 	    	}
@@ -180,18 +180,22 @@
 	                   
     /**
      * Close processing and clean everything up.  Should only be called by the same thread that called process.
-     * @throws MetaMatrixComponentException 
      */
-    public void closeProcessing() throws MetaMatrixComponentException  {
+    public void closeProcessing() {
     	if (processorClosed) {
     		return;
     	}
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
     		LogManager.logDetail(LogConstants.CTX_DQP, "QueryProcessor: closing processor"); //$NON-NLS-1$
     	}
+		this.bufferMgr.releaseBuffers(reserved);
+		reserved = 0;
         processorClosed = true;
-    	    
-    	processPlan.close();
+        try {
+        	processPlan.close();
+		} catch (MetaMatrixComponentException e1){
+			LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor"); //$NON-NLS-1$
+		}
     }
 
     @Override

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -102,7 +102,7 @@
         isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
         
 		if(needProcessing) {
-            this.tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(), atomicCommand, modelName, connectorBindingId, getID());
+			registerRequest(atomicCommand);
 		}
 	}
 
@@ -152,7 +152,7 @@
             	}
                 Command atomicCommand = (Command)command.clone();
                 if (prepareNextCommand(atomicCommand)) {
-                    tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(), atomicCommand, modelName, null, getID());
+                    registerRequest(atomicCommand);
                     break;
                 }
             }            
@@ -167,6 +167,11 @@
         terminateBatches();
         return pullBatch();
 	}
+
+	private void registerRequest(Command atomicCommand)
+			throws MetaMatrixComponentException, MetaMatrixProcessingException {
+		tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(), atomicCommand, modelName, connectorBindingId, getID());
+	}
 	
 	protected boolean processCommandsIndividually() {
 		return false;
@@ -176,15 +181,11 @@
         return false;
     }
     
-	public void close() throws MetaMatrixComponentException {
-	    if (!isClosed()) {
-            super.close();
-            
-            closeSources();            
-        }
+	public void closeDirect() {
+        closeSources();            
 	}
 
-    private void closeSources() throws MetaMatrixComponentException {
+    private void closeSources() {
         if(this.tupleSource != null) {
     		this.tupleSource.closeSource();
             tupleSource = null;

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -144,14 +144,10 @@
      * @see com.metamatrix.query.processor.relational.RelationalNode#close()
      * @since 4.2
      */
-    public void close() throws MetaMatrixComponentException {
-        if (!isClosed()) {
-            super.close();
-
-            if (tupleSource != null) {
-            	tupleSource.closeSource();
-            	tupleSource = null;
-            }
+    public void closeDirect() {
+        if (tupleSource != null) {
+        	tupleSource.closeSource();
+        	tupleSource = null;
         }
     }
     

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -54,11 +54,8 @@
     /**
      * @see com.metamatrix.query.processor.relational.AccessNode#close()
      */
-    public void close() throws MetaMatrixComponentException {
-        if (isClosed()) {
-            return;
-        }
-        super.close();
+    public void closeDirect() {
+        super.closeDirect();
 
         if (criteriaProcessor != null) {
             criteriaProcessor.close();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -90,7 +90,7 @@
 	                DependentValueSource originalVs = (DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
 	                this.sortUtility = new SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID());
             	}
-            	dvs = new DependentValueSource(sortUtility.sort(), dependentNode.getBufferManager().getProcessorBatchSize() / 2);
+            	dvs = new DependentValueSource(sortUtility.sort());
             	for (SetState setState : dependentSetStates) {
                     setState.valueIterator = dvs.getValueIterator(setState.valueExpression);
     			}
@@ -176,7 +176,7 @@
         }        
     }
 
-    public void close() throws MetaMatrixComponentException {
+    public void close() {
         if (dependentState != null) {
             for (TupleState state : dependentState.values()) {
 				state.close();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -77,11 +77,8 @@
         criteriaProcessor = null;
     }
     
-    public void close() throws MetaMatrixComponentException {
-        if (isClosed()) {
-            return;
-        }
-        super.close();
+    public void closeDirect() {
+        super.closeDirect();
 
         if (criteriaProcessor != null) {
             criteriaProcessor.close();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -76,11 +76,8 @@
         criteriaProcessor = null;
     }
     
-    public void close() throws MetaMatrixComponentException {
-        if (isClosed()) {
-            return;
-        }
-        super.close();
+    public void closeDirect() {
+        super.closeDirect();
 
         if (criteriaProcessor != null) {
             criteriaProcessor.close();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -45,17 +45,15 @@
 public class DependentValueSource implements
                                  ValueIteratorSource {
 
-    private TupleBuffer tupleSourceID;
-    private int maxSetSize;
+    private TupleBuffer buffer;
     private Map<Expression, Set<Object>> cachedSets;
     
-    public DependentValueSource(TupleBuffer tupleSourceID, int maxSetSize) {
-        this.tupleSourceID = tupleSourceID;
-        this.maxSetSize = maxSetSize;
+    public DependentValueSource(TupleBuffer tupleSourceID) {
+        this.buffer = tupleSourceID;
     }
     
     public TupleBuffer getTupleBuffer() {
-		return tupleSourceID;
+		return buffer;
 	}
     
     /** 
@@ -63,7 +61,7 @@
      * @see com.metamatrix.query.sql.util.ValueIteratorSource#getValueIterator(com.metamatrix.query.sql.symbol.Expression)
      */
     public ValueIterator getValueIterator(Expression valueExpression) throws  MetaMatrixComponentException {
-    	IndexedTupleSource its = tupleSourceID.createIndexedTupleSource();
+    	IndexedTupleSource its = buffer.createIndexedTupleSource();
     	int index = 0;
     	if (valueExpression != null) {
     		index = its.getSchema().indexOf(valueExpression);
@@ -78,10 +76,10 @@
     		result = cachedSets.get(valueExpression);
     	}
     	if (result == null) {
-			if (tupleSourceID.getRowCount() > maxSetSize) {
+			if (buffer.getRowCount() > buffer.getBatchSize()) {
 				return null;
 			}
-			IndexedTupleSource its = tupleSourceID.createIndexedTupleSource();
+			IndexedTupleSource its = buffer.createIndexedTupleSource();
         	int index = 0;
         	if (valueExpression != null) {
         		index = its.getSchema().indexOf(valueExpression);

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -125,7 +125,9 @@
             this.collectionBuffer.close();
 
             // Sort
-            sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), elements, sortTypes, Mode.DUP_REMOVE, mgr, groupName);
+            if (sortUtility == null) {
+            	sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), elements, sortTypes, Mode.DUP_REMOVE_SORT, mgr, groupName);
+            }
             TupleBuffer sorted = sortUtility.sort();
             sorted.setForwardOnly(true);
             try {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -212,9 +212,8 @@
             return groupPhase();
         }
         
-        TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
-        terminationBatch.setTerminationFlag(true);
-        return terminationBatch;
+        this.terminateBatches();
+        return pullBatch();
     }
 	
 	public TupleSource getCollectionTupleSource() {
@@ -265,7 +264,7 @@
 			}
 			
 			@Override
-			public void closeSource() throws MetaMatrixComponentException {
+			public void closeSource() {
 				
 			}
 			
@@ -390,14 +389,11 @@
         }
     }
 
-    public void close() throws MetaMatrixComponentException {
-        if (!isClosed()) {
-        	if (this.sortBuffer != null) {
-        		this.sortBuffer.remove();
-        		this.sortBuffer = null;
-        	}
-            super.close();
-        }
+    public void closeDirect() {
+    	if (this.sortBuffer != null) {
+    		this.sortBuffer.remove();
+    		this.sortBuffer = null;
+    	}
     }
 
 	protected void getNodeString(StringBuffer str) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -99,8 +99,7 @@
 	}
 	
 	@Override
-	public void close() throws MetaMatrixComponentException {
-		super.close();
+	public void closeDirect() {
 		this.currentBatch = null;
 	}
 	

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -36,6 +36,7 @@
 import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
 import com.metamatrix.query.processor.ProcessorDataManager;
 import com.metamatrix.query.processor.relational.SourceState.ImplicitBuffer;
 import com.metamatrix.query.sql.LanguageObject;
@@ -48,6 +49,10 @@
  */
 public class JoinNode extends SubqueryAwareRelationalNode {
 	
+	static class BatchAvailableException extends RuntimeException {}
+	
+	static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException(); 
+	
 	public enum JoinStrategyType {    
 	    MERGE,
 	    PARTITIONED_SORT,
@@ -59,6 +64,7 @@
     
     private boolean leftOpened;
     private boolean rightOpened;
+    private int reserved;
     
     private JoinStrategy joinStrategy;
     private JoinType joinType;
@@ -147,16 +153,25 @@
             this.leftOpened = true;
         }
         
-        if(!isDependent() && !this.rightOpened) {
-            // Open right child if non-dependent
-            getChildren()[1].open();
-            this.rightOpened = true;
+        if(!isDependent()) {
+        	openRight();
         }
             
         this.state = State.LOAD_LEFT;
         // Set Up Join Strategy
         this.joinStrategy.initialize(this);
     }
+
+	private void openRight() throws MetaMatrixComponentException,
+			MetaMatrixProcessingException { // opens the right child (children[1]) once; does nothing when rightOpened is already set
+		if (!this.rightOpened) {
+			if (reserved == 0) { // reserve only when this node holds no reservation yet
+				reserved = getBufferManager().reserveBuffers(getBufferManager().getSchemaSize(getOutputElements()), BufferReserveMode.FORCE); // FORCE-mode request sized by the schema size of this node's output elements
+			}
+			getChildren()[1].open();
+			this.rightOpened = true; // remember the open so later calls are no-ops
+		}
+	}
             
     /** 
      * @see com.metamatrix.query.processor.relational.RelationalNode#clone()
@@ -194,33 +209,25 @@
             }
         	//left child was already opened by the join node
             this.joinStrategy.loadLeft();
+            if (isDependent()) { 
+                TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
+                this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, new DependentValueSource(buffer));
+            }
             state = State.LOAD_RIGHT;
         }
         if (state == State.LOAD_RIGHT) {
-            if (isDependent() && !this.rightOpened) { 
-                TupleBuffer tsID = this.joinStrategy.leftSource.getTupleBuffer();
-                this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, new DependentValueSource(tsID, this.getBufferManager().getProcessorBatchSize() / 2));
-                //open the right side now that the tuples have been collected
-                this.getChildren()[1].open();
-                this.rightOpened = true;
-            }
+        	this.openRight();
             this.joinStrategy.loadRight();
+        	this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, null);
             state = State.EXECUTE;
         }
-        
-        while(true) {
-            if(super.isBatchFull()) {
-                return super.pullBatch();
-            }
-            List outputTuple = this.joinStrategy.nextTuple();
-            if(outputTuple != null) {
-                List projectTuple = projectTuple(this.projectionIndexes, outputTuple);
-                super.addBatchRow(projectTuple);
-            } else {
-                super.terminateBatches();
-                return super.pullBatch();
-            }
+        try {
+        	this.joinStrategy.process();
+        	this.terminateBatches();
+        } catch (BatchAvailableException e) {
+        	//pull the batch
         }
+        return pullBatch();
     }
 
     /** 
@@ -295,13 +302,12 @@
         this.dependentValueSource = dependentValueSource;
     }
     
-    public void close()
-            throws MetaMatrixComponentException {
-        super.close();
+    public void closeDirect() {
+		getBufferManager().releaseBuffers(reserved);
+		reserved = 0;
+        super.closeDirect();
         joinStrategy.close();
-        if (this.isDependent()) {
-        	this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, null);
-        }
+    	this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, null);
     }
 
     public JoinType getJoinType() {
@@ -335,5 +341,14 @@
     	}
     	return Arrays.asList(this.joinCriteria);
     }
+    
+    @Override
+    protected void addBatchRow(List row) { // projects the joined row, adds it to the output batch, and signals when the batch is full
+        List projectTuple = projectTuple(this.projectionIndexes, row); // keep only the columns selected by projectionIndexes
+        super.addBatchRow(projectTuple);
+        if (isBatchFull()) {
+        	throw BATCH_AVILABLE; // shared sentinel; nextBatchDirect catches BatchAvailableException and then pulls the batch
+        }
+    }
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -79,7 +79,7 @@
         return combinedRow; 
     }
     
-    protected abstract List nextTuple() throws MetaMatrixComponentException, CriteriaEvaluationException, MetaMatrixProcessingException;
+    protected abstract void process() throws MetaMatrixComponentException, CriteriaEvaluationException, MetaMatrixProcessingException;
     
     public abstract Object clone();
         

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -98,9 +98,6 @@
         rowCounter += resultBatch.getRowCount();
         if (rowCounter == limit || batch.getTerminationFlag()) {
             resultBatch.setTerminationFlag(true);
-            if (!batch.getTerminationFlag()){
-                getChildren()[0].close();
-            }
         }        
         return resultBatch;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -135,7 +135,7 @@
     }
     
     @Override
-    protected List nextTuple() throws MetaMatrixComponentException,
+    protected void process() throws MetaMatrixComponentException,
     		CriteriaEvaluationException, MetaMatrixProcessingException {
         while (this.mergeState != MergeState.DONE) {
             
@@ -149,7 +149,7 @@
                         this.leftScanState = ScanState.DONE;
                         if (joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
                             mergeState = MergeState.DONE;
-                            return null;
+                            return;
                         }
                     }
                 }
@@ -163,7 +163,7 @@
                         this.rightScanState = ScanState.DONE;
                         if (!this.joinNode.getJoinType().isOuter()) {
                             mergeState = MergeState.DONE;
-                            return null;
+                            return;
                         }
                     }
                 }
@@ -173,7 +173,7 @@
                 if (this.leftScanState == ScanState.DONE) {
                     if (this.rightScanState == ScanState.DONE) {
                         this.mergeState = MergeState.DONE;
-                        return null;
+                        return;
                     }
                     result = -1;
                 } else if (this.rightScanState == ScanState.DONE) {
@@ -191,12 +191,12 @@
                 } else if (result > 0) {
                     this.leftScanState = ScanState.READ;
                     if (this.joinNode.getJoinType().isOuter()) {
-                        return outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals());
+                    	this.joinNode.addBatchRow(outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals()));
                     }
                 } else {
                     this.rightScanState = ScanState.READ;
                     if (joinNode.getJoinType() == JoinType.JOIN_FULL_OUTER) {
-                        return outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple());
+                    	this.joinNode.addBatchRow(outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple()));
                     }
                 }
             }
@@ -229,11 +229,11 @@
 
                 while (loopState == LoopState.LOAD_INNER || loopState == LoopState.EVALUATE_CRITERIA) {
                     if (loopState == LoopState.LOAD_INNER) {
-                        if (compareToPrevious(innerState)) {
-                            loopState = LoopState.EVALUATE_CRITERIA;
-                        } else {
-                            loopState = LoopState.LOAD_OUTER;
+                        if (!compareToPrevious(innerState)) {
+                        	loopState = LoopState.LOAD_OUTER;
+                        	break;
                         }
+                        loopState = LoopState.EVALUATE_CRITERIA;
                     }
 
                     if (loopState == LoopState.EVALUATE_CRITERIA) {
@@ -248,25 +248,25 @@
                                 if (this.joinNode.getJoinType() == JoinType.JOIN_SEMI) {
                                     this.loopState = LoopState.LOAD_OUTER; //only one match is needed for semi join
                                 } 
-                                return outputTuple;
+                                this.joinNode.addBatchRow(outputTuple);
+                                continue;
                             }
                             //right outer join || anti semi join can skip to the next outer value
                             this.loopState = LoopState.LOAD_OUTER;
+                            break;
                         } 
                     }
                 }
 
                 if (!outerMatched) {
                     if (matchState == MatchState.MATCH_RIGHT) {
-                    	return outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple());
+                    	this.joinNode.addBatchRow(outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple()));
+                    } else if (this.joinNode.getJoinType().isOuter()) {
+                    	this.joinNode.addBatchRow(outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals()));
                     }
-                    if (this.joinNode.getJoinType().isOuter()) {
-                        return outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals());
-                    }
                 }
             }
         }
-        return null;
     }
 
     protected boolean compareToPrevious(SourceState target) throws MetaMatrixComponentException,

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -22,7 +22,6 @@
 
 package com.metamatrix.query.processor.relational;
 
-import java.util.Collections;
 import java.util.Map;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -37,9 +36,8 @@
     public TupleBatch nextBatchDirect()
         throws MetaMatrixComponentException {
 
-        TupleBatch batch = new TupleBatch(1, Collections.EMPTY_LIST);
-        batch.setTerminationFlag(true);
-        return batch;
+        this.terminateBatches();
+        return pullBatch();
     }
         
     protected void getNodeString(StringBuffer str) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -32,6 +32,7 @@
 import com.metamatrix.common.buffer.IndexedTupleSource;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
 
 /**
  * Extends the basic fully sorted merge join to check for conditions necessary 
@@ -54,6 +55,7 @@
 	private List<?> partitionedTuple;
 	private int matchBegin = -1;
 	private int matchEnd = -1;
+	private int reserved;
 
 	public PartitionedSortJoin(SortOption sortLeft, SortOption sortRight) {
 		super(sortLeft, sortRight, false);
@@ -65,6 +67,7 @@
     	for (TupleBuffer tupleSourceID : this.partitions) {
 			tupleSourceID.remove();
 		}
+    	releaseReserved();
     	this.endTuples = null;
     	this.overlap.clear();
     	this.endRows.clear();
@@ -117,14 +120,13 @@
     protected void loadRight() throws MetaMatrixComponentException,
     		MetaMatrixProcessingException {
     	this.rightSource.getTupleBuffer();
-    	int maxRows = this.joinNode.getBatchSize() * getMaxProcessingBatches();
     	if (processingSortRight == SortOption.SORT
-    			&& this.leftSource.getRowCount() < maxRows
-    			&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()) {
+    			&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()
+    			&& testAndSetPartitions(this.rightSource.getRowCount(), this.rightSource.getSource().getOutputElements())) {
     		this.processingSortRight = SortOption.PARTITION;
     	} else if (processingSortLeft == SortOption.SORT
-    			&& this.rightSource.getRowCount() < maxRows 
-    			&& this.rightSource.getRowCount() * 4 < this.leftSource.getRowCount()) {
+    			&& this.rightSource.getRowCount() * 4 < this.leftSource.getRowCount()
+    			&& testAndSetPartitions(this.leftSource.getRowCount(), this.leftSource.getSource().getOutputElements())) {
     		this.processingSortLeft = SortOption.PARTITION;
     	} 
         super.loadRight(); //sort right if needed
@@ -140,49 +142,52 @@
         	this.partitionedSource = this.rightSource;
         }
         if (this.processingSortLeft == SortOption.PARTITION) {
-        	partitionSource(true);
+        	partitionSource();
         } 
         if (this.processingSortRight == SortOption.PARTITION) {
-    		partitionSource(false);
-        	if (this.processingSortRight == SortOption.SORT) {
-        		//degrade to a merge join
-        		super.loadRight(); 
-        	}
+    		partitionSource();
     	}
     }
 
     /**
      * Since the source to be partitioned is already loaded, then there's no
-     * chance of a blocked exception during partitioning, so double the max.
+     * chance of a blocked exception during partitioning, so reserve some batches.
      * 
      * TODO: partition at the same time as the load to determine size
      * 
      * @return
      */
-	private int getMaxProcessingBatches() {
-		return 2 * this.joinNode.getBufferManager().getMaxProcessingBatches();
+	private boolean testAndSetPartitions(int rowCount, List elements) { // returns true only when the whole reservation succeeded; it is then held in 'reserved'
+		int partitionCount = (rowCount / this.joinNode.getBatchSize() + (rowCount % this.joinNode.getBatchSize() == 0 ? 0:1)) // ceiling of rowCount / batchSize; inner parentheses are required because '+' binds tighter than '=='
+			* this.joinNode.getBufferManager().getSchemaSize(elements);
+		if (partitionCount > this.joinNode.getBufferManager().getMaxProcessingBatchColumns() * 8) { // too large to partition
+			return false; 
+		}
+		int toReserve = Math.max(1, (int)(partitionCount * .75));
+		int excess = Math.max(0, toReserve - this.joinNode.getBufferManager().getMaxProcessingBatchColumns()); // portion above the max processing batch columns
+		reserved = this.joinNode.getBufferManager().reserveBuffers(toReserve - excess, BufferReserveMode.FORCE);
+		if (excess > 0) {
+			reserved += this.joinNode.getBufferManager().reserveBuffers(excess, BufferReserveMode.NO_WAIT); // request only the remainder; asking for toReserve again could push the total past toReserve and fail the check below
+		}
+		if (reserved == toReserve) {
+			return true;
+		}
+		releaseReserved(); // partial reservation: give it back and report that partitioning is not possible
+		return false;
 	}
     
-	private void partitionSource(boolean left) throws MetaMatrixComponentException,
+	private void partitionSource() throws MetaMatrixComponentException,
 			MetaMatrixProcessingException {
 		if (partitioned) {
 			return;
 		}
-		if (endTuples.length > getMaxProcessingBatches() + 1) {
-			if (left) {
-				this.processingSortLeft = SortOption.SORT;
-			} else {
-				this.processingSortRight = SortOption.SORT;
-			}
-			return;
-		}
 		if (endTuples.length < 2) {
 			partitions.add(this.partitionedSource.getTupleBuffer());
 		} else {
 			if (partitions.isEmpty()) {
 				for (int i = 0; i < endTuples.length; i++) {
 					TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
-					tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()));
+					tc.setForwardOnly(true);
 					this.partitions.add(tc);
 				}
 			}
@@ -204,18 +209,24 @@
 			for (TupleBuffer partition : this.partitions) {
 				partition.close();
 			}
+			releaseReserved();
 		}
 		partitioned = true;
 	}
+
+	private void releaseReserved() { // hands the buffers counted in 'reserved' back to the join node's buffer manager
+		this.joinNode.getBufferManager().releaseBuffers(this.reserved);
+		this.reserved = 0; // zero the count so a later call passes 0 rather than releasing the same amount again
+	}
         
     @Override
-    protected List nextTuple() throws MetaMatrixComponentException,
+    protected void process() throws MetaMatrixComponentException,
     		CriteriaEvaluationException, MetaMatrixProcessingException {
     	if (this.processingSortLeft != SortOption.PARTITION && this.processingSortRight != SortOption.PARTITION) {
-    		return super.nextTuple();
+    		super.process();
     	}
     	if (endRows.isEmpty()) {
-    		return null; //no rows on the sorted side
+    		return; //no rows on the sorted side
     	}
     	while (currentPartition < partitions.size()) {
     		if (currentSource == null) {
@@ -260,7 +271,7 @@
     				boolean matches = this.joinNode.matchesCriteria(outputTuple);
     				matchBegin++;
                     if (matches) {
-                    	return outputTuple;
+                    	this.joinNode.addBatchRow(outputTuple);
                     }
     			}
     			matchBegin = -1;
@@ -271,7 +282,6 @@
     		currentSource = null;
     		currentPartition++;
     	}
-    	return null;
     }
     
     public int binarySearch(List<?> tuple, List[] tuples, int[] leftIndexes, int[] rightIndexes) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -30,10 +30,13 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.query.processor.ProcessorPlan;
 import com.metamatrix.query.sql.util.VariableContext;
 import com.metamatrix.query.util.CommandContext;
 
+//TODO: consolidate with QueryProcessor
 public class PlanExecutionNode extends RelationalNode {
 
     // Initialization state
@@ -133,11 +136,12 @@
         return false;
     }
     
-	public void close() throws MetaMatrixComponentException {
-        if (!isClosed()) {
-            super.close();            
-	        plan.close();
-        }
+	public void closeDirect() {
+        try {
+        	plan.close();
+		} catch (MetaMatrixComponentException e1){
+			LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor"); //$NON-NLS-1$
+		}
 	}
 	
 	protected void getNodeString(StringBuffer str) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -191,7 +191,7 @@
     	tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(), command, this.modelName, null, getID());        
     }
     
-    private void closeRequest() throws MetaMatrixComponentException {
+    private void closeRequest() {
 
         if (this.tupleSource != null) {
             tupleSource.closeSource();
@@ -253,8 +253,7 @@
 		return intoGroup.isTempGroupSymbol();
 	}
 
-    public void close() throws MetaMatrixComponentException {
+    public void closeDirect() {
         closeRequest();
-        super.close();
 	}
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -252,17 +252,16 @@
      * @throws MetaMatrixComponentException
      * @since 4.2
      */
-    public TupleBatch nextBatch() throws BlockedException,  MetaMatrixComponentException, MetaMatrixProcessingException {
+    public final TupleBatch nextBatch() throws BlockedException,  MetaMatrixComponentException, MetaMatrixProcessingException {
         boolean recordStats = this.context != null && (this.context.getCollectNodeStatistics() || this.context.getProcessDebug());
         
-        TupleBatch batch = null;
         try {
             while (true) {
             	//start timer for this batch
                 if(recordStats && this.context.getCollectNodeStatistics()) {
                     this.nodeStatistics.startBatchTimer();
                 }
-                batch = nextBatchDirect();
+                TupleBatch batch = nextBatchDirect();
                 if (recordStats) {
                     if(this.context.getCollectNodeStatistics()) {
                         // stop timer for this batch (normal)
@@ -279,15 +278,17 @@
                 //there have been several instances in the code that have not correctly accounted for non-terminal zero length batches
                 //this processing style however against the spirit of batch processing (but was already utilized by Sort and Grouping nodes)
                 if (batch.getRowCount() != 0 || batch.getTerminationFlag()) {
-                    break;
+                    if (batch.getTerminationFlag()) {
+                        close();
+                    }
+                    return batch;
                 }
             }
-            return batch;
         } catch (BlockedException e) {
             if(recordStats && this.context.getCollectNodeStatistics()) {
                 // stop timer for this batch (BlockedException)
                 this.nodeStatistics.stopBatchTimer();
-                this.nodeStatistics.collectCumulativeNodeStats(batch, RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
+                this.nodeStatistics.collectCumulativeNodeStats(null, RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
             }
             throw e;
         } catch (MetaMatrixComponentException e) {
@@ -310,10 +311,11 @@
 	protected abstract TupleBatch nextBatchDirect()
 		throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException;
 
-	public void close()
+	public final void close()
 		throws MetaMatrixComponentException {
 
         if (!this.closed) {
+        	closeDirect();
             for(int i=0; i<children.length; i++) {
                 if(children[i] != null) {
                     children[i].close();
@@ -324,6 +326,10 @@
             this.closed = true;
         }
     }
+	
+	public void closeDirect() {
+		
+	}
 
     /**
      * Check if the node has been already closed

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -126,14 +126,11 @@
 		return this.pullBatch();
     }
 
-    public void close() throws MetaMatrixComponentException {
-        if (!isClosed()) {
-            super.close();
-            if(this.output != null) {
-                this.output.remove();
-                this.output = null;
-                this.outputTs = null;
-            }
+    public void closeDirect() {
+        if(this.output != null) {
+            this.output.remove();
+            this.output = null;
+            this.outputTs = null;
         }
     }
 

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -36,6 +36,7 @@
 import com.metamatrix.common.buffer.IndexedTupleSource;
 import com.metamatrix.common.buffer.TupleBuffer;
 import com.metamatrix.common.buffer.TupleSource;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.core.log.MessageLevel;
@@ -87,6 +88,7 @@
     private BufferManager bufferManager;
     private String groupName;
     private List<SingleElementSymbol> schema;
+    private int schemaSize;
 	private ListNestedSortComparator comparator;
 
     private TupleBuffer output;
@@ -110,6 +112,7 @@
         this.bufferManager = bufferMgr;
         this.groupName = groupName;
         this.schema = this.source.getSchema();
+        this.schemaSize = bufferManager.getSchemaSize(this.schema);
         int distinctIndex = sortElements != null? sortElements.size() - 1:0;
         if (mode != Mode.SORT) {
 	        if (sortElements == null) {
@@ -175,26 +178,22 @@
 	            	workingTuples = new TreeSet<List<?>>(comparator);
 	            }
     		}
+    		
             int totalReservedBuffers = 0;
             try {
-	            int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
+	            int maxRows = this.bufferManager.getProcessorBatchSize();
 		        while(!doneReading) {
 		        	//attempt to reserve more working memory if there are additional rows available before blocking
-		        	if (workingTuples.size() == maxRows) {
-		        		if (source.available() < 1) {
+		        	if (workingTuples.size() >= maxRows) {
+	        			int reserved = bufferManager.reserveBuffers(schemaSize, 
+	        					(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingBatchColumns())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+	        			if (reserved != schemaSize) {
 		        			break;
-		        		}
-	        			int reserved = bufferManager.reserveBuffers(1, false);
-		        		if (reserved == 0) {
-		        			break;
 		        		} 
-		        		totalReservedBuffers += 1;
+		        		totalReservedBuffers += reserved;
 		        		maxRows += bufferManager.getProcessorBatchSize();	
 		        	}
 		            try {
-		            	if (totalReservedBuffers > 0 && source.available() == 0) {
-		            		break;
-		            	}
 		            	List<?> tuple = source.nextTuple();
 		            	
 		            	if (tuple == null) {
@@ -205,6 +204,9 @@
 	                    	this.collected++;
 	                    }
 		            } catch(BlockedException e) {
+		            	if (workingTuples.size() >= bufferManager.getProcessorBatchSize()) {
+		            		break;
+		            	}
 		            	if (mode != Mode.DUP_REMOVE  
 		            			|| (this.output != null && collected < this.output.getRowCount() * 2) 
 		            			|| (this.output == null && this.workingTuples.isEmpty() && this.activeTupleBuffers.isEmpty())) {
@@ -231,7 +233,7 @@
 	            
 		        sublist.saveBatch();
             } finally {
-            	bufferManager.releaseBuffers(totalReservedBuffers);
+        		bufferManager.releaseBuffers(totalReservedBuffers);
             }
         }
     	
@@ -248,12 +250,17 @@
             
             TupleBuffer merged = createTupleBuffer();
 
-            int maxSortIndex = Math.min(activeTupleBuffers.size(), this.bufferManager.getMaxProcessingBatches());
-            int reservedBuffers = 0;
-            if (activeTupleBuffers.size() > maxSortIndex) {
-            	reservedBuffers = bufferManager.reserveBuffers(activeTupleBuffers.size() - maxSortIndex, true);
+            int desiredSpace = activeTupleBuffers.size() * schemaSize;
+            int reserved = Math.min(desiredSpace, this.bufferManager.getMaxProcessingBatchColumns());
+            bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
+            if (desiredSpace > reserved) {
+            	reserved += bufferManager.reserveBuffers(desiredSpace - reserved, BufferReserveMode.WAIT);
             }
-            maxSortIndex += reservedBuffers;
+            int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
+            //release any partial excess
+            int release = reserved % schemaSize > 0 ? 1 : 0;
+            bufferManager.releaseBuffers(release);
+            reserved -= release;
             try {
 	        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
 	            	LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
@@ -294,16 +301,21 @@
 	            	masterSortIndex = this.activeTupleBuffers.size() - 1;
 	            }
             } finally {
-            	this.bufferManager.releaseBuffers(reservedBuffers);
+            	this.bufferManager.releaseBuffers(reserved);
             }
         }
     	
         // Close sorted source (all others have been removed)
         if (doneReading) {
-        	activeTupleBuffers.get(0).close();
-        	activeTupleBuffers.get(0).setForwardOnly(false);
         	if (this.output != null) {
 	        	this.output.close();
+	        	TupleBuffer last = activeTupleBuffers.remove(0);
+	        	if (output != last) {
+	        		last.remove();
+	        	}
+	        } else {
+	        	activeTupleBuffers.get(0).close();
+	        	activeTupleBuffers.get(0).setForwardOnly(false);
 	        }
 	        this.phase = DONE;
 	        return;

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -29,6 +29,7 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.IndexedTupleSource;
 import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.query.processor.BatchCollector;
 import com.metamatrix.query.processor.BatchIterator;
@@ -64,6 +65,10 @@
         this.expressionIndexes = getExpressionIndecies(expressions, elements);
     }
     
+    public RelationalNode getSource() {
+		return source;
+	}
+    
     public void setImplicitBuffer(ImplicitBuffer implicitBuffer) {
 		this.implicitBuffer = implicitBuffer;
 	}
@@ -101,10 +106,7 @@
             this.buffer = null;
         }
         if (this.iterator != null) {
-        	try {
-				this.iterator.closeSource();
-			} catch (MetaMatrixComponentException e) {
-			}
+			this.iterator.closeSource();
         	this.iterator = null;
         }
     }
@@ -173,9 +175,15 @@
     public void sort(SortOption sortOption) throws MetaMatrixComponentException, MetaMatrixProcessingException {
     	if (sortOption == SortOption.SORT || sortOption == SortOption.SORT_DISTINCT) {
 	    	if (this.sortUtility == null) {
-			    this.sortUtility = new SortUtility(this.buffer != null ? this.buffer.createIndexedTupleSource() : new BatchIterator(this.source), 
-			                                        expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
-			                                        this.source.getBufferManager(), this.source.getConnectionID());
+	    		TupleSource ts = null;
+	    		if (this.buffer != null) {
+	    			this.buffer.setForwardOnly(true);
+	    			ts = this.buffer.createIndexedTupleSource();
+	    		} else {
+	    			ts = new BatchIterator(this.source);
+	    		}
+			    this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), 
+			    		sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID());
 			    this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
 			}
 			this.buffer = sortUtility.sort();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -56,7 +56,7 @@
 		List<?> tuple;
 		ProcessorPlan plan;
 		
-		void close() throws MetaMatrixComponentException {
+		void close() {
 			if (processor == null) {
 				return;
 			}
@@ -85,7 +85,7 @@
 		}
 	}
 	
-	public void close() throws MetaMatrixComponentException {
+	public void close() {
 		for (SubqueryState state : subqueries.values()) {
 			state.close();
 		}
@@ -123,9 +123,9 @@
 		        state.collector = state.processor.createBatchCollector();
 			}
 			state.done = true;
-			state.processor.getProcessorPlan().reset();
+			state.plan.reset();
 		}
-		return new DependentValueSource(state.collector.collectTuples(), this.manager.getProcessorBatchSize() / 2).getValueIterator(ref.getValueExpression());
+		return new DependentValueSource(state.collector.collectTuples()).getValueIterator(ref.getValueExpression());
 	}
 	
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -2,7 +2,6 @@
 
 import java.util.Map;
 
-import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.query.eval.Evaluator;
 
 public abstract class SubqueryAwareRelationalNode extends RelationalNode {
@@ -31,8 +30,7 @@
 	}
 	
 	@Override
-	public void close() throws MetaMatrixComponentException {
-		super.close();
+	public void closeDirect() {
 		if (evaluator != null) {
 			evaluator.close();
 		}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -28,13 +28,19 @@
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.BlockedException;
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
+import com.metamatrix.query.processor.ProcessorDataManager;
+import com.metamatrix.query.util.CommandContext;
 
 public class UnionAllNode extends RelationalNode {
 
     private boolean[] sourceDone;
     
     private int outputRow = 1;
+    private int reserved;
+    private int schemaSize;
 	
 	public UnionAllNode(int nodeID) {
 		super(nodeID);
@@ -47,12 +53,21 @@
         outputRow = 1;   
     }    
     
+    @Override
+    public void initialize(CommandContext context, BufferManager bufferManager,
+    		ProcessorDataManager dataMgr) {
+    	super.initialize(context, bufferManager, dataMgr);
+    	this.schemaSize = getBufferManager().getSchemaSize(getOutputElements());
+    }
+    
 	public void open() 
 		throws MetaMatrixComponentException, MetaMatrixProcessingException {
 
         // Initialize done flags
         sourceDone = new boolean[getChildren().length];
-        
+        if (reserved == 0) {
+        	reserved = getBufferManager().reserveBuffers((getChildren().length - 1) * schemaSize, BufferReserveMode.FORCE);
+        }
         // Open the children
         super.open();
 	}
@@ -79,6 +94,10 @@
                             // Mark source as being done and decrement the activeSources counter
                             sourceDone[i] = true;
                             activeSources--;
+                            if (reserved > 0) {
+                            	getBufferManager().releaseBuffers(schemaSize);
+                            	reserved-=schemaSize;
+                            }
                         }
                     } catch(BlockedException e) {
                     	if(i<children.length-1 && hasDependentProcedureExecutionNode(children[0])){
@@ -122,6 +141,12 @@
         
         return outputBatch;
     }    
+    
+    @Override
+    public void closeDirect() {
+    	getBufferManager().releaseBuffers(reserved);
+    	reserved = 0;
+    }
 
 	public Object clone(){
 		UnionAllNode clonedNode = new UnionAllNode(super.getID());

Modified: trunk/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -36,38 +36,40 @@
 	// Constants defining join type - users will construct these
 	
 	/** Represents an inner join:  a INNER JOIN b */
-	public static final JoinType JOIN_INNER 		= new JoinType(0);
+	public static final JoinType JOIN_INNER 		= new JoinType(0, false);
 
 	/** Represents a right outer join:  a RIGHT OUTER JOIN b */
-	public static final JoinType JOIN_RIGHT_OUTER 	= new JoinType(1);
+	public static final JoinType JOIN_RIGHT_OUTER 	= new JoinType(1, true);
 
 	/** Represents a left outer join:  a LEFT OUTER JOIN b */
-	public static final JoinType JOIN_LEFT_OUTER 	= new JoinType(2);
+	public static final JoinType JOIN_LEFT_OUTER 	= new JoinType(2, true);
 
 	/** Represents a full outer join:  a FULL OUTER JOIN b */
-	public static final JoinType JOIN_FULL_OUTER 	= new JoinType(3);
+	public static final JoinType JOIN_FULL_OUTER 	= new JoinType(3, true);
 
 	/** Represents a cross join:  a CROSS JOIN b */
-	public static final JoinType JOIN_CROSS 		= new JoinType(4);
+	public static final JoinType JOIN_CROSS 		= new JoinType(4, false);
     
     /** Represents a union join:  a UNION JOIN b - not used after rewrite */
-    public static final JoinType JOIN_UNION         = new JoinType(5);
+    public static final JoinType JOIN_UNION         = new JoinType(5, true);
     
     /** internal SEMI Join type */
-    public static final JoinType JOIN_SEMI          = new JoinType(6);
+    public static final JoinType JOIN_SEMI          = new JoinType(6, false);
     
     /** internal ANTI SEMI Join type */
-    public static final JoinType JOIN_ANTI_SEMI          = new JoinType(7);
+    public static final JoinType JOIN_ANTI_SEMI          = new JoinType(7, true);
 
 	private int type;
+	private boolean outer;
 
 	/**
 	 * Construct a join type object.  This is private and is only called by
 	 * the static constant objects in this class.
 	 * @param type Type code for object
 	 */
-	private JoinType(int type) { 
+	private JoinType(int type, boolean outer) { 
 		this.type = type;
+		this.outer = outer;
 	}
 
 	/**
@@ -97,7 +99,7 @@
 	 * @return True if left/right/full outer, false if inner/cross
 	 */
 	public boolean isOuter() { 
-		return this.equals(JOIN_LEFT_OUTER) || this.equals(JOIN_FULL_OUTER) || this.equals(JOIN_RIGHT_OUTER) || this.equals(JOIN_ANTI_SEMI); 	
+		return outer; 	
 	}
 	
 	public boolean isSemi() {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -54,8 +54,10 @@
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.vdb.api.ModelInfo;
 import com.metamatrix.core.CoreConstants;
+import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.DQPPlugin;
 import com.metamatrix.dqp.embedded.DQPEmbeddedProperties;
@@ -69,6 +71,7 @@
 import com.metamatrix.dqp.service.DataService;
 import com.metamatrix.dqp.service.MetadataService;
 import com.metamatrix.dqp.service.VDBService;
+import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.metadata.runtime.api.MetadataSourceUtil;
 import com.metamatrix.query.processor.CollectionTupleSource;
 import com.metamatrix.query.processor.ProcessorDataManager;
@@ -371,9 +374,14 @@
 		this.dataService.executeRequest(aqr, connectorId, receiver);
 	}
 
-	public void closeRequest(AtomicRequestID request, ConnectorID connectorId)
-			throws MetaMatrixComponentException {
-		this.dataService.closeRequest(request, connectorId);
+	public void closeRequest(AtomicRequestID request, ConnectorID connectorId) {
+		try {
+			this.dataService.closeRequest(request, connectorId);
+		} catch (MetaMatrixComponentException e) {
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, e, e.getMessage());
+			}
+		}
 	}
 	
 	public void cancelRequest(AtomicRequestID request, ConnectorID connectorId)

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -149,7 +149,7 @@
         }
     }
     
-    public void fullyCloseSource() throws MetaMatrixComponentException {
+    public void fullyCloseSource() {
     	this.dataMgr.closeRequest(aqr.getAtomicRequestID(), connectorId);
     }
     
@@ -160,7 +160,7 @@
     /**
      * @see TupleSource#closeSource()
      */
-    public void closeSource() throws MetaMatrixComponentException {
+    public void closeSource() {
     	if (this.supportsImplicitClose) {
     		this.dataMgr.closeRequest(aqr.getAtomicRequestID(), connectorId);
     	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -273,14 +273,8 @@
 	protected void attemptClose() {
 		int rowcount = -1;
 		if (this.resultsBuffer != null) {
-			try {
-				if (this.processor != null) {
-					this.processor.closeProcessing();
-				}
-			} catch (MetaMatrixComponentException e) {
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-					LogManager.logDetail(LogConstants.CTX_DQP, e, e.getMessage());
-				}
+			if (this.processor != null) {
+				this.processor.closeProcessing();
 			}
 			
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
@@ -301,13 +295,7 @@
 			}
 			
 			for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
-				try {
-					connectorRequest.fullyCloseSource();
-				} catch (MetaMatrixComponentException e) {
-					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-						LogManager.logDetail(LogConstants.CTX_DQP, e, e.getMessage());
-					}
-				}
+				connectorRequest.fullyCloseSource();
 		    }
 
 			this.resultsBuffer = null;
@@ -376,16 +364,16 @@
 		if (this.transactionContext != null && this.transactionContext.isInTransaction()) {
 			this.transactionState = TransactionState.ACTIVE;
 		}
+	    if (analysisRecord.recordQueryPlan()) {
+	        analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
+	    }
 		Option option = originalCommand.getOption();
 		if (option != null && option.getPlanOnly()) {
 		    doneProducingBatches = true;
             resultsBuffer.close();
             this.cid = null;
+            this.processor = null;
 		}
-		
-	    if (analysisRecord.recordQueryPlan()) {
-	        analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
-	    }
 	    this.returnsUpdateCount = request.returnsUpdateCount;
 		request = null;
 	}

Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -40,6 +40,7 @@
 import com.metamatrix.api.exception.query.QueryPlannerException;
 import com.metamatrix.api.exception.query.QueryResolverException;
 import com.metamatrix.api.exception.query.QueryValidatorException;
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.core.MetaMatrixRuntimeException;
 import com.metamatrix.query.analysis.AnalysisRecord;
@@ -3357,11 +3358,11 @@
 
         FakeMetadataFacade metadata = FakeMetadataFactory.example1();
         FakeMetadataObject g1 = metadata.getStore().findObject("pm1.g1", FakeMetadataObject.GROUP); //$NON-NLS-1$
-        g1.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 500));
+        g1.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE / 4));
         FakeMetadataObject g2 = metadata.getStore().findObject("pm1.g2", FakeMetadataObject.GROUP); //$NON-NLS-1$
-        g2.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 1000));
+        g2.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE));
         FakeMetadataObject g3 = metadata.getStore().findObject("pm1.g3", FakeMetadataObject.GROUP); //$NON-NLS-1$
-        g3.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 1000));
+        g3.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE));
     
         ProcessorPlan plan = helpPlan(sql, metadata,  
             null, capFinder,

Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -27,6 +27,7 @@
 
 import org.junit.Test;
 
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
 import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
 import com.metamatrix.query.optimizer.capabilities.SourceCapabilities.Capability;
@@ -47,13 +48,14 @@
         caps.setCapabilitySupport(Capability.CRITERIA_COMPARE_EQ, true);
         caps.setCapabilitySupport(Capability.QUERY_FROM_GROUP_ALIAS, true);
         caps.setCapabilitySupport(Capability.QUERY_ORDERBY, true);
+        caps.setSourceProperty(Capability.MAX_IN_CRITERIA_SIZE, 100);
         capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
 
         FakeMetadataFacade metadata = FakeMetadataFactory.example1();
         FakeMetadataObject g1 = metadata.getStore().findObject("pm1.g1", FakeMetadataObject.GROUP); //$NON-NLS-1$
-        g1.putProperty(FakeMetadataObject.Props.CARDINALITY, 600);
+        g1.putProperty(FakeMetadataObject.Props.CARDINALITY, BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE);
         FakeMetadataObject g2 = metadata.getStore().findObject("pm1.g2", FakeMetadataObject.GROUP); //$NON-NLS-1$
-        g2.putProperty(FakeMetadataObject.Props.CARDINALITY, 3000);
+        g2.putProperty(FakeMetadataObject.Props.CARDINALITY, BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE * 16);
     
         ProcessorPlan plan = helpPlan(sql, metadata,  
             null, capFinder,

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -105,11 +105,7 @@
 		return null;
 	}
 
-	public void closeSource()
-		throws MetaMatrixComponentException {
-        if (exceptionOnClose) {
-            throw new FakeComponentException();
-        }
+	public void closeSource() {
 	}
     
     public void setBlockOnce(){

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -230,7 +230,7 @@
         private FakeTupleSource(int numCommands) {
             this.numCommands = numCommands;
         }
-        public void closeSource() throws MetaMatrixComponentException {}
+        public void closeSource() {}
         public List getSchema() {return null;}
         public List nextTuple() throws MetaMatrixComponentException {
             if (first) {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -36,7 +36,6 @@
 import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.query.processor.FakeTupleSource;
 import com.metamatrix.query.processor.ProcessorDataManager;
-import com.metamatrix.query.processor.FakeTupleSource.FakeComponentException;
 import com.metamatrix.query.sql.lang.BatchedUpdateCommand;
 import com.metamatrix.query.sql.lang.Command;
 import com.metamatrix.query.sql.lang.Insert;
@@ -120,24 +119,6 @@
         helpTestNextBatch(20, true, false, false);
     }
     
-    public void testNextBatch_ExceptionOnClose() throws Exception {
-        try {
-            helpTestNextBatch(100, true, false, true);
-            fail("expected exception"); //$NON-NLS-1$
-        } catch (FakeComponentException e) {
-            //expected
-        }
-    }
-    
-    public void testNextBatch_ExceptionOnClose1() throws Exception {
-        try {
-            helpTestNextBatch(100, false, false, true);
-            fail("expected exception"); //$NON-NLS-1$
-        } catch (FakeComponentException e) {
-            //expected
-        }
-    }
-
     private static final class FakePDM implements ProcessorDataManager {
         private int expectedBatchSize;
         private int callCount = 0;
@@ -205,7 +186,7 @@
         private FakeDataTupleSource(int rows) {
             this.rows = rows;
         }
-        public void closeSource() throws MetaMatrixComponentException {}
+        public void closeSource() {}
         public List getSchema() {return null;}
         public List nextTuple() throws MetaMatrixComponentException {
             if (currentRow % 100 == 0 && block) {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java	2010-02-16 20:34:46 UTC (rev 1832)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java	2010-02-16 21:04:26 UTC (rev 1833)
@@ -28,6 +28,7 @@
 
 import junit.framework.TestCase;
 
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.dqp.message.ParameterInfo;
 import com.metamatrix.query.mapping.relational.QueryNode;
@@ -36,6 +37,7 @@
 import com.metamatrix.query.mapping.xml.MappingElement;
 import com.metamatrix.query.mapping.xml.MappingNode;
 import com.metamatrix.query.optimizer.TestOptimizer;
+import com.metamatrix.query.optimizer.relational.rules.RuleChooseDependent;
 import com.metamatrix.query.processor.FakeDataManager;
 import com.metamatrix.query.processor.ProcessorPlan;
 import com.metamatrix.query.unittest.FakeMetadataFacade;
@@ -483,8 +485,8 @@
         FakeMetadataObject suppliers = metadata.getStore().findObject("stock.suppliers", FakeMetadataObject.GROUP); //$NON-NLS-1$
 
         // supply the costing information for OrdersC
-        orders.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(1000));
-        suppliers.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(10));        
+        orders.putProperty(FakeMetadataObject.Props.CARDINALITY, BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE - 1);
+        suppliers.putProperty(FakeMetadataObject.Props.CARDINALITY, RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY);        
 
         String expectedDoc = TestXMLProcessor.readFile("TestXMLProcessor-FullSuppliers.xml"); //$NON-NLS-1$
         



More information about the teiid-commits mailing list