[teiid-commits] teiid SVN: r3465 - in trunk: engine/src/main/java/org/teiid/common/buffer and 14 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Sep 12 14:23:58 EDT 2011


Author: shawkins
Date: 2011-09-12 14:23:57 -0400 (Mon, 12 Sep 2011)
New Revision: 3465

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
Modified:
   trunk/api/src/main/java/org/teiid/CommandContext.java
   trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
   trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750 improvement to batch sizing

Modified: trunk/api/src/main/java/org/teiid/CommandContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/CommandContext.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/api/src/main/java/org/teiid/CommandContext.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -87,7 +87,8 @@
 	
 	/**
 	 * Get the processor batch size set on the BufferManager
-	 * @return
+	 * @return - the nominal batch size target.  actual batch sizes will vary based 
+	 * upon the column types
 	 */
 	int getProcessorBatchSize();
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -74,6 +74,12 @@
      * Get the batch size to use during query processing.  
      * @return Batch size (# of rows)
      */
+    int getProcessorBatchSize(List<? extends Expression> schema);
+    
+    /**
+     * Get the nominal batch size target
+     * @return the nominal batch size target
+     */
     int getProcessorBatchSize();
 
     /**

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -215,7 +215,7 @@
 
 	protected abstract void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException;
 
-	public void remove() {
+	public synchronized void remove() {
 		if (!this.removed) {
 			this.removed = true;
 			this.removeDirect();

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -102,9 +102,9 @@
 		this.stree = stree;
 		this.id = counter.getAndIncrement();
 		stree.pages.put(this.id, this);
-		this.values = new TupleBatch(0, new ArrayList(stree.pageSize/4));
+		this.values = new TupleBatch(0, new ArrayList(stree.getPageSize(leaf)/4));
 		if (!leaf) {
-			children = new ArrayList<SPage>(stree.pageSize/4);
+			children = new ArrayList<SPage>(stree.getPageSize(false)/4);
 		}
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -62,7 +62,8 @@
     protected BatchManager keyManager;
     protected BatchManager leafManager;
     protected ListNestedSortComparator comparator;
-    protected int pageSize;
+    private int pageSize;
+    protected int leafSize;
     protected int keyLength;
     protected String[] types;
     protected String[] keytypes;
@@ -76,6 +77,7 @@
 			BatchManager leafManager,
             final ListNestedSortComparator comparator,
             int pageSize,
+            int leafSize,
             int keyLength,
             String[] types) {
 		randomSeed = seedGenerator.nextInt() | 0x00000100; // ensure nonzero
@@ -90,6 +92,7 @@
 			mask <<= 1;
 			mask++;
 		}
+		this.leafSize = leafSize;
 		this.keyLength = keyLength;
 		this.types = types;
 		this.keytypes = Arrays.copyOf(types, keyLength);
@@ -276,10 +279,10 @@
 			} else {
 				level = randomLevel();
 			}
-		} else if (!places.isEmpty() && places.getLast().values.getTuples().size() == pageSize) {
+		} else if (!places.isEmpty() && places.getLast().values.getTuples().size() == getPageSize(true)) {
 			int row = rowCount.get();
-			while (row != 0 && row%pageSize == 0) {
-				row = (row - pageSize + 1)/pageSize;
+			while (row != 0 && row%getPageSize(true) == 0) {
+				row = (row - getPageSize(true) + 1)/getPageSize(true);
 				level++;
 			}
 		}
@@ -312,9 +315,9 @@
 			return 0;
 		}
 		int logSize = 1;
-		while (sizeHint > this.pageSize) {
+		while (sizeHint > this.getPageSize(logSize==0)) {
 			logSize++;
-			sizeHint/=this.pageSize;
+			sizeHint/=this.getPageSize(logSize==0);
 		}
 		return logSize;
 	}
@@ -329,8 +332,8 @@
 	SPage insert(List k, SearchResult result, SearchResult parent, Object value, boolean ordered) throws TeiidComponentException {
 		SPage page = result.page;
 		int index = -result.index - 1;
-		if (result.values.getTuples().size() == pageSize) {
-			boolean leaf = !(value instanceof SPage);
+		boolean leaf = !(value instanceof SPage);
+		if (result.values.getTuples().size() == getPageSize(leaf)) {
 			SPage nextPage = new SPage(this, leaf);
 			TupleBatch nextValues = nextPage.getValues();
 			nextPage.next = page.next;
@@ -342,17 +345,17 @@
 			boolean inNext = false;
 			if (!ordered) {
 				//split the values
-				nextValues.getTuples().addAll(result.values.getTuples().subList(pageSize/2, pageSize));
-				result.values.getTuples().subList(pageSize/2, pageSize).clear();
+				nextValues.getTuples().addAll(result.values.getTuples().subList(getPageSize(leaf)/2, getPageSize(leaf)));
+				result.values.getTuples().subList(getPageSize(leaf)/2, getPageSize(leaf)).clear();
 				if (!leaf) {
-					nextPage.children.addAll(page.children.subList(pageSize/2, pageSize));
-					page.children.subList(pageSize/2, pageSize).clear();
+					nextPage.children.addAll(page.children.subList(getPageSize(leaf)/2, getPageSize(false)));
+					page.children.subList(getPageSize(false)/2, getPageSize(false)).clear();
 				}
-				if (index <= pageSize/2) {
+				if (index <= getPageSize(leaf)/2) {
 					setValue(index, k, value, result.values, page);
 				} else {
 					inNext = true;
-					setValue(index - pageSize/2, k, value, nextValues, nextPage);
+					setValue(index - getPageSize(leaf)/2, k, value, nextValues, nextPage);
 				}
 				page.setValues(result.values);
 				if (parent != null) {
@@ -396,7 +399,9 @@
 				continue;
 			}
 			searchResult.values.getTuples().remove(searchResult.index);
+			boolean leaf = true;
 			if (searchResult.page.children != null) {
+				leaf = false;
 				searchResult.page.children.remove(searchResult.index);
 			}
 			int size = searchResult.values.getTuples().size();
@@ -423,18 +428,18 @@
 					header[0] = new SPage(this, true);
 				}
 				continue;
-			} else if (size < pageSize/2) {
+			} else if (size < getPageSize(leaf)/2) {
 				//check for merge
 				if (searchResult.page.next != null) {
 					TupleBatch nextValues = searchResult.page.next.getValues();
-					if (nextValues.getTuples().size() < pageSize/4) {
+					if (nextValues.getTuples().size() < getPageSize(leaf)/4) {
 						SPage.merge(places, nextValues, searchResult.page, searchResult.values);
 						continue;
 					}
 				}
 				if (searchResult.page.prev != null) {
 					TupleBatch prevValues = searchResult.page.prev.getValues();
-					if (prevValues.getTuples().size() < pageSize/4) {
+					if (prevValues.getTuples().size() < getPageSize(leaf)/4) {
 						SPage.merge(places, searchResult.values, searchResult.page.prev, prevValues);
 						continue;
 					}
@@ -537,4 +542,11 @@
 		}
 	}
 	
+	public int getPageSize(boolean leaf) {
+		if (leaf) {
+			return leafSize;
+		}
+		return pageSize;
+	}
+
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -83,12 +83,12 @@
  * 
  * TODO: allow for cached stores to use lru - (result set/mat view)
  * TODO: account for row/content based sizing (difficult given value sharing)
- * TODO: account for memory based lobs (it would be nice if the approximate buffer size matched at 100kB)
  * TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
  *       - this is not necessary for already persistent batches, since we hold a weak reference
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 	
+	private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
 	private static final int IO_BUFFER_SIZE = 1 << 14;
 	private static final int COMPACTION_THRESHOLD = 1 << 25; //start checking at 32 megs
 	
@@ -233,7 +233,6 @@
 		private boolean softCache;
 		private volatile TupleBatch activeBatch;
 		private volatile Reference<TupleBatch> batchReference;
-		private int beginRow;
 		private WeakReference<BatchManagerImpl> managerRef;
 		private Long id;
 		private LobManager lobManager;
@@ -243,7 +242,6 @@
 			this.softCache = softCache;
 			id = batchAdded.incrementAndGet();
 			this.activeBatch = batch;
-			this.beginRow = batch.getBeginRow();
 			this.managerRef = ref;
 			BatchManagerImpl batchManager = ref.get();
 			if (batchManager.lobIndexes != null) {
@@ -355,9 +353,9 @@
 					Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
 					ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
 		            batch = new TupleBatch();
+		            batch.setRowOffset(ois.readInt());
 		            batch.setDataTypes(types);
 		            batch.readExternal(ois);
-		            batch.setRowOffset(this.beginRow);
 			        batch.setDataTypes(null);
 					if (lobManager != null) {
 						for (List<?> tuple : batch.getTuples()) {
@@ -404,6 +402,7 @@
 							offset = batchManager.getOffset();
 							OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
 				            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+				            oos.writeInt(batch.getBeginRow());
 				            batch.writeExternal(oos);
 				            oos.close();
 				            long size = batchManager.store.getLength() - offset;
@@ -475,6 +474,7 @@
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
+    private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
 
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
@@ -535,6 +535,10 @@
     public void setConnectorBatchSize(int connectorBatchSize) {
         this.connectorBatchSize = connectorBatchSize;
     } 
+    
+    public void setTargetBytesPerRow(int targetBytesPerRow) {
+		this.targetBytesPerRow = targetBytesPerRow;
+	}
 
     public void setProcessorBatchSize(int processorBatchSize) {
         this.processorBatchSize = processorBatchSize;
@@ -560,7 +564,7 @@
     	final String newID = String.valueOf(this.tsId.getAndIncrement());
     	int[] lobIndexes = LobManager.getLobIndexes(elements);
     	BatchManager batchManager = new BatchManagerImpl(newID, lobIndexes);
-        TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, lobIndexes, getProcessorBatchSize());
+        TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, lobIndexes, getProcessorBatchSize(elements));
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
         	LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(tupleBuffer.getTypes()), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
         }
@@ -595,7 +599,7 @@
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
     		LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$
     	}
-    	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(), keyLength, TupleBuffer.getTypeNames(elements));
+    	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements), getProcessorBatchSize(elements.subList(0, keyLength)), keyLength, TupleBuffer.getTypeNames(elements));
     }
 
     @Override
@@ -732,7 +736,11 @@
 	}
 	
 	@Override
-	public int getSchemaSize(List<? extends Expression> elements) {
+	public int getProcessorBatchSize(List<? extends Expression> schema) {
+		return getSizeEstimates(schema)[0];
+	}
+	
+	private int[] getSizeEstimates(List<? extends Expression> elements) {
 		int total = 0;
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
 		//we make a assumption that the average column size under 64bits is approximately 128bytes
@@ -741,10 +749,40 @@
 			Class<?> type = element.getType();
 			total += SizeUtility.getSize(isValueCacheEnabled, type);
 		}
+		//assume 64-bit
 		total += 8*elements.size() + 36;  // column list / row overhead
-		total *= processorBatchSize; 
-		return Math.max(1, total / 1024);
+		
+		//nominal targetBytesPerRow but can scale up or down
+		
+		int totalCopy = total;
+		boolean less = totalCopy < targetBytesPerRow;
+		int rowCount = processorBatchSize;
+		
+		for (int i = 0; i < 2; i++) {
+			if (less) {
+				totalCopy <<= 2;
+			} else {
+				totalCopy >>= 2;
+			}
+			if (less && totalCopy > targetBytesPerRow
+					|| !less && totalCopy < targetBytesPerRow) {
+				break;
+			}
+			if (less) {
+				rowCount <<= 1;
+			} else {
+				rowCount >>= 1;
+			}
+		}
+		rowCount = Math.max(1, rowCount);
+		total *= rowCount; 
+		return new int[]{rowCount, Math.max(1, total / 1024)};
 	}
+	
+	@Override
+	public int getSchemaSize(List<? extends Expression> elements) {
+		return getSizeEstimates(elements)[1];
+	}
 
 	public void shutdown() {
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -29,13 +29,11 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.StorageManager;
 import org.teiid.core.TeiidComponentException;
-import org.teiid.core.util.Assertion;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.QueryPlugin;
@@ -47,7 +45,6 @@
 public class FileStorageManager implements StorageManager {
 	
 	public static final int DEFAULT_MAX_OPEN_FILES = 64;
-	public static final long DEFAULT_MAX_FILESIZE = 2L * 1024L;
 	public static final long DEFAULT_MAX_BUFFERSPACE = 50L * 1024L * 1024L * 1024L;
 	private static final String FILE_PREFIX = "b_"; //$NON-NLS-1$
 	
@@ -97,7 +94,7 @@
 	
 	public class DiskStore extends FileStore {
 	    private String name;
-		private TreeMap<Long, FileInfo> storageFiles = new TreeMap<Long, FileInfo>(); 
+		private FileInfo fileInfo; 
 	    
 	    public DiskStore(String name) {
 			this.name = name;
@@ -106,20 +103,15 @@
 	    /**
 	     * Concurrent reads are possible, but only after writing is complete.
 	     */
-	    public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
-	    	Map.Entry<Long, FileInfo> entry = storageFiles.floorEntry(fileOffset);
-	    	Assertion.isNotNull(entry);
-			FileInfo fileInfo = entry.getValue();
-			synchronized (fileInfo) {
-				try {
-					RandomAccessFile fileAccess = fileInfo.open();
-			        fileAccess.seek(fileOffset - entry.getKey());
-			        return fileAccess.read(b, offSet, length);
-				} catch (IOException e) {
-					throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
-				} finally {
-					fileInfo.close();
-				}
+	    public synchronized int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
+			try {
+				RandomAccessFile fileAccess = fileInfo.open();
+		        fileAccess.seek(fileOffset);
+		        return fileAccess.read(b, offSet, length);
+			} catch (IOException e) {
+				throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+			} finally {
+				fileInfo.close();
 			}
 	    }
 
@@ -132,26 +124,14 @@
 				usedBufferSpace.addAndGet(-length);
 				throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
 			}
-			Map.Entry<Long, FileInfo> entry = this.storageFiles.lastEntry();
-			boolean createNew = false;
-			FileInfo fileInfo = null;
 			long fileOffset = 0;
-			if (entry == null) {
-				createNew = true;
-			} else {
-				fileInfo = entry.getValue();
-				fileOffset = entry.getKey();
-				createNew = entry.getValue().file.length() + length > getMaxFileSize();
-			}
-			if (createNew) {
-				FileInfo newFileInfo = new FileInfo(createFile(name, storageFiles.size()));
+			if (fileInfo == null) {
+				fileInfo = new FileInfo(createFile(name));
 	            if (fileInfo != null) {
 	            	fileOffset += fileInfo.file.length();
 	            }
-	            storageFiles.put(fileOffset, newFileInfo);
-	            fileInfo = newFileInfo;
 	        }
-			synchronized (fileInfo) {
+			synchronized (this) {
 		        try {
 		        	RandomAccessFile fileAccess = fileInfo.open();
 		            long pointer = fileAccess.length();
@@ -168,16 +148,13 @@
 		
 		public synchronized void removeDirect() {
 			usedBufferSpace.addAndGet(-len);
-			for (FileInfo info : storageFiles.values()) {
-				info.delete();
-			}
+			fileInfo.delete();
 		}
 		
 	}
 
     // Initialization
     private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
-    private long maxFileSize = DEFAULT_MAX_FILESIZE * 1024L * 1024L; // 2GB
     private String directory;
     private File dirFile;
 
@@ -216,14 +193,6 @@
         }
     }
     
-    public void setMaxFileSize(long maxFileSize) {
-    	this.maxFileSize = maxFileSize * 1024L * 1024L;
-	}
-    
-    void setMaxFileSizeDirect(long maxFileSize) {
-    	this.maxFileSize = maxFileSize;
-    }
-    
     public void setMaxOpenFiles(int maxOpenFiles) {
 		this.maxOpenFiles = maxOpenFiles;
 	}
@@ -232,15 +201,15 @@
 		this.directory = directory;
 	}
     
-    File createFile(String name, int fileNumber) throws TeiidComponentException {
+    File createFile(String name) throws TeiidComponentException {
         try {
-        	File storageFile = File.createTempFile(FILE_PREFIX + name + "_" + String.valueOf(fileNumber) + "_", null, this.dirFile); //$NON-NLS-1$ //$NON-NLS-2$
+        	File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null, this.dirFile); //$NON-NLS-1$
             if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
                 LogManager.logDetail(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, "Created temporary storage area file " + storageFile.getAbsoluteFile()); //$NON-NLS-1$
             }
             return storageFile;
         } catch(IOException e) {
-        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_creating", name + "_" + fileNumber)); //$NON-NLS-1$ //$NON-NLS-2$
+        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_creating", name)); //$NON-NLS-1$
         }
     }
     
@@ -248,10 +217,6 @@
     	return new DiskStore(name);
     }
     
-    public long getMaxFileSize() {
-		return maxFileSize;
-	}
-    
     public String getDirectory() {
 		return directory;
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -25,6 +25,8 @@
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.core.types.DataTypeManager;
@@ -40,6 +42,27 @@
 public final class SizeUtility {
 	public static final int REFERENCE_SIZE = 8;
 	
+	private static Map<Class<?>, int[]> SIZE_ESTIMATES = new HashMap<Class<?>, int[]>(128);
+	
+	static {
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, 256});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DATE, new int[] {20, 28});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIME, new int[] {20, 28});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIMESTAMP, new int[] {20, 28});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.LONG, new int[] {12, 16});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DOUBLE, new int[] {12, 16});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.INTEGER, new int[] {6, 12});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.FLOAT, new int[] {6, 12});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.CHAR, new int[] {4, 10});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.SHORT, new int[] {4, 10});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.OBJECT, new int[] {1024, 1024});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.NULL, new int[] {0, 0});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BYTE, new int[] {1, 1});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BOOLEAN, new int[] {1, 1});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BIG_INTEGER, new int[] {75, 100});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BIG_DECIMAL, new int[] {150, 200});
+	}
+	
 	private long bigIntegerEstimate;
 	private long bigDecimalEstimate;
 	
@@ -85,35 +108,12 @@
     
     static int getSize(boolean isValueCacheEnabled,
 			Class<?> type) {
-		if (type == DataTypeManager.DefaultDataClasses.STRING) {
-			return isValueCacheEnabled?100:256; //assumes an "average" string length of approximately 100 chars
-		} else if (type == DataTypeManager.DefaultDataClasses.DATE 
-				|| type == DataTypeManager.DefaultDataClasses.TIME 
-				|| type == DataTypeManager.DefaultDataClasses.TIMESTAMP) {
-			return isValueCacheEnabled?20:28;
-		} else if (type == DataTypeManager.DefaultDataClasses.LONG 
-				|| type	 == DataTypeManager.DefaultDataClasses.DOUBLE) {
-			return isValueCacheEnabled?12:16;
-		} else if (type == DataTypeManager.DefaultDataClasses.INTEGER 
-				|| type == DataTypeManager.DefaultDataClasses.FLOAT) {
-			return isValueCacheEnabled?6:12;
-		} else if (type == DataTypeManager.DefaultDataClasses.CHAR 
-				|| type == DataTypeManager.DefaultDataClasses.SHORT) {
-			return isValueCacheEnabled?4:10;
-		} else if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
-			return 1024;
-		} else if (type == DataTypeManager.DefaultDataClasses.NULL) {
-			return 0; //it's free
-		} else if (type == DataTypeManager.DefaultDataClasses.BYTE
-				|| type == DataTypeManager.DefaultDataClasses.BOOLEAN) {
-			return 1; //should always be value cached, but there's a small chance it's not
-		} else if (type == DataTypeManager.DefaultDataClasses.BIG_INTEGER){
-			return isValueCacheEnabled?75:100;
-		} else if (type == DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
-		 	return isValueCacheEnabled?150:200;
-		}
-		return 512; //this is is misleading for lobs
-		//most references are not actually removed from memory
+    	int[] vals = SIZE_ESTIMATES.get(type);
+    	if (vals == null) {
+			return 512; //this is misleading for lobs
+			//most references are not actually removed from memory
+    	}
+    	return vals[isValueCacheEnabled?0:1];
 	}
     
     /**

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidComponentException;
+
+public class SplittableStorageManager implements StorageManager {
+	
+	public static final long DEFAULT_MAX_FILESIZE = 2L * 1024L;
+    private long maxFileSize = DEFAULT_MAX_FILESIZE * 1024L * 1024L; // 2GB
+	private StorageManager storageManager;
+	
+	public SplittableStorageManager(StorageManager storageManager) {
+		this.storageManager = storageManager;
+	}
+	
+	@Override
+	public FileStore createFileStore(String name) {
+		return new SplittableFileStore(name);
+	}
+	
+	@Override
+	public void initialize() throws TeiidComponentException {
+		storageManager.initialize();
+	}
+	
+	public class SplittableFileStore extends FileStore {
+	    private String name;
+		private ConcurrentSkipListMap<Long, FileStore> storageFiles = new ConcurrentSkipListMap<Long, FileStore>(); 
+	    
+	    public SplittableFileStore(String name) {
+			this.name = name;
+		}
+	    
+	    public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
+	    	Map.Entry<Long, FileStore> entry = storageFiles.floorEntry(fileOffset);
+	    	FileStore fileInfo = entry.getValue();
+	    	return fileInfo.read(fileOffset - entry.getKey(), b, offSet, length);
+	    }
+
+		public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
+			Map.Entry<Long, FileStore> entry = this.storageFiles.lastEntry();
+			boolean createNew = false;
+			FileStore fileInfo = null;
+			long fileOffset = 0;
+			if (entry == null) {
+				createNew = true;
+			} else {
+				fileInfo = entry.getValue();
+				fileOffset = entry.getKey();
+				createNew = entry.getValue().getLength() + length > getMaxFileSize();
+			}
+			if (createNew) {
+				FileStore newFileInfo = storageManager.createFileStore(name + "_" + storageFiles.size()); //$NON-NLS-1$
+	            if (fileInfo != null) {
+	            	fileOffset += fileInfo.getLength();
+	            }
+	            storageFiles.put(fileOffset, newFileInfo);
+	            fileInfo = newFileInfo;
+	        }
+			fileInfo.write(bytes, offset, length);
+		}
+		
+		public void removeDirect() {
+			for (FileStore info : storageFiles.values()) {
+				info.remove();
+			}
+		}
+		
+	}
+	
+    public long getMaxFileSize() {
+		return maxFileSize;
+	}
+	
+    public void setMaxFileSize(long maxFileSize) {
+    	this.maxFileSize = maxFileSize * 1024L * 1024L;
+	}
+    
+    void setMaxFileSizeDirect(long maxFileSize) {
+    	this.maxFileSize = maxFileSize;
+    }
+    
+    public StorageManager getStorageManager() {
+		return storageManager;
+	}
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -233,7 +233,6 @@
                 props,
                 this.requestMsg.getShowPlan() != ShowPlan.OFF);
         this.context.setProcessorBatchSize(bufferManager.getProcessorBatchSize());
-        this.context.setConnectorBatchSize(bufferManager.getConnectorBatchSize());
         this.context.setGlobalTableStore(this.globalTables);
         if (multiSourceModels != null) {
             MultiSourcePlanToProcessConverter modifier = new MultiSourcePlanToProcessConverter(

Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -163,7 +163,7 @@
      */
     public void initialize(CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) {       
         this.bufferMgr = bufferMgr;
-        this.batchSize = bufferMgr.getProcessorBatchSize();
+        this.batchSize = bufferMgr.getProcessorBatchSize(getOutputElements());
         setContext(context.clone());
         this.dataMgr = new ProcessorDataManager() {
 			

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -22,6 +22,9 @@
 
 package org.teiid.query.processor.relational;
 
+import java.util.Collections;
+import java.util.List;
+
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.core.TeiidComponentException;
 
@@ -38,11 +41,12 @@
         this.terminateBatches();
         return pullBatch();
     }
+    
+    @Override
+    public List getOutputElements() { // always an empty (non-null) list for this node
+    	return Collections.emptyList();
+    }
         
-    protected void getNodeString(StringBuffer str) {
-        super.getNodeString(str);
-    }
-
 	public Object clone(){
 		NullNode clonedNode = new NullNode(super.getID());
 		super.copy(this, clonedNode);

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -114,7 +114,11 @@
             this.getProcessingState().nodeStatistics = new RelationalNodeStatistics();
         }
 
-        this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize();
+        if (getOutputElements() != null) {
+        	this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize(getOutputElements());
+        } else {
+        	this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize();
+        }
     }
 
     public CommandContext getContext() {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -94,6 +94,7 @@
     private String groupName;
     private List<? extends Expression> schema;
     private int schemaSize;
+    private int batchSize;
 	private ListNestedSortComparator comparator;
 
     private TupleBuffer output;
@@ -164,6 +165,7 @@
         this.groupName = groupName;
         this.schema = schema;
         this.schemaSize = bufferManager.getSchemaSize(this.schema);
+        this.batchSize = bufferManager.getProcessorBatchSize(this.schema);
         this.comparator = new ListNestedSortComparator(cols, sortTypes);
         int distinctIndex = cols.length - 1;
         this.comparator.setDistinctIndex(distinctIndex);
@@ -226,7 +228,7 @@
     		
             int totalReservedBuffers = 0;
             try {
-	            int maxRows = this.bufferManager.getProcessorBatchSize();
+	            int maxRows = this.batchSize;
 		        while(!doneReading) {
 		        	//attempt to reserve more working memory if there are additional rows available before blocking
 		        	if (workingTuples.size() >= maxRows) {
@@ -236,7 +238,7 @@
 		        			break;
 		        		} 
 		        		totalReservedBuffers += reserved;
-		        		maxRows += bufferManager.getProcessorBatchSize();	
+		        		maxRows += this.batchSize;	
 		        	}
 		            try {
 		            	List<?> tuple = source.nextTuple();
@@ -249,7 +251,7 @@
 	                    	this.collected++;
 	                    }
 		            } catch(BlockedException e) {
-		            	if (workingTuples.size() >= bufferManager.getProcessorBatchSize()) {
+		            	if (workingTuples.size() >= this.batchSize) {
 		            		break;
 		            	}
 		            	if (mode != Mode.DUP_REMOVE  

Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -144,11 +144,8 @@
         	
 			if (this.resultInfo.isAutoStaged() && tempTable != null) {
 				AlterTempTable att = new AlterTempTable(tempTable);
-				int size = (Integer)this.currentRow.get(0);
-				if (size > this.bufferMgr.getProcessorBatchSize() * 2) {
-					//TODO: if the parent is small, then this is not necessary
-					att.setIndexColumns(this.resultInfo.getFkColumns());
-				}
+				//TODO: if the parent is small, then this is not necessary
+				att.setIndexColumns(this.resultInfo.getFkColumns());
 				this.dataManager.registerRequest(this.internalProcessor.getContext(), att, TempMetadataAdapter.TEMP_MODEL.getName(), null, 0, -1);
 			}
 

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -213,7 +213,7 @@
     		TempTable tt = contextStore.getTempTable(att.getTempTable().toUpperCase());
     		Assertion.isNotNull(tt, "Table doesn't exist"); //$NON-NLS-1$
     		tt.setUpdatable(false);
-    		if (att.getIndexColumns() != null) {
+    		if (att.getIndexColumns() != null && tt.getRowCount() > 2*tt.getTree().getPageSize(true)) {
     			tt.addIndex(att.getIndexColumns(), false);
     		}
     		return CollectionTupleSource.createUpdateCountTupleSource(0);

Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -78,8 +79,6 @@
 
 	    private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
 	    
-	    private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
-
 	    private String userName;
 	    
 	    private Serializable commandPayload;
@@ -312,20 +311,15 @@
         return this.globalState.collectNodeStatistics;
     }
     
-    public int getConnectorBatchSize() {
-        return this.globalState.connectorBatchSize;
-    }
-    
-    public void setConnectorBatchSize(int connectorBatchSize) {
-        this.globalState.connectorBatchSize = connectorBatchSize;
-    }
-
-    
+    @Override
     public int getProcessorBatchSize() {
         return this.globalState.processorBatchSize;
     }
-
     
+    public int getProcessorBatchSize(List<Expression> schema) { // schema-specific batch size; unlike the no-arg overload this is not read from globalState.processorBatchSize
+    	return this.globalState.bufferManager.getProcessorBatchSize(schema); // delegates to the buffer manager held in the global state
+    }
+    
     public void setProcessorBatchSize(int processorBatchSize) {
         this.globalState.processorBatchSize = processorBatchSize;
     }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -79,7 +79,7 @@
 
 	@Test public void testOrderedInsert() throws TeiidComponentException {
 		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
-		bm.setProcessorBatchSize(16);
+		bm.setProcessorBatchSize(4);
 		
 		ElementSymbol e1 = new ElementSymbol("x");
 		e1.setType(Integer.class);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -27,9 +27,7 @@
 import java.io.File;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Random;
 
 import org.junit.Test;
@@ -41,12 +39,9 @@
 @SuppressWarnings("nls")
 public class TestFileStorageManager {
 		
-	public FileStorageManager getStorageManager(Integer maxFileSize, Integer openFiles, String dir) throws TeiidComponentException {
+	public FileStorageManager getStorageManager(Integer openFiles, String dir) throws TeiidComponentException {
         FileStorageManager sm = new FileStorageManager();
         sm.setStorageDirectory(UnitTestUtil.getTestScratchPath() + (dir != null ? File.separator + dir : "")); //$NON-NLS-1$
-        if (maxFileSize != null) {
-        	sm.setMaxFileSizeDirect(maxFileSize);
-        }
         if (openFiles != null) {
         	sm.setMaxOpenFiles(openFiles);
         }
@@ -55,7 +50,7 @@
 	}
     
     @Test public void testWrite() throws Exception {
-        FileStorageManager sm = getStorageManager(null, null, null);        
+        FileStorageManager sm = getStorageManager(null, null);        
         String tsID = "0";     //$NON-NLS-1$
         FileStore store = sm.createFileStore(tsID);
         writeBytes(store);
@@ -64,27 +59,8 @@
         assertEquals(0, sm.getUsedBufferSpace());
     }
             
-    @Test public void testCreatesSpillFiles() throws Exception {
-        FileStorageManager sm = getStorageManager(1024, null, null); // 1KB
-        String tsID = "0";     //$NON-NLS-1$
-        // Add one batch
-        FileStore store = sm.createFileStore(tsID);
-        writeBytes(store);
-        
-        Map<File, RandomAccessFile> cache = sm.getFileCache();
-        assertEquals(1, cache.size());
-
-        writeBytes(store);
-        
-        assertEquals(2, cache.size());
-        
-        store.remove();
-        
-        assertEquals(0, cache.size());
-    }
-    
     @Test(expected=TeiidComponentException.class) public void testMaxSpace() throws Exception {
-    	FileStorageManager sm = getStorageManager(null, null, null); 
+    	FileStorageManager sm = getStorageManager(null, null); 
     	sm.setMaxBufferSpace(1);
         String tsID = "0";     //$NON-NLS-1$
         // Add one batch
@@ -93,7 +69,7 @@
     }
     
     @Test public void testFlush() throws Exception {
-    	FileStorageManager sm = getStorageManager(null, null, null);
+    	FileStorageManager sm = getStorageManager(null, null);
     	FileStore store = sm.createFileStore("0");
     	FileStoreOutputStream fsos = store.createOutputStream(2);
     	fsos.write(new byte[3]);
@@ -104,7 +80,7 @@
 
     static Random r = new Random();
     
-	private void writeBytes(FileStore store)
+	static void writeBytes(FileStore store)
 			throws TeiidComponentException {
 		byte[] bytes = new byte[2048];
         r.nextBytes(bytes);
@@ -114,9 +90,8 @@
         assertTrue(Arrays.equals(bytes, bytesRead));
 	}
     
-	
     @Test public void testWritingMultipleFiles() throws Exception {
-    	FileStorageManager sm = getStorageManager(1024, null, null); 
+    	FileStorageManager sm = getStorageManager(null, null); 
         String tsID = "0";     //$NON-NLS-1$
         // Add one batch
         FileStore store = sm.createFileStore(tsID);

Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.teiid.common.buffer.FileStore;
+
+public class TestSplittableStorageManager { // checks that SplittableStorageManager spreads one logical store over several backing stores
+	
+    @Test public void testCreatesSpillFiles() throws Exception {
+    	MemoryStorageManager msm = new MemoryStorageManager(); // backing manager; its created/removed counters are what the test asserts on
+        SplittableStorageManager ssm = new SplittableStorageManager(msm);
+        ssm.setMaxFileSizeDirect(2048); // unscaled limit; matches the 2048-byte array used by TestFileStorageManager.writeBytes
+        String tsID = "0";     //$NON-NLS-1$
+        // Add one batch
+        FileStore store = ssm.createFileStore(tsID);
+        TestFileStorageManager.writeBytes(store);
+        
+        assertEquals(1, msm.getCreated()); // the first write creates the first backing store
+
+        TestFileStorageManager.writeBytes(store);
+        
+        assertEquals(2, msm.getCreated()); // the second write would exceed the limit, so a second backing store is created
+        
+        store.remove();
+        
+        assertEquals(2, msm.getRemoved()); // removing the logical store removes both backing stores
+    }
+
+}


Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -302,8 +302,8 @@
 	}
 	
     @Test public void testBufferLimit() throws Exception {
-    	//the sql should return 100 rows
-        String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B"; //$NON-NLS-1$
+    	//the sql should return 400 rows
+        String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B, (select intkey from BQT1.SmallA limit 4) as C"; //$NON-NLS-1$
         String userName = "1"; //$NON-NLS-1$
         String sessionid = "1"; //$NON-NLS-1$
         
@@ -315,27 +315,29 @@
         Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
         ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(2, rm.getResults().length);
+
+        int rowsPerBatch = 8;
+		assertEquals(rowsPerBatch, rm.getResults().length);
         RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
 
-        message = core.processCursorRequest(reqMsg.getExecutionId(), 3, 2);
+        message = core.processCursorRequest(reqMsg.getExecutionId(), 9, rowsPerBatch);
         rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(2, rm.getResults().length);
+        assertEquals(rowsPerBatch, rm.getResults().length);
         //ensure that we are idle
         for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
         	Thread.sleep(100);
         }
         assertEquals(ThreadState.IDLE, item.getThreadState());
-        assertTrue(item.resultsBuffer.getManagedRowCount() <= 46);
+        assertTrue(item.resultsBuffer.getManagedRowCount() <= rowsPerBatch*23);
         //pull the rest of the results
         for (int j = 0; j < 48; j++) {
             item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
 
-	        message = core.processCursorRequest(reqMsg.getExecutionId(), j * 2 + 5, 2);
+	        message = core.processCursorRequest(reqMsg.getExecutionId(), (j + 2) * rowsPerBatch + 1, rowsPerBatch);
 	        rm = message.get(5000, TimeUnit.MILLISECONDS);
 	        assertNull(rm.getException());
-	        assertEquals(2, rm.getResults().length);
+	        assertEquals(rowsPerBatch, rm.getResults().length);
         }
     }
     
@@ -353,7 +355,7 @@
         Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
         ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
-        assertEquals(2, rm.getResults().length);
+        assertEquals(8, rm.getResults().length);
         RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
         assertEquals(100, item.resultsBuffer.getRowCount());
     }

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -365,7 +365,6 @@
 		props.setProperty("soap_port", "12345"); //$NON-NLS-1$ //$NON-NLS-2$
 		CommandContext context = new CommandContext("0", "test", "user", null, "myvdb", 1, props, DEBUG); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
         context.setProcessorBatchSize(BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE);
-        context.setConnectorBatchSize(BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE);
         context.setBufferManager(BufferManagerFactory.getStandaloneBufferManager());
         context.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>());
 		return context;
@@ -1080,7 +1079,6 @@
 
         CommandContext context = createCommandContext();
         context.setProcessorBatchSize(2);
-        context.setConnectorBatchSize(2);
         context.setMetadata(RealMetadataFactory.example1Cached());
         
         // Plan query

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -33,9 +33,9 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.teiid.common.buffer.BlockedException;
-import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
@@ -232,7 +232,8 @@
     }
     
     public void helpTestJoinDirect(List[] expectedResults, int batchSize, int processingBytes) throws TeiidComponentException, TeiidProcessingException {
-        BufferManager mgr = BufferManagerFactory.getTestBufferManager(processingBytes, batchSize);
+        BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(processingBytes, batchSize);
+        mgr.setTargetBytesPerRow(100);
         CommandContext context = new CommandContext("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
         
         join.addChild(leftNode);

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -72,10 +72,9 @@
         ElementSymbol elementSymbol_2 = new ElementSymbol("myGroup.myElement2"); //$NON-NLS-1$
         elementSymbol_1.setType(Integer.class);
         elementSymbol_2.setType(String.class);
-        ArrayList elements = new ArrayList();
-        elements.add(elementSymbol_1);
-        elements.add(elementSymbol_2);
+        List<ElementSymbol> elements = Arrays.asList(elementSymbol_1, elementSymbol_2);
         node.setIntoElements(elements); 
+        child.setElements(elements);
         node.setMode(mode);
         node.setModelName("myModel"); //$NON-NLS-1$
         

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -34,6 +34,7 @@
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.common.buffer.impl.FileStorageManager;
 import org.teiid.common.buffer.impl.MemoryStorageManager;
+import org.teiid.common.buffer.impl.SplittableStorageManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.util.FileUtils;
@@ -61,7 +62,7 @@
 	private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
 	private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int maxOpenFiles = FileStorageManager.DEFAULT_MAX_OPEN_FILES;
-    private long maxFileSize = FileStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
+    private long maxFileSize = SplittableStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
     private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
     private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
@@ -100,11 +101,12 @@
                 // Get the properties for FileStorageManager and create.
                 fsm = new FileStorageManager();
                 fsm.setStorageDirectory(bufferDir.getCanonicalPath());
-                fsm.setMaxFileSize(maxFileSize);
                 fsm.setMaxOpenFiles(maxOpenFiles);
                 fsm.setMaxBufferSpace(maxBufferSpace*MB);
-                fsm.initialize();        
-                this.bufferMgr.setStorageManager(fsm);
+                SplittableStorageManager ssm = new SplittableStorageManager(fsm);
+                ssm.setMaxFileSize(maxFileSize);
+                ssm.initialize();        
+                this.bufferMgr.setStorageManager(ssm);
             } else {
             	this.bufferMgr.setStorageManager(new MemoryStorageManager());
             }

Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-09-12 18:23:57 UTC (rev 3465)
@@ -31,6 +31,7 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.common.buffer.impl.FileStorageManager;
+import org.teiid.common.buffer.impl.SplittableStorageManager;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.query.sql.symbol.Constant;
@@ -53,7 +54,8 @@
         assertTrue(svc.isUseDisk());
         
         BufferManagerImpl mgr = (BufferManagerImpl) svc.getBufferManager();
-        assertTrue(((FileStorageManager)mgr.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
+        SplittableStorageManager ssm = (SplittableStorageManager)mgr.getStorageManager();
+        assertTrue(((FileStorageManager)ssm.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
     }
 
     @Test public void testCheckMemPropertyGotSet2() throws Exception {
@@ -91,7 +93,8 @@
         svc.start();
         
         BufferManager mgr = svc.getBufferManager();
-        assertEquals(13141, mgr.getSchemaSize(schema));
+        assertEquals(6570, mgr.getSchemaSize(schema));
+        assertEquals(256, mgr.getProcessorBatchSize(schema));
     }
     
 }



More information about the teiid-commits mailing list