[teiid-commits] teiid SVN: r1777 - in trunk: client/src/main/java/com/metamatrix/dqp/message and 17 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Jan 24 22:26:23 EST 2010


Author: shawkins
Date: 2010-01-24 22:26:22 -0500 (Sun, 24 Jan 2010)
New Revision: 1777

Added:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
Removed:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
Modified:
   trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
   trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
   trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
   trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
   trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
   trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
   trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
   trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
   trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-913 introducing a forward only buffermanager and taking steps to reduce memory consumption, including a canonical value map and streaming of batches to and from disk.  

Modified: trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -151,7 +151,7 @@
             readIsNullData(in, isNull);
             for (int i = 0; i < batch.length; i++) {
                 if (!isNullObject(isNull, i)) {
-                    batch[i].set(col, readObject(in));
+                    batch[i].set(col, DataTypeManager.getCanonicalValue(readObject(in)));
                 }
             }
         }

Modified: trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -39,7 +39,7 @@
 
     static final long serialVersionUID = 2258063872049251854L;
     
-    public static final int DEFAULT_FETCH_SIZE = 2000;
+    public static final int DEFAULT_FETCH_SIZE = 2048;
 
     private String[] commands;
     private boolean isBatchedUpdate;

Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -729,12 +729,6 @@
         String partial = getExecutionProperty(ExecutionProperties.PROP_PARTIAL_RESULTS_MODE);
         res.setPartialResults(Boolean.valueOf(partial).booleanValue());
 
-        // Get fetch size
-        res.setFetchSize(fetchSize);
-
-        // Get cursor type
-        res.setCursorType(this.resultSetType);
-
         // Get xml validation mode
         String validate = getExecutionProperty(ExecutionProperties.PROP_XML_VALIDATION);
         if(validate == null) {
@@ -846,8 +840,8 @@
             sqlOptions.toUpperCase().indexOf(ExecutionProperties.SQL_OPTION_SHOWPLAN.toUpperCase()) >= 0) {
             reqMsg.setShowPlan(true);
         }
-
-        reqMsg.setFetchSize(getFetchSize());
+        reqMsg.setCursorType(this.resultSetType);
+        reqMsg.setFetchSize(this.fetchSize);
         reqMsg.setStyleSheet(this.styleSheet);
         reqMsg.setRowLimit(this.maxRows);
 

Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -613,7 +613,7 @@
     	ds.setServerName("hostName"); //$NON-NLS-1$
     	ds.setDatabaseName("vdbName"); //$NON-NLS-1$
     	ds.setPortNumber(1);
-    	assertEquals("jdbc:teiid:vdbName at mm://hostname:1;fetchSize=2000;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar", ds.buildURL()); //$NON-NLS-1$
+    	assertEquals("jdbc:teiid:vdbName at mm://hostname:1;fetchSize=2048;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar", ds.buildURL()); //$NON-NLS-1$
     }
 
     public void testInvalidDataSource() {

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -23,6 +23,7 @@
 package com.metamatrix.common.types;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Blob;
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.WeakHashMap;
 
 import javax.xml.transform.Source;
 
@@ -61,6 +63,7 @@
 import com.metamatrix.common.types.basic.NumberToLongTransform;
 import com.metamatrix.common.types.basic.NumberToShortTransform;
 import com.metamatrix.common.types.basic.ObjectToAnyTransform;
+import com.metamatrix.common.util.PropertiesUtils;
 import com.metamatrix.core.CorePlugin;
 import com.metamatrix.core.ErrorMessageKeys;
 import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -81,6 +84,11 @@
  * </p>
  */
 public class DataTypeManager {
+	
+	private static final int MAX_VALUE_MAP_SIZE = 10000;
+	private static boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", true); //$NON-NLS-1$
+	
+	private static Map<Class<?>, Map<Object, WeakReference<Object>>> valueMaps = new HashMap<Class<?>, Map<Object, WeakReference<Object>>>(); 
 
 	public static final int MAX_STRING_LENGTH = 4000;
 
@@ -418,19 +426,33 @@
 	 */
 	static void loadDataTypes() {
 		DataTypeManager.addDataType(DefaultDataTypes.BOOLEAN, DefaultDataClasses.BOOLEAN);
+		valueMaps.put(DefaultDataClasses.BOOLEAN, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BYTE, DefaultDataClasses.BYTE);
+		valueMaps.put(DefaultDataClasses.BYTE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.SHORT,	DefaultDataClasses.SHORT);
+		valueMaps.put(DefaultDataClasses.SHORT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.CHAR, DefaultDataClasses.CHAR);
+		valueMaps.put(DefaultDataClasses.CHAR, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.INTEGER, DefaultDataClasses.INTEGER);
+		valueMaps.put(DefaultDataClasses.INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.LONG, DefaultDataClasses.LONG);
+		valueMaps.put(DefaultDataClasses.LONG, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BIG_INTEGER, DefaultDataClasses.BIG_INTEGER);
+		valueMaps.put(DefaultDataClasses.BIG_INTEGER, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.FLOAT, DefaultDataClasses.FLOAT);
+		valueMaps.put(DefaultDataClasses.FLOAT, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.DOUBLE, DefaultDataClasses.DOUBLE);
+		valueMaps.put(DefaultDataClasses.DOUBLE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.BIG_DECIMAL, DefaultDataClasses.BIG_DECIMAL);
+		valueMaps.put(DefaultDataClasses.BIG_DECIMAL, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.DATE, DefaultDataClasses.DATE);
+		valueMaps.put(DefaultDataClasses.DATE, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.TIME, DefaultDataClasses.TIME);
+		valueMaps.put(DefaultDataClasses.TIME, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.TIMESTAMP, DefaultDataClasses.TIMESTAMP);
+		valueMaps.put(DefaultDataClasses.TIMESTAMP, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.STRING, DefaultDataClasses.STRING);
+		valueMaps.put(DefaultDataClasses.STRING, Collections.synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
 		DataTypeManager.addDataType(DefaultDataTypes.CLOB, DefaultDataClasses.CLOB);
 		DataTypeManager.addDataType(DefaultDataTypes.XML, DefaultDataClasses.XML);
 		DataTypeManager.addDataType(DefaultDataTypes.OBJECT, DefaultDataClasses.OBJECT);
@@ -686,7 +708,8 @@
             Object[] params = new Object[] { sourceType, targetClass, value};
             throw new TransformationException(CorePlugin.Util.getString("ObjectToAnyTransform.Invalid_value", params)); //$NON-NLS-1$
 		}
-		return (T) transform.transform(value);
+		T result = (T) transform.transform(value);
+		return getCanonicalValue(result);
 	}
 	
     public static boolean isNonComparable(String type) {
@@ -699,4 +722,29 @@
     public static <S> void addSourceTransform(Class<S> sourceClass, SourceTransform<S, ?> transform) {
     	sourceConverters.put(sourceClass, transform);
     }
+    
+    @SuppressWarnings("unchecked")
+	public static <T> T getCanonicalValue(T value) {
+    	if (USE_VALUE_CACHE) {
+    		if (value == null) {
+    			return null;
+    		}
+	    	Map<Object, WeakReference<Object>> valueMap = valueMaps.get(value.getClass());
+	    	if (valueMap == null) {
+	    		return value;
+	    	}
+			WeakReference<Object> valueReference = valueMap.get(value);
+			Object canonicalValue = null;
+			if (valueReference != null) {
+				canonicalValue = valueReference.get();
+			}
+			if (canonicalValue != null) {
+				return (T)canonicalValue;
+			}
+			if (valueMap.size() <= MAX_VALUE_MAP_SIZE) {
+				valueMap.put(value, new WeakReference<Object>(value));
+			}
+    	}
+		return value;
+    }
 }

Modified: trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
===================================================================
--- trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -80,7 +80,10 @@
         this.logger = logger;
         this.context = context;
 
-        fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE, context.getBatchSize());
+        fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE, 0);
+        if (fetchSize == 0) {
+        	fetchSize = context.getBatchSize();
+        }
         maxResultRows = PropertiesUtils.getIntProperty(props, ConnectorPropertyNames.MAX_RESULT_ROWS, -1);
         //if the connector work needs to throw an excpetion, set the size plus 1
         if (maxResultRows > 0 && PropertiesUtils.getBooleanProperty(props, ConnectorPropertyNames.EXCEPTION_ON_MAX_ROWS, false)) {

Modified: trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
===================================================================
--- trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml	2010-01-25 03:26:22 UTC (rev 1777)
@@ -17,6 +17,7 @@
             <PropertyDefinition Name="ConnectionSource" DisplayName="Connection Source Class" ShortDescription="Driver, DataSource, or XADataSource class name" IsRequired="true"   />
             <PropertyDefinition Name="TrimStrings" DisplayName="Trim string flag" ShortDescription="Right Trim fixed character types returned as Strings - note that the native type must be char or nchar and the source must support the rtrim function." DefaultValue="false" PropertyType="Boolean" IsExpert="true" />
             <PropertyDefinition Name="UseCommentsInSourceQuery" DisplayName="Use informational comments in Source Queries" ShortDescription="This will embed /*comment*/ style comment with session/request id in source SQL query for informational purposes" DefaultValue="false" PropertyType="Boolean"  IsExpert="true"/>
+            <PropertyDefinition Name="FetchSize" DisplayName="Statement Fetch Size" ShortDescription="Statement Fetch Size" DefaultValue="0" PropertyType="Integer"  IsExpert="true"/>
         </ComponentType>
         <ComponentType Name="Oracle Connector" ComponentTypeCode="2" Deployable="true" Deprecated="false" Monitorable="false" SuperComponentType="JDBC Connector" ParentComponentType="Connectors" LastChangedBy="ConfigurationStartup" CreatedBy="ConfigurationStartup">
             <PropertyDefinition Name="ConnectionSource" DisplayName="Connection Source Class" ShortDescription="Driver, DataSource, or XADataSource class name" DefaultValue="oracle.jdbc.driver.OracleDriver" IsRequired="true"   />

Added: trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java	                        (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+
+public interface BatchManager {
+	
+	public interface ManagedBatch {
+		
+		TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException;
+		
+		void remove();
+		
+	}
+	
+	ManagedBatch createManagedBatch(TupleBatch batch) throws MetaMatrixComponentException;
+	
+	void remove();
+	
+}


Property changes on: trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -48,48 +48,17 @@
 		FINAL
 	}
 
-	/**
-	 * Optional property - the max size of a batch sent between connector and query service.
-	 * Making batches larger reduces communication overhead between connector and query service
-	 * but increases the granularity of memory management on those batches.  This value should 
-	 * be a positive integer and defaults to 1000.
-	 */
-	public static final String CONNECTOR_BATCH_SIZE = "metamatrix.buffer.connectorBatchSize"; //$NON-NLS-1$
-	/**
-	 * Optional property - the max size of a batch sent internally within the query processor.
-	 * In general, these batches should be smaller than the connector batch size as there are 
-	 * no communication costs with these batches.  Smaller batches typically allow a user to 
-	 * get their first results quicker and allow fine-grained buffer management on intermediate
-	 * results.  This value should be a positive integer and defaults to 100.
-	 */
-	public static final String PROCESSOR_BATCH_SIZE = "metamatrix.buffer.processorBatchSize"; //$NON-NLS-1$
-	/**
-	 * Optional property - this value specifies the location to store temporary buffers to
-	 * large to fit in memory.  Temporary buffer files will be created and destroyed in this
-	 * directory.  This value should be a string specifying an absolute directory path.
-	 */
-	public static final String BUFFER_STORAGE_DIRECTORY = "metamatrix.buffer.storageDirectory"; //$NON-NLS-1$
-	/**
-	 * Optional property - this values specifies how many open file descriptors should be cached
-	 * in the storage directory.  Increasing this value in heavy load may improve performance
-	 * but will use more file descriptors, which are a limited system resource.
-	 */
-	public static final String MAX_OPEN_FILES = "metamatrix.buffer.maxOpenFiles"; //$NON-NLS-1$
-	/**
-	 * Optional property - this values specifies the maximum size in MegaBytes that a buffer file can reach.
-	 * The default is 2048 MB (i.e. 2GB).
-	 */
-	public static final String MAX_FILE_SIZE = "metamatrix.buffer.maxFileSize"; //$NON-NLS-1$
-	/**
-	 * Optional property - the max number of batches to process at once in algorithms such as sorting.
-	 */
-	public static final String MAX_PROCESSING_BATCHES = "metamatrix.buffer.maxProcessingBatches"; //$NON-NLS-1$
-	
 	public static int DEFAULT_CONNECTOR_BATCH_SIZE = 2048;
 	public static int DEFAULT_PROCESSOR_BATCH_SIZE = 1024;
 	public static int DEFAULT_MAX_PROCESSING_BATCHES = 8;
+	
+	/**
+	 * The BufferManager may maintain at least this many batch references in memory.
+	 * 
+	 * Up to 2x this value may be held by soft references.
+	 */
 	public static int DEFAULT_RESERVE_BUFFERS = 64;
-
+	
     /**
      * Get the batch size to use during query processing.  
      * @return Batch size (# of rows)

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -59,13 +59,22 @@
 
 		@Override
 		public void write(byte[] b, int off, int len) throws IOException {
-			if (count + len <= buffer.length) {
-				System.arraycopy(b, off, buffer, count, len);
-				count += len;
+			if (len > buffer.length) {
+				flushBuffer();
+				writeDirect(b, off, len);
 				return;
 			}
+			int bufferedLength = Math.min(len, buffer.length - count);
+			if (count < buffer.length) {
+				System.arraycopy(b, off, buffer, count, bufferedLength);
+				count += bufferedLength;
+				if (bufferedLength == len) {
+					return;
+				}
+			}
 			flushBuffer();
-			writeDirect(b, off, len);
+			System.arraycopy(b, off + bufferedLength, buffer, count, len - bufferedLength);
+			count += len - bufferedLength;
 		}
 
 		private void writeDirect(byte[] b, int off, int len) throws IOException {
@@ -77,7 +86,7 @@
 			}
 		}
 
-		private void flushBuffer() throws IOException {
+		public void flushBuffer() throws IOException {
 			if (count > 0) {
 				writeDirect(buffer, 0, count);
 				count = 0;
@@ -119,6 +128,9 @@
 		}
 	}
 	
+	private boolean removed;
+	private long len;
+	
 	public void setCleanupReference(Object o) {
 		REFERENCES.add(new CleanupReference(o, this));
 		for (int i = 0; i < 10; i++) {
@@ -131,8 +143,10 @@
 		}
 	}
 	
-	private boolean removed;
-	
+	public synchronized long getLength() {
+		return len;
+	}
+		
 	public int read(long fileOffset, byte[] b, int offSet, int length)
 			throws MetaMatrixComponentException {
 		if (removed) {
@@ -155,18 +169,21 @@
     	} while (n < length);
 	}
 	
-	public long write(byte[] bytes) throws MetaMatrixComponentException {
-		return write(bytes, 0, bytes.length);
+	public void write(byte[] bytes) throws MetaMatrixComponentException {
+		write(bytes, 0, bytes.length);
 	}
 
-	public long write(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+	public synchronized long write(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
 		if (removed) {
 			throw new MetaMatrixComponentException("already removed"); //$NON-NLS-1$
 		}
-		return writeDirect(bytes, offset, length);
+		writeDirect(bytes, offset, length);
+		long result = len;
+		len += length;		
+		return result;
 	}
 
-	protected abstract long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException;
+	protected abstract void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException;
 
 	public void remove() {
 		if (!this.removed) {
@@ -177,9 +194,9 @@
 	
 	protected abstract void removeDirect();
 	
-	public InputStream createInputStream() {
+	public InputStream createInputStream(final long start) {
 		return new InputStream() {
-			private long offset;
+			private long offset = start;
 			
 			@Override
 			public int read() throws IOException {

Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.lang.ref.SoftReference;
-
-class ManagedBatch {
-
-    private int beginRow;
-    private SoftReference<TupleBatch> batchReference;
-    private long offset;
-    private int length;
-    
-    /**
-     * Constructor for ManagedBatch.
-     */
-    public ManagedBatch(TupleBatch batch) {
-        this.beginRow = batch.getBeginRow();
-        this.batchReference = new SoftReference<TupleBatch>(batch);
-    }
-    
-    /**
-     * Get the begin row, must be >= 1
-     * @return Begin row
-     */
-    public int getBeginRow() {
-        return this.beginRow;
-    }
-    
-    public String toString() {
-        return "ManagedBatch[" + beginRow + "]"; //$NON-NLS-1$ //$NON-NLS-2$
-    }
-    
-    public TupleBatch getBatch() {
-		return this.batchReference.get();
-	}
-    
-    public void setBatchReference(TupleBatch batch) {
-		this.batchReference = new SoftReference<TupleBatch>(batch);
-	}
-    
-    public int getLength() {
-		return length;
-	}
-    
-    public long getOffset() {
-		return offset;
-	}
-    
-    public void setLength(int length) {
-		this.length = length;
-	}
-    
-    public void setOffset(long offset) {
-		this.offset = offset;
-	}
-        
-}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.List;
 
 import com.metamatrix.common.batch.BatchSerializer;
@@ -40,8 +41,10 @@
  * This object is immutable and Serializable;
  */
 public class TupleBatch implements Externalizable {
-    
-    private int rowOffset;    
+	
+	private static final long serialVersionUID = 6304443387337336957L;
+	
+	private int rowOffset;    
     private List[] tuples;
     
     // Optional state
@@ -140,6 +143,10 @@
         this.terminationFlag = terminationFlag;    
     }
     
+    public String[] getDataTypes() {
+		return types;
+	}
+    
     public void setDataTypes(String[] types) {
         this.types = types;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,35 +22,29 @@
 
 package com.metamatrix.common.buffer;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.Streamable;
-import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.DQPPlugin;
-import com.metamatrix.query.execution.QueryExecPlugin;
+import com.metamatrix.query.sql.symbol.Expression;
 
 public class TupleBuffer {
 	
 	class TupleSourceImpl implements IndexedTupleSource {
-	    private SoftReference<TupleBatch> currentBatch;
 	    private int currentRow = 1;
 	    private int mark = 1;
 		private List<?> currentTuple;
+		private TupleBatch batch;
 
 	    @Override
 	    public int getCurrentIndex() {
@@ -80,47 +74,30 @@
 
 		private List<?> getCurrentTuple() throws MetaMatrixComponentException,
 				BlockedException {
-			TupleBatch batch = getBatch();
-	        if(batch.getRowCount() == 0) {
-	            // Check if last
-                if(isFinal) {
-                	currentBatch = null;
-                    return null;
-                } 
-                throw BlockedException.INSTANCE;
-	        }
-
-	        return batch.getTuple(currentRow);
+			if (currentRow <= rowCount) {
+				if (forwardOnly) {
+					if (batch == null || currentRow > batch.getEndRow()) {
+						batch = getBatch(currentRow);
+					}
+					return batch.getTuple(currentRow);
+				} 
+				//TODO: determine if we should directly hold a soft reference here
+				return getRow(currentRow);
+			}
+			if(isFinal) {
+	            return null;
+	        } 
+	        throw BlockedException.INSTANCE;
 		}
 
 	    @Override
 	    public void closeSource()
 	    throws MetaMatrixComponentException{
-	    	currentBatch = null;
+	    	batch = null;
 	        mark = 1;
 	        reset();
 	    }
 	    
-	    // Retrieves the necessary batch based on the currentRow
-	    TupleBatch getBatch() throws MetaMatrixComponentException{
-	    	TupleBatch batch = null;
-	    	if (currentBatch != null) {
-	            batch = currentBatch.get();
-	        }
-	        if (batch != null) {
-	            if (currentRow <= batch.getEndRow() && currentRow >= batch.getBeginRow()) {
-	                return batch;
-	            }
-	            currentBatch = null;
-	        } 
-	        
-            batch = TupleBuffer.this.getBatch(currentRow);
-            if (batch != null) {
-            	currentBatch = new SoftReference<TupleBatch>(batch);
-            }
-	        return batch;
-	    }
-	    
 	    @Override
 		public boolean hasNext() throws MetaMatrixComponentException {
 	        if (this.currentTuple != null) {
@@ -149,34 +126,56 @@
 		        this.currentTuple = null;
 	        }
 	    }
+	    
+	    @Override
+	    public int available() {
+	    	return 0;
+	    }
 	}
+	
+    /**
+     * Gets the data type names for each of the input expressions, in order.
+     * @param expressions List of Expressions
+     * @return
+     * @since 4.2
+     */
+    public static String[] getTypeNames(List expressions) {
+    	if (expressions == null) {
+    		return null;
+    	}
+        String[] types = new String[expressions.size()];
+        for (ListIterator i = expressions.listIterator(); i.hasNext();) {
+            Expression expr = (Expression)i.next();
+            types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
+        }
+        return types;
+    }
 
 	private static final AtomicLong LOB_ID = new AtomicLong();
 	
 	//construction state
-	private StorageManager manager;
+	private BatchManager manager;
 	private String tupleSourceID;
 	private List<?> schema;
 	private String[] types;
 	private int batchSize;
 	
-	private FileStore store;
-
 	private int rowCount;
 	private boolean isFinal;
-    private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer, ManagedBatch>();
+    private TreeMap<Integer, BatchManager.ManagedBatch> batches = new TreeMap<Integer, BatchManager.ManagedBatch>();
 	private ArrayList<List<?>> batchBuffer;
-	private AtomicInteger referenceCount = new AtomicInteger(1);
+	private boolean removed;
+	private boolean forwardOnly;
 
     //lob management
     private Map<String, Streamable<?>> lobReferences; //references to contained lobs
     private boolean lobs = true;
 	
-	public TupleBuffer(StorageManager manager, String id, List<?> schema, String[] types, int batchSize) {
+	public TupleBuffer(BatchManager manager, String id, List<?> schema, int batchSize) {
 		this.manager = manager;
 		this.tupleSourceID = id;
 		this.schema = schema;
-		this.types = types;
+		this.types = getTypeNames(schema);
 		this.batchSize = batchSize;
 		if (types != null) {
 			int i = 0;
@@ -215,33 +214,16 @@
 
 	void saveBatch(boolean finalBatch) throws MetaMatrixComponentException {
 		Assertion.assertTrue(!this.isRemoved());
-		if (batchBuffer == null || batchBuffer.isEmpty()) {
+		if (batchBuffer == null || batchBuffer.isEmpty() || batchBuffer.size() < Math.max(1, batchSize / 32)) {
 			return;
 		}
-		List<?> rows = batchBuffer==null?Collections.emptyList():batchBuffer;
-        TupleBatch writeBatch = new TupleBatch(rowCount - rows.size() + 1, rows);
+        TupleBatch writeBatch = new TupleBatch(rowCount - batchBuffer.size() + 1, batchBuffer);
         if (finalBatch) {
         	writeBatch.setTerminationFlag(true);
         }
-		ManagedBatch mbatch = new ManagedBatch(writeBatch);
-		if (this.store == null) {
-			this.store = this.manager.createFileStore(this.tupleSourceID);
-			this.store.setCleanupReference(this);
-		}
-		AccessibleByteArrayOutputStream baos = null;
-        try {
-            baos = new AccessibleByteArrayOutputStream(1024);
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            writeBatch.setDataTypes(types);
-            writeBatch.writeExternal(oos);
-            oos.flush();
-            oos.close();
-        } catch(IOException e) {
-        	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStorageManager.batch_error")); //$NON-NLS-1$
-        }
-        mbatch.setLength(baos.getCount());
-		mbatch.setOffset(this.store.write(baos.getBuffer(), 0, baos.getCount()));
-		this.batches.put(mbatch.getBeginRow(), mbatch);
+        writeBatch.setDataTypes(types);
+		BatchManager.ManagedBatch mbatch = manager.createManagedBatch(writeBatch);
+		this.batches.put(writeBatch.getBeginRow(), mbatch);
         batchBuffer = null;
 	}
 	
@@ -252,7 +234,23 @@
 		}
 		this.isFinal = true;
 	}
+	
+	List<?> getRow(int row) throws MetaMatrixComponentException {
+		if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
+			return this.batchBuffer.get(row - rowCount + this.batchBuffer.size() - 1);
+		}
+		TupleBatch batch = getBatch(row);
+		return batch.getTuple(row);
+	}
 
+	/**
+	 * Get the batch containing the given row.
+	 * NOTE: the returned batch may be empty or may begin with a row other
+	 * than the one specified.
+	 * @param row
+	 * @return
+	 * @throws MetaMatrixComponentException
+	 */
 	public TupleBatch getBatch(int row) throws MetaMatrixComponentException {
 		if (row > rowCount) {
 			TupleBatch batch = new TupleBatch(rowCount + 1, new List[] {});
@@ -262,58 +260,44 @@
 			return batch;
 		}
 		if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
-			return new TupleBatch(rowCount - this.batchBuffer.size() + 1, batchBuffer);
+			TupleBatch result = new TupleBatch(rowCount - this.batchBuffer.size() + 1, batchBuffer);
+			if (forwardOnly) {
+				this.batchBuffer = null;
+			}
+			return result;
 		}
 		if (this.batchBuffer != null && !this.batchBuffer.isEmpty()) {
+			//this is just a sanity check to ensure we're not holding too many
+			//hard references to batches.
 			saveBatch(isFinal);
 		}
-		Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(row);
+		Map.Entry<Integer, BatchManager.ManagedBatch> entry = batches.floorEntry(row);
 		Assertion.isNotNull(entry);
-		ManagedBatch batch = entry.getValue();
-    	TupleBatch result = batch.getBatch();
-    	if (result != null) {
-    		return result;
+		BatchManager.ManagedBatch batch = entry.getValue();
+    	TupleBatch result = batch.getBatch(!forwardOnly, types);
+    	if (lobs && result.getDataTypes() == null) {
+	        correctLobReferences(result.getAllTuples());
     	}
-        try {
-            byte[] bytes = new byte[batch.getLength()];
-            this.store.readFully(batch.getOffset(), bytes, 0, bytes.length);
-            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-            ObjectInputStream ois = new ObjectInputStream(bais);
-
-            result = new TupleBatch();
-            result.setDataTypes(types);
-            result.readExternal(ois);
-        } catch(IOException e) {
-        	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", tupleSourceID)); //$NON-NLS-1$
-        } catch (ClassNotFoundException e) {
-        	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", tupleSourceID)); //$NON-NLS-1$
-        }
-		if (lobs) {
-			correctLobReferences(result.getAllTuples());
+    	result.setDataTypes(types);
+    	if (forwardOnly) {
+			batches.remove(entry.getKey());
 		}
-		batch.setBatchReference(result);
 		return result;
 	}
 	
 	public void remove() {
-		int count = this.referenceCount.getAndDecrement();
-		if (count == 0) {
-			if (this.store != null) {
-				this.store.remove();
-				this.store = null;
-			}
+		if (!removed) {
+			this.manager.remove();
 			if (this.batchBuffer != null) {
 				this.batchBuffer = null;
 			}
+			for (BatchManager.ManagedBatch batch : this.batches.values()) {
+				batch.remove();
+			}
 			this.batches.clear();
 		}
 	}
 	
-	public boolean addReference() {
-		int count = this.referenceCount.addAndGet(1);
-		return count > 1;
-	}
-	
 	public int getRowCount() {
 		return rowCount;
 	}
@@ -383,6 +367,10 @@
         }
     }
     
+    public void setForwardOnly(boolean forwardOnly) {
+		this.forwardOnly = forwardOnly;
+	}
+    
 	/**
 	 * Create a new iterator for this buffer
 	 * @return
@@ -393,11 +381,11 @@
 	
 	@Override
 	public String toString() {
-		return this.tupleSourceID.toString();
+		return this.tupleSourceID;
 	}
 	
 	public boolean isRemoved() {
-		return this.referenceCount.get() <= 0;
+		return removed;
 	}
 	
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -60,5 +60,11 @@
      */    
 	void closeSource()
 		throws MetaMatrixComponentException;
+	
+	/**
+	 * Returns an estimate of the number of rows that can be read without blocking.
+	 * @return
+	 */
+	int available();
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,9 +22,20 @@
 
 package com.metamatrix.common.buffer.impl;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -32,10 +43,14 @@
 import javax.xml.transform.Source;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.BatchManager;
 import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.FileStore;
 import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BatchManager.ManagedBatch;
+import com.metamatrix.common.buffer.FileStore.FileStoreOutputStream;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.InputStreamFactory;
@@ -49,34 +64,185 @@
 import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.util.LogConstants;
+import com.metamatrix.query.execution.QueryExecPlugin;
 import com.metamatrix.query.processor.xml.XMLUtil;
-import com.metamatrix.query.sql.symbol.Expression;
 
 /**
  * <p>Default implementation of BufferManager.</p>
  * Responsible for creating/tracking TupleBuffers and providing access to the StorageManager
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
+	
+	private static final int IO_BUFFER_SIZE = 1 << 14;
+	
+	private final class ManagedBatchImpl implements ManagedBatch {
+		final private String id;
+		final private FileStore store;
+		
+		private long offset = -1;
+		private boolean persistent;
+		private volatile TupleBatch pBatch;
+		private Reference<TupleBatch> batchReference;
+		
+		public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws MetaMatrixComponentException {
+            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
+			this.id = id;
+			this.store = store;
+			this.pBatch = batch;
+			if (batch.getBeginRow() == 1) {
+				activeBatches.add(this);
+			} else {
+				this.persist(false);
+			}
+			persistBatchReferences();
+		}
 
+		@Override
+		public TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException {
+			readAttempts.getAndIncrement();
+			synchronized (this) {
+				if (this.batchReference != null && this.pBatch == null) {
+					TupleBatch result = this.batchReference.get();
+					if (result != null) {
+						if (!cache) {
+							softCache.remove(this);
+							this.batchReference.clear();
+						} 
+						return result;
+					}
+				}
+
+				TupleBatch batch = this.pBatch;
+				if (batch != null){
+					activeBatches.remove(this);
+					if (cache) {
+						activeBatches.add(this);
+					} 
+					return batch;
+				}
+			}
+			persistBatchReferences();
+            LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
+			synchronized (this) {
+				//Resurrect from disk
+				try {
+		            ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
+		            TupleBatch batch = new TupleBatch();
+		            batch.setDataTypes(types);
+		            batch.readExternal(ois);
+			        batch.setDataTypes(null);
+			        if (cache) {
+			        	this.pBatch = batch;
+			        	activeBatches.add(this);
+			        }
+					return batch;
+		        } catch(IOException e) {
+		        	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
+		        } catch (ClassNotFoundException e) {
+		        	throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id)); //$NON-NLS-1$
+		        }
+			}
+		}
+
+		public void persistBatchReferences() throws MetaMatrixComponentException {
+			persistOneBatch(softCache, reserveBatches * 2, false);
+			persistOneBatch(activeBatches, reserveBatches, true);
+		}
+
+		private void persistOneBatch(Set<ManagedBatchImpl> set, int requiredSize, boolean createSoft) throws MetaMatrixComponentException {
+			ManagedBatchImpl mb = null;
+			synchronized (set) {
+				if (set.size() > requiredSize) {
+					Iterator<ManagedBatchImpl> iter = set.iterator();
+					mb = iter.next();
+					iter.remove();
+				}
+			}
+			try {
+				if (mb != null) {
+					mb.persist(createSoft);
+				}
+			} catch (MetaMatrixComponentException e) {
+				if (mb == this) {
+					throw e;
+				}
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+			}
+		}
+
+		public synchronized void persist(boolean createSoft) throws MetaMatrixComponentException {
+			try {
+				TupleBatch batch = pBatch;
+				if (batch != null) {
+					if (!persistent) {
+						LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Writing batch to disk", writeCount.incrementAndGet()); //$NON-NLS-1$
+						synchronized (store) {
+							offset = store.getLength();
+							FileStoreOutputStream fsos = store.createOutputStream(IO_BUFFER_SIZE);
+				            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+				            batch.writeExternal(oos);
+				            oos.flush();
+				            oos.close();
+				            fsos.flushBuffer();
+						}
+					}
+					if (createSoft) {
+						this.batchReference = new SoftReference<TupleBatch>(batch);
+						softCache.add(this);
+					} else {
+						this.batchReference = new WeakReference<TupleBatch>(batch);
+					}
+				}
+			} catch (IOException e) {
+				throw new MetaMatrixComponentException(e);
+			} finally {
+				persistent = true;
+				pBatch = null;
+			}
+		}
+
+		public void remove() {
+			activeBatches.remove(this);
+			softCache.remove(this);
+			pBatch = null;
+			if (batchReference != null) {
+				batchReference.clear();
+			}
+		}
+		
+		@Override
+		public String toString() {
+			return "ManagedBatch " + id + " " + pBatch; //$NON-NLS-1$ //$NON-NLS-2$
+		}
+	}
+	
 	// Configuration 
     private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
     private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
     private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
     private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
     
+	private Set<ManagedBatchImpl> activeBatches = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+	private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+    
     private StorageManager diskMgr;
 
-    private AtomicLong currentTuple = new AtomicLong(0);
-    
+    private AtomicLong currentTuple = new AtomicLong();
+    private AtomicInteger batchAdded = new AtomicInteger();
+    private AtomicInteger readCount = new AtomicInteger();
+	private AtomicInteger writeCount = new AtomicInteger();
+	private AtomicInteger readAttempts = new AtomicInteger();
+	
     public int getMaxProcessingBatches() {
 		return maxProcessingBatches;
 	}
     
     public void setMaxProcessingBatches(int maxProcessingBatches) {
-		this.maxProcessingBatches = maxProcessingBatches;
+		this.maxProcessingBatches = Math.max(2, maxProcessingBatches);
 	}
 
     /**
@@ -118,11 +284,33 @@
 	}
     
     @Override
-    public TupleBuffer createTupleBuffer(List elements, String groupName,
+    public TupleBuffer createTupleBuffer(final List elements, String groupName,
     		TupleSourceType tupleSourceType)
     		throws MetaMatrixComponentException {
-    	String newID = String.valueOf(this.currentTuple.getAndIncrement());
-        TupleBuffer tupleBuffer = new TupleBuffer(this, newID, elements, getTypeNames(elements), getProcessorBatchSize());
+    	final String newID = String.valueOf(this.currentTuple.getAndIncrement());
+    	
+    	BatchManager batchManager = new BatchManager() {
+    		private FileStore store;
+
+    		@Override
+    		public ManagedBatch createManagedBatch(TupleBatch batch)
+    				throws MetaMatrixComponentException {
+    			if (this.store == null) {
+    				this.store = createFileStore(newID);
+    				this.store.setCleanupReference(this);
+    			}
+    			return new ManagedBatchImpl(newID, store, batch);
+    		}
+
+    		@Override
+    		public void remove() {
+    			if (this.store != null) {
+    				this.store.remove();
+    				this.store = null;
+    			}
+    		}
+    	};
+        TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, getProcessorBatchSize());
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
             LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating TupleBuffer:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$ //$NON-NLS-2$
         }
@@ -139,7 +327,7 @@
     
 	@Override
 	public void initialize(Properties props) throws MetaMatrixComponentException {
-		PropertiesUtils.setBeanProperties(this, props, "metamatrix.buffer"); //$NON-NLS-1$
+		PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer"); //$NON-NLS-1$
 		DataTypeManager.addSourceTransform(Source.class, new SourceTransform<Source, XMLType>() {
 			@Override
 			public XMLType transform(Source value) {
@@ -158,24 +346,6 @@
 		});
 	}
     
-    /**
-     * Gets the data type names for each of the input expressions, in order.
-     * @param expressions List of Expressions
-     * @return
-     * @since 4.2
-     */
-    private static String[] getTypeNames(List expressions) {
-    	if (expressions == null) {
-    		return null;
-    	}
-        String[] types = new String[expressions.size()];
-        for (ListIterator i = expressions.listIterator(); i.hasNext();) {
-            Expression expr = (Expression)i.next();
-            types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
-        }
-        return types;
-    }
-    
     @Override
     public void releaseBuffers(int count) {
     	lock.lock();
@@ -209,6 +379,10 @@
     		lock.unlock();
     	}
     }
+    
+    public void setMaxReserveBatches(int maxReserveBatches) {
+		this.maxReserveBatches = maxReserveBatches;
+	}
 
 	public void shutdown() {
 	}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -33,7 +33,6 @@
 import java.util.TreeMap;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.FileStore;
 import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.log.LogManager;
@@ -113,7 +112,7 @@
 			}
 	    }
 
-		public synchronized long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+		public void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
 			Map.Entry<Long, FileInfo> entry = this.storageFiles.lastEntry();
 			boolean createNew = false;
 			FileInfo fileInfo = null;
@@ -139,7 +138,6 @@
 	            fileAccess.setLength(pointer + length);
 	            fileAccess.seek(pointer);
 	            fileAccess.write(bytes, offset, length);
-	            return fileOffset + pointer;
 	        } catch(IOException e) {
 	            throw new MetaMatrixComponentException(e, QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
 	        } finally {
@@ -185,7 +183,7 @@
      * @see com.metamatrix.common.buffer.BufferManager#MAX_FILE_SIZE
      */
     public void initialize(Properties props) throws MetaMatrixComponentException {
-        this.directory = props.getProperty(BufferManager.BUFFER_STORAGE_DIRECTORY);
+    	PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer"); //$NON-NLS-1$
         if(this.directory == null) {
         	throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.no_directory")); //$NON-NLS-1$
         }
@@ -199,14 +197,20 @@
         } else if(! dirFile.mkdirs()) {
         	throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.error_creating", dirFile.getAbsoluteFile())); //$NON-NLS-1$
         }
-
-        // Set up max number of open file descriptors
-        maxOpenFiles = PropertiesUtils.getIntProperty(props, BufferManager.MAX_OPEN_FILES, DEFAULT_MAX_OPEN_FILES);
-        
-        // Set the max file size
-        maxFileSize = PropertiesUtils.getIntProperty(props, BufferManager.MAX_FILE_SIZE, 2048) * 1024L * 1024L; // Multiply by 1MB
     }
     
+    public void setMaxFileSize(long maxFileSize) {
+    	this.maxFileSize = maxFileSize * 1024L * 1024L;
+	}
+    
+    public void setMaxOpenFiles(int maxOpenFiles) {
+		this.maxOpenFiles = maxOpenFiles;
+	}
+    
+    public void setStorageDirectory(String directory) {
+		this.directory = directory;
+	}
+    
     File createFile(String name, int fileNumber) throws MetaMatrixComponentException {
         try {
         	File storageFile = File.createTempFile(FILE_PREFIX + name + "_" + String.valueOf(fileNumber) + "_", null, this.dirFile); //$NON-NLS-1$ //$NON-NLS-2$

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -41,20 +41,16 @@
 	public FileStore createFileStore(String name) {
 		return new FileStore() {
 			private ByteBuffer buffer = ByteBuffer.allocate(2 << 15);
-			private int end;
 			
 			@Override
-			public synchronized long writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
-				if (end + length > buffer.capacity()) {
+			public void writeDirect(byte[] bytes, int offset, int length) throws MetaMatrixComponentException {
+				if (getLength() + length > buffer.capacity()) {
 					ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
 					newBuffer.put(buffer);
 					buffer = newBuffer;
 				}
-				buffer.position(end);
+				buffer.position((int)getLength());
 				buffer.put(bytes, offset, length);
-				long result = end;
-				end += length;
-				return result;
 			}
 			
 			@Override
@@ -65,12 +61,12 @@
 			@Override
 			public synchronized int readDirect(long fileOffset, byte[] b, int offset, int length)
 					throws MetaMatrixComponentException {
-				if (fileOffset >= end) {
+				if (fileOffset >= getLength()) {
 					return -1;
 				}
 				int position = (int)fileOffset;
 				buffer.position(position);
-				length = Math.min(length, end - position);
+				length = Math.min(length, (int)getLength() - position);
 				buffer.get(b, offset, length);
 				return length;
 			}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -116,4 +116,12 @@
         }
     }
     
+    @Override
+    public int available() {
+    	if (currentBatch != null) {
+    		return currentBatch.getEndRow() - currentRow + 1;
+    	}
+    	return 0;
+    }
+    
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -67,4 +67,12 @@
 	public void closeSource() {
 		
 	}
+	
+	@Override
+	public int available() {
+		if (tuples.hasNext()) {
+			return 1;
+		}
+		return 0;
+	}
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -106,6 +106,7 @@
 
         if(collectionBuffer == null) {
             collectionBuffer = mgr.createTupleBuffer(elements, groupName, TupleSourceType.PROCESSOR);
+            collectionBuffer.setForwardOnly(true);
         }
 
         List row = new ArrayList(1);
@@ -126,6 +127,7 @@
             // Sort
             sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), elements, sortTypes, Mode.DUP_REMOVE, mgr, groupName);
             TupleBuffer sorted = sortUtility.sort();
+            sorted.setForwardOnly(true);
             try {
 	            // Add all input to proxy
 	            TupleSource sortedSource = sorted.createIndexedTupleSource();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -268,6 +268,14 @@
 			public void closeSource() throws MetaMatrixComponentException {
 				
 			}
+			
+			@Override
+			public int available() {
+				if (sourceBatch != null) {
+		    		return sourceBatch.getEndRow() - sourceRow + 1;
+		    	}
+				return 0;
+			}
 		};
 		
 	}
@@ -287,6 +295,7 @@
 
     private void sortPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
         this.sortBuffer = this.sortUtility.sort();
+        this.sortBuffer.setForwardOnly(true);
         this.groupTupleSource = this.sortBuffer.createIndexedTupleSource();
         this.phase = GROUP;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -42,21 +42,10 @@
  */
 public class PartitionedSortJoin extends MergeJoinStrategy {
 	
-	/**
-	 * This is a compromise between the max size of the smaller side
-	 * and effective partitioning assuming that we only want to hold
-	 * two batches in memory during partitioning.
-	 * 
-	 * TODO: apply partitioning recursively and/or have a better mechanism
-	 * for buffermanager reserve/release of memory 
-	 * (would also help the sort utility)
-	 */
-	public static final int MAX_PARTITIONS = 16; 
-	
 	private List[] endTuples;
 	private List<Boolean> overlap = new ArrayList<Boolean>();
 	private List<Integer> endRows = new ArrayList<Integer>();
-	private List<TupleBuffer> partitionIds = new ArrayList<TupleBuffer>();
+	private List<TupleBuffer> partitions = new ArrayList<TupleBuffer>();
 	private int currentPartition;
 	private IndexedTupleSource currentSource;
 	private SourceState sortedSource;
@@ -73,13 +62,13 @@
     @Override
     public void close() {
     	super.close();
-    	for (TupleBuffer tupleSourceID : this.partitionIds) {
+    	for (TupleBuffer tupleSourceID : this.partitions) {
 			tupleSourceID.remove();
 		}
     	this.endTuples = null;
     	this.overlap.clear();
     	this.endRows.clear();
-    	this.partitionIds.clear();
+    	this.partitions.clear();
     	this.currentSource = null;
     	this.sortedSource = null;
     	this.partitionedSource = null;
@@ -128,7 +117,7 @@
     protected void loadRight() throws MetaMatrixComponentException,
     		MetaMatrixProcessingException {
     	this.rightSource.getTupleBuffer();
-    	int maxRows = this.joinNode.getBatchSize() * MAX_PARTITIONS;
+    	int maxRows = this.joinNode.getBatchSize() * getMaxProcessingBatches();
     	if (processingSortRight == SortOption.SORT
     			&& this.leftSource.getRowCount() < maxRows
     			&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()) {
@@ -161,13 +150,25 @@
         	}
     	}
     }
+
+    /**
+     * Since the source to be partitioned is already loaded, then there's no
+     * chance of a blocked exception during partitioning, so double the max.
+     * 
+     * TODO: partition at the same time as the load to determine size
+     * 
+     * @return
+     */
+	private int getMaxProcessingBatches() {
+		return 2 * this.joinNode.getBufferManager().getMaxProcessingBatches();
+	}
     
 	private void partitionSource(boolean left) throws MetaMatrixComponentException,
 			MetaMatrixProcessingException {
 		if (partitioned) {
 			return;
 		}
-		if (endTuples.length > MAX_PARTITIONS + 1) {
+		if (endTuples.length > getMaxProcessingBatches() + 1) {
 			if (left) {
 				this.processingSortLeft = SortOption.SORT;
 			} else {
@@ -176,13 +177,13 @@
 			return;
 		}
 		if (endTuples.length < 2) {
-			partitionIds.add(this.partitionedSource.getTupleBuffer());
+			partitions.add(this.partitionedSource.getTupleBuffer());
 		} else {
-			if (partitionIds.isEmpty()) {
+			if (partitions.isEmpty()) {
 				for (int i = 0; i < endTuples.length; i++) {
 					TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
-					tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()/4));
-					this.partitionIds.add(tc);
+					tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()));
+					this.partitions.add(tc);
 				}
 			}
 			while (this.partitionedSource.getIterator().hasNext()) {
@@ -191,15 +192,18 @@
 				if (index < 0) {
 					index = -index - 1;
 				}
-				if (index > this.partitionIds.size() -1) {
+				if (index > this.partitions.size() -1) {
 					continue;
 				}
 				while (index > 0 && this.overlap.get(index - 1) 
 						&& compare(tuple, this.endTuples[index - 1], this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes()) == 0) {
 					index--;
 				}
-				this.partitionIds.get(index).addTuple(tuple);
+				this.partitions.get(index).addTuple(tuple);
 			}
+			for (TupleBuffer partition : this.partitions) {
+				partition.saveBatch();
+			}
 			this.partitionedSource.getIterator().setPosition(1);
 		}
 		partitioned = true;
@@ -214,12 +218,12 @@
     	if (endRows.isEmpty()) {
     		return null; //no rows on the sorted side
     	}
-    	while (currentPartition < partitionIds.size()) {
+    	while (currentPartition < partitions.size()) {
     		if (currentSource == null) {
-    			if (!this.partitionIds.isEmpty()) {
-    				this.partitionIds.get(currentPartition).close();
+    			if (!this.partitions.isEmpty()) {
+    				this.partitions.get(currentPartition).close();
     			}
-    			currentSource = partitionIds.get(currentPartition).createIndexedTupleSource();
+    			currentSource = partitions.get(currentPartition).createIndexedTupleSource();
     		}
     		
     		int beginIndex = currentPartition>0?endRows.get(currentPartition - 1)+1:1;
@@ -251,7 +255,7 @@
 		    			}
 	    			}
 	    			if (matchEnd == batch.length - 1 && currentPartition < overlap.size() && overlap.get(currentPartition)) {
-	    				this.partitionIds.get(currentPartition + 1).addTuple(partitionedTuple);
+	    				this.partitions.get(currentPartition + 1).addTuple(partitionedTuple);
 	    			}
     			}
     			while (matchBegin <= matchEnd) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -104,6 +104,8 @@
     private TupleBatch outputPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
 		if (!this.output.isFinal()) {
 			this.phase = SORT;
+		} else {
+			this.output.setForwardOnly(true);
 		}
 		List<?> tuple = null;
 		try {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -82,7 +82,7 @@
 	}
 
 	//constructor state
-    private TupleSource sourceID;
+    private TupleSource source;
     private Mode mode;
     private BufferManager bufferManager;
     private String groupName;
@@ -105,11 +105,11 @@
     
     public SortUtility(TupleSource sourceID, List sortElements, List<Boolean> sortTypes, Mode mode, BufferManager bufferMgr,
                         String groupName) {
-        this.sourceID = sourceID;
+        this.source = sourceID;
         this.mode = mode;
         this.bufferManager = bufferMgr;
         this.groupName = groupName;
-        this.schema = this.sourceID.getSchema();
+        this.schema = this.source.getSchema();
         int distinctIndex = sortElements != null? sortElements.size() - 1:0;
         if (mode != Mode.SORT) {
 	        if (sortElements == null) {
@@ -159,6 +159,7 @@
 
 	private TupleBuffer createTupleBuffer() throws MetaMatrixComponentException {
 		TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+		tb.setForwardOnly(true);
 		return tb;
 	}
     
@@ -178,16 +179,23 @@
             try {
 	            int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
 		        while(!doneReading) {
+		        	//attempt to reserve more working memory if there are additional rows available before blocking
 		        	if (workingTuples.size() == maxRows) {
-		        		int reserved = bufferManager.reserveBuffers(1, false);
+		        		if (source.available() < 1) {
+		        			break;
+		        		}
+	        			int reserved = bufferManager.reserveBuffers(1, false);
 		        		if (reserved == 0) {
 		        			break;
 		        		} 
 		        		totalReservedBuffers += 1;
-		        		maxRows += bufferManager.getProcessorBatchSize();
+		        		maxRows += bufferManager.getProcessorBatchSize();	
 		        	}
 		            try {
-		            	List<?> tuple = sourceID.nextTuple();
+		            	if (totalReservedBuffers > 0 && source.available() == 0) {
+		            		break;
+		            	}
+		            	List<?> tuple = source.nextTuple();
 		            	
 		            	if (tuple == null) {
 		            		doneReading = true;
@@ -293,6 +301,7 @@
         // Close sorted source (all others have been removed)
         if (doneReading) {
         	activeTupleBuffers.get(0).close();
+        	activeTupleBuffers.get(0).setForwardOnly(false);
         	if (this.output != null) {
 	        	this.output.close();
 	        }
@@ -302,6 +311,7 @@
     	Assertion.assertTrue(mode == Mode.DUP_REMOVE);
     	if (this.output == null) {
     		this.output = activeTupleBuffers.get(0);
+    		this.output.setForwardOnly(false);
     	}
     	this.phase = INITIAL_SORT;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -75,7 +75,7 @@
 			@Override
 			public InputStream getInputStream() throws IOException {
 				//TODO: adjust the buffer size, and/or develop a shared buffer strategy
-				return new BufferedInputStream(lobBuffer.createInputStream());
+				return new BufferedInputStream(lobBuffer.createInputStream(0));
 			}
 			
 			@Override

Modified: trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -129,6 +129,11 @@
 		public void closeSource() throws MetaMatrixComponentException {
 			
 		}
+		
+		@Override
+		public int available() {
+			return 0;
+		}
 	}
 
 	private BufferManager buffer;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -84,7 +84,7 @@
     private ICommand translatedCommand;
     private Class<?>[] schema;
     private List<Integer> convertToRuntimeType;
-    private List<Integer> convertToDesiredRuntimeType;
+    private boolean[] convertToDesiredRuntimeType;
         
     /* End state information */    
     protected boolean lastBatch;
@@ -309,12 +309,12 @@
         Command command = this.requestMsg.getCommand();
 		List<SingleElementSymbol> symbols = this.requestMsg.getCommand().getProjectedSymbols();
 		this.schema = new Class[symbols.size()];
-		this.convertToDesiredRuntimeType = new ArrayList<Integer>(symbols.size());
+		this.convertToDesiredRuntimeType = new boolean[symbols.size()];
 		this.convertToRuntimeType = new ArrayList<Integer>(symbols.size());
 		for (int i = 0; i < schema.length; i++) {
 			SingleElementSymbol symbol = symbols.get(i);
 			this.schema[i] = symbol.getType();
-			this.convertToDesiredRuntimeType.add(i);
+			this.convertToDesiredRuntimeType[i] = true;
 			this.convertToRuntimeType.add(i);
 		}
 
@@ -469,20 +469,23 @@
 			}
 		}
 		//TODO: add a proper intermediate schema
-		for (int i = convertToDesiredRuntimeType.size() - 1; i >= 0; i--) {
-			int index = convertToDesiredRuntimeType.get(i);
-			Object value = row.get(index);
-			if (value != null) {
-				Object result;
-				try {
-					result = DataTypeManager.transformValue(value, value.getClass(), this.schema[index]);
-				} catch (TransformationException e) {
-					throw new ConnectorException(e);
+		for (int i = 0; i < row.size(); i++) {
+			if (convertToDesiredRuntimeType[i]) {
+				Object value = row.get(i);
+				if (value != null) {
+					Object result;
+					try {
+						result = DataTypeManager.transformValue(value, value.getClass(), this.schema[i]);
+					} catch (TransformationException e) {
+						throw new ConnectorException(e);
+					}
+					if (value == result) {
+						convertToDesiredRuntimeType[i] = false;
+					}
+					row.set(i, result);
 				}
-				if (value == result) {
-					convertToDesiredRuntimeType.remove(i);
-				}
-				row.set(index, result);
+			} else {
+				row.set(i, DataTypeManager.getCanonicalValue(row.get(i)));
 			}
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -225,4 +225,17 @@
 		return this.isTransactional;
 	}
 	
+	@Override
+	public int available() {
+		if (index < currentBatchCount) {
+			return currentBatchCount - index;
+		}
+		synchronized (this) {
+			if (nextBatch != null) {
+				return nextBatch.length;
+			}
+		}
+		return 0;
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -100,15 +100,15 @@
 			return;
 		}
 		List values = requestMsg.getParameterValues();
-		List spParams = proc.getParameters();
+		List<SPParameter> spParams = proc.getParameters();
 		proc.clearParameters();
 		int inParameterCount = values.size();
 		if (this.requestMsg.isPreparedBatchUpdate() && values.size() > 0) {
 			inParameterCount = ((List)values.get(0)).size();
 		}
 		int index = 1;
-		for (Iterator params = spParams.iterator(); params.hasNext();) {
-			SPParameter param = (SPParameter) params.next();
+		for (Iterator<SPParameter> params = spParams.iterator(); params.hasNext();) {
+			SPParameter param = params.next();
 			if (param.getParameterType() == SPParameter.RETURN_VALUE) {
 				continue;
 			}
@@ -147,13 +147,13 @@
         	if (!this.addedLimit) { //TODO: this is a little problematic
             	prepPlan.setCommand(this.userCommand);
 		        // Defect 13751: Clone the plan in its current state (i.e. before processing) so that it can be used for later queries
-		        prepPlan.setPlan((ProcessorPlan)processPlan.clone());
+		        prepPlan.setPlan(processPlan.clone());
 		        prepPlan.setAnalysisRecord(analysisRecord);
 		        this.prepPlanCache.putPreparedPlan(id, this.context.isSessionFunctionEvaluated(), prepPlan);
         	}
         } else {
         	LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Query exist in cache: ", sqlQuery }); //$NON-NLS-1$
-            processPlan = (ProcessorPlan)cachedPlan.clone();
+            processPlan = cachedPlan.clone();
             //already in cache. obtain the values from cache
             analysisRecord = prepPlan.getAnalysisRecord();
             

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -22,6 +22,7 @@
 
 package org.teiid.dqp.internal.process;
 
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -107,6 +108,7 @@
     private AnalysisRecord analysisRecord;
     private TransactionContext transactionContext;
     protected TupleBuffer resultsBuffer;
+    private TupleBatch savedBatch;
     private Collection schemas;     // These are schemas associated with XML results
     private boolean returnsUpdateCount;
     
@@ -346,6 +348,9 @@
 			}
 		});
 		resultsBuffer = collector.getTupleBuffer();
+		if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+			resultsBuffer.setForwardOnly(true);
+		}
 		analysisRecord = request.analysisRecord;
 		schemas = request.schemas;
 		transactionContext = request.transactionContext;
@@ -383,11 +388,18 @@
 			LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded] requestID: " + requestID + " resultsID: " + this.resultsBuffer + " done: " + doneProducingBatches );   //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 		}
 
-    	if (batch == null || batch.getBeginRow() > this.begin) {
-    		batch = resultsBuffer.getBatch(begin);
+    	if (batch == null || !(batch.getBeginRow() <= this.begin && batch.getEndRow() >= this.begin)) {
+    		if (savedBatch != null && savedBatch.getBeginRow() <= this.begin && savedBatch.getEndRow() >= this.begin) {
+    			batch = savedBatch;
+    		} else {
+    			batch = resultsBuffer.getBatch(begin);
+    		}
     		//TODO: support fetching more than 1 batch
     		int count = this.end - this.begin + 1;
     		if (batch.getRowCount() > count) {
+    			if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+    				savedBatch = batch;
+    			}
     			int beginRow = Math.min(this.begin, batch.getEndRow() - count + 1);
     			int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
         		int firstOffset = beginRow - batch.getBeginRow();

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -120,4 +120,9 @@
         this.exceptionOnClose = exceptionOnClose;
     }
     
+    @Override
+    public int available() {
+    	return 0;
+    }
+    
 }

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -492,8 +492,8 @@
     private BufferManager createCustomBufferMgr(int batchSize) throws MetaMatrixComponentException {
         BufferManagerImpl bufferMgr = new BufferManagerImpl();
         Properties props = new Properties();
-        props.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, String.valueOf(batchSize));
-        props.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, String.valueOf(batchSize));
+        bufferMgr.setConnectorBatchSize(batchSize);
+        bufferMgr.setProcessorBatchSize(batchSize);
         bufferMgr.initialize(props);
 
         // Add unmanaged memory storage manager

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -37,24 +37,23 @@
 public class NodeTestUtil {
     
     static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize, int connectorBatchSize) {
-
+    	BufferManagerImpl bufferManager = new BufferManagerImpl();
+    	bufferManager.setProcessorBatchSize(procBatchSize);
+    	bufferManager.setConnectorBatchSize(connectorBatchSize);
         // Get the properties for BufferManager
         Properties bmProps = new Properties();                        
-        bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" + procBatchSize); //$NON-NLS-1$
-        bmProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, "" + connectorBatchSize); //$NON-NLS-1$
-        return createBufferManager(bmProps);
+        return createBufferManager(bufferManager, bmProps);
     }
     
     static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize) {
-
+    	BufferManagerImpl bufferManager = new BufferManagerImpl();
+    	bufferManager.setProcessorBatchSize(procBatchSize);
         // Get the properties for BufferManager
         Properties bmProps = new Properties();                        
-        bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" + procBatchSize); //$NON-NLS-1$
-        return createBufferManager(bmProps);
+        return createBufferManager(bufferManager, bmProps);
     }
     
-    static BufferManager createBufferManager(Properties bmProps) {
-        BufferManagerImpl bufferManager = new BufferManagerImpl();
+    static BufferManager createBufferManager(BufferManagerImpl bufferManager, Properties bmProps) {
         try {
 			bufferManager.initialize(bmProps);
 		} catch (MetaMatrixComponentException e) {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -242,6 +242,11 @@
             }
             return null;
         }
+        
+        @Override
+        public int available() {
+        	return 0;
+        }
     }
     
 }

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -217,5 +217,9 @@
                     ? null
                     : Arrays.asList(new Object[] {new Integer(currentRow), Integer.toString(currentRow)});
         }
+        @Override
+        public int available() {
+        	return 0;
+        }
     }
 }

Modified: trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -34,7 +34,6 @@
 import com.metamatrix.common.application.exception.ApplicationInitializationException;
 import com.metamatrix.common.application.exception.ApplicationLifecycleException;
 import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.buffer.impl.BufferManagerImpl;
 import com.metamatrix.common.buffer.impl.FileStorageManager;
 import com.metamatrix.common.buffer.impl.MemoryStorageManager;
@@ -92,22 +91,21 @@
             String connectorBatchSize = configurationSvc.getConnectorBatchSize();
                 
             // Set up buffer configuration properties
-            Properties bufferProps = new Properties();                                  
-            bufferProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY, bufferDir.getCanonicalPath());
-            bufferProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, processorBatchSize); 
-            bufferProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, connectorBatchSize); 
+            Properties bufferProps = configurationSvc.getSystemProperties();                                 
             
             // Construct and initialize the buffer manager
             this.bufferMgr = new BufferManagerImpl();
+            this.bufferMgr.setConnectorBatchSize(Integer.valueOf(connectorBatchSize));
+            this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
+            
             this.bufferMgr.initialize(bufferProps);
             
             // If necessary, add disk storage manager
             if(useDisk) {
                 // Get the properties for FileStorageManager and create.
-                Properties fsmProps = new Properties();
-                fsmProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY, bufferDir.getCanonicalPath());
-                StorageManager fsm = new FileStorageManager();
-                fsm.initialize(fsmProps);        
+                FileStorageManager fsm = new FileStorageManager();
+                fsm.setStorageDirectory(bufferDir.getCanonicalPath());
+                fsm.initialize(bufferProps);        
                 this.bufferMgr.setStorageManager(fsm);
                 
                 // start the file storage manager in clean state

Modified: trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java	2010-01-22 22:49:05 UTC (rev 1776)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java	2010-01-25 03:26:22 UTC (rev 1777)
@@ -200,7 +200,6 @@
 			dqp.begin();
 			fail("exception expected"); //$NON-NLS-1$
 		} catch (XATransactionException e) {
-			e.printStackTrace();
 			assertEquals("Component not found: com.metamatrix.dqp.client.ClientSideDQP", e.getMessage()); //$NON-NLS-1$
 		}
 	}



More information about the teiid-commits mailing list