[teiid-commits] teiid SVN: r3051 - in trunk: engine/src/main/java/org/teiid/common/buffer and 9 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sat Apr 2 07:57:45 EDT 2011


Author: shawkins
Date: 2011-04-02 07:57:44 -0400 (Sat, 02 Apr 2011)
New Revision: 3051

Added:
   trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
Removed:
   trunk/engine/src/main/java/org/teiid/query/processor/relational/PartitionedSortJoin.java
Modified:
   trunk/documentation/reference/src/main/docbook/en-US/content/architecture.xml
   trunk/documentation/reference/src/main/docbook/en-US/content/federated_planning.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleAssignOutputElements.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/ListNestedSortComparator.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java
   trunk/engine/src/test/java/org/teiid/query/optimizer/TestPartitionedJoinPlanning.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestVirtualDepJoin.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/NodeTestUtil.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-1517 changing the join partitioning logic to use an index instead or to just perform repeated merges for unbalanced joins.

Modified: trunk/documentation/reference/src/main/docbook/en-US/content/architecture.xml
===================================================================
--- trunk/documentation/reference/src/main/docbook/en-US/content/architecture.xml	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/documentation/reference/src/main/docbook/en-US/content/architecture.xml	2011-04-02 11:57:44 UTC (rev 3051)
@@ -148,11 +148,11 @@
         have a match, emit a row.  In general, merge join is
         on the order of n+m rather than n*m in nested loop.  Merge join is the
         default algorithm.</para>
-      <para>Any of the Join Algorithms above can be made into a
-        dependent join.  The
-        decision to implement a dependent join is considered after the
-        join algorithm is chosen, and does not currently influence the
-        algorithm selection.</para>
+      <para>Using costing information the engine may also delay the decision
+      to perform a full sort merge join.  Based upon the actual row counts involved, the engine 
+      can choose to build an index of the smaller side (which will perform similarly to a hash join) 
+      or to only partially sort the larger side of the relation.</para>
+      <para>Joins involving equi-join predicates are also eligible to be made into <xref linkend="dependent_joins"/>.</para>
     </section>
     <section>
       <title>Sort Based Algorithms</title>
@@ -162,13 +162,13 @@
         does not require all of the result set to ever be in memory yet
         uses the maximal amount of memory allowed by the buffer manager.
       </para>
-      <para>It consists of two phases.  The first phase (“sort”) will
+      <para>It consists of two phases.  The first phase ("sort") will
         take an unsorted input stream and produce one or more sorted
         input streams.  Each pass reads as much of the unsorted stream
         as possible, sorts it, and writes it back out as a new stream.
          Since the stream may be more than can fit in memory, this may
         result in many sorted streams.</para>
-      <para>The second phase (“merge”) consists of a set of phases
+      <para>The second phase ("merge") consists of a set of phases
         that grab the next batch from as many sorted input streams as
         will fit in memory.  It then repeatedly grabs the next tuple in
         sorted order from each stream and outputs merged sorted batches

Modified: trunk/documentation/reference/src/main/docbook/en-US/content/federated_planning.xml
===================================================================
--- trunk/documentation/reference/src/main/docbook/en-US/content/federated_planning.xml	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/documentation/reference/src/main/docbook/en-US/content/federated_planning.xml	2011-04-02 11:57:44 UTC (rev 3051)
@@ -188,6 +188,9 @@
           inefficient join structure and may result in many source
           queries.</para>
       </tip>
+      <para>The engine will for IN clauses to filter the values coming from the dependent side.  
+      If the number of values from the independent side exceeds the translators MaxInCriteriaSize, the values will be split into multiple IN predicates up to MaxDependentPredicates.  
+      When the number of independent values exceeds MaxInCriteriaSize*MaxDependentPredicates, then multiple dependent queries will be issued in parallel.</para>
     </section>
     <section>
       <title>Copy Criteria</title>

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -23,8 +23,8 @@
 package org.teiid.common.buffer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -33,6 +33,7 @@
 
 import org.teiid.common.buffer.SPage.SearchResult;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.query.processor.relational.ListNestedSortComparator;
 
 /**
  * Self balancing search tree using skip list like logic
@@ -53,7 +54,7 @@
 	protected volatile SPage[] header = new SPage[] {new SPage(this, true)};
     protected BatchManager keyManager;
     protected BatchManager leafManager;
-    protected Comparator comparator;
+    protected ListNestedSortComparator comparator;
     protected int pageSize;
     protected int keyLength;
     protected String[] types;
@@ -66,7 +67,7 @@
 	
 	public STree(BatchManager manager,
 			BatchManager leafManager,
-            final Comparator comparator,
+            final ListNestedSortComparator comparator,
             int pageSize,
             int keyLength,
             String[] types) {
@@ -168,7 +169,7 @@
 		return null;
 	}
 	
-	public List insert(List tuple, InsertMode mode) throws TeiidComponentException {
+	public List insert(List tuple, InsertMode mode, int sizeHint) throws TeiidComponentException {
 		LinkedList<SearchResult> places = new LinkedList<SearchResult>();
 		List match = null;
 		if (mode == InsertMode.ORDERED) {
@@ -193,7 +194,16 @@
 			}
 		}
 		List key = extractKey(tuple);
-		int level = randomLevel(); 
+		int level = 0;
+		if (mode != InsertMode.ORDERED || sizeHint == -1) {
+			level = randomLevel(); 
+		} else if (!places.isEmpty() && places.getLast().values.getTuples().size() == pageSize) {
+			int row = rowCount.get();
+			while (row != 0 && row%pageSize == 0) {
+				row = (row - pageSize + 1)/pageSize;
+				level++;
+			}
+		}
 		assert header.length == places.size();
 		if (level >= header.length) {
 			header = Arrays.copyOf(header, level + 1);
@@ -219,7 +229,10 @@
 	}
 	
 	List extractKey(List tuple) {
-		return tuple.subList(0, keyLength);
+		if (tuple.size() > keyLength) {
+			return new ArrayList(tuple.subList(0, keyLength));
+		}
+		return tuple;
 	}
 
 	SPage insert(List k, SearchResult result, SearchResult parent, Object value, boolean ordered) throws TeiidComponentException {
@@ -396,4 +409,33 @@
 		return preferMemory;
 	}
 	
+	public ListNestedSortComparator getComparator() {
+		return comparator;
+	}
+	
+	/**
+	 * Quickly check if the index can be compacted
+	 */
+	public void compact() {
+		while (true) {
+			if (this.header.length == 1) {
+				return;
+			}
+			SPage child = this.header[header.length - 2];
+			if (child.next != null) {
+				//TODO: condense the page pointers
+				return;
+			}
+			//remove unneeded index level
+			this.header = Arrays.copyOf(this.header, header.length - 1);
+		}
+	}
+
+	public void removeRowIdFromKey() {
+		this.keyLength--;
+		int[] sortParameters = this.comparator.getSortParameters();
+		sortParameters = Arrays.copyOf(sortParameters, sortParameters.length - 1);
+		this.comparator.setSortParameters(sortParameters);
+	}
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -79,11 +79,11 @@
 		this.tree = sTree;
 		this.direction = direction;
 		
-		init(lowerBound, upperBound);
+		init(lowerBound, upperBound, false);
 	}
 
 	private void init(List<Object> lowerBound,
-			List<?> upperBound)
+			List<?> upperBound, boolean isPartialKey)
 			throws TeiidComponentException {
 		if (lowerBound != null) {
 			lowerBound.addAll(Collections.nCopies(tree.getKeyLength() - lowerBound.size(), null));
@@ -95,7 +95,7 @@
 		boolean valid = true;
 		
 		if (upperBound != null) {
-			if (lowerBound != null && this.tree.comparator.compare(upperBound, lowerBound) < 0) {
+			if (!isPartialKey && lowerBound != null && this.tree.comparator.compare(upperBound, lowerBound) < 0) {
 				valid = false;
 			}
 			LinkedList<SearchResult> places = new LinkedList<SearchResult>();
@@ -105,12 +105,15 @@
 			boundIndex = upper.index;
 			if (boundIndex < 0) {
 				//we are guaranteed by find to not get back the -1 index, unless
-				//there are now tuples, in which case a bound of -1 is fine
+				//there are no tuples, in which case a bound of -1 is fine
 				boundIndex = Math.min(upper.values.getTuples().size(), -boundIndex -1) - 1;
 			}
 			if (!direction) {
 				values = upper.values;
 			}
+			if (lowerBound != null) {
+				valid = index<=boundIndex;
+			}
 		} else {
 			while (bound == null || bound.children != null) {
 				bound = tree.findChildTail(bound);
@@ -165,7 +168,7 @@
 					return null;
 				}
 				if (newValue.size() < tree.getKeyLength()) {
-					init(new ArrayList<Object>(newValue), newValue);
+					init(new ArrayList<Object>(newValue), newValue, true);
 					inPartial = true;
 					continue;
 				}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -87,7 +87,7 @@
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 	
-	private static final double KB_PER_VALUE = 64d/1024;
+	public static final double KB_PER_VALUE = 64d/1024;
 	private static final int IO_BUFFER_SIZE = 1 << 14;
 	private static final int COMPACTION_THRESHOLD = 1 << 25; //start checking at 32 megs
 	
@@ -528,7 +528,7 @@
 		if (this.maxProcessingBatches < 0) {
 			this.maxProcessingKB = Math.max((int)Math.min(128 * KB_PER_VALUE * processorBatchSize, Integer.MAX_VALUE), (int)(.1 * maxMemory)/maxActivePlans);
 		} else {
-			this.maxProcessingKB = Math.max(0, (int)Math.min(maxProcessingBatches * KB_PER_VALUE * processorBatchSize, Integer.MAX_VALUE));
+			this.maxProcessingKB = Math.max(0, (int)Math.min(Math.ceil(maxProcessingBatches * KB_PER_VALUE * processorBatchSize), Integer.MAX_VALUE));
 		}
 	}
 	
@@ -551,14 +551,15 @@
     	lock.lock();
 	    try {
 	    	if (mode == BufferReserveMode.WAIT) {
-		    	int waitCount = 0;
-		    	while (count - waitCount > this.reserveBatchKB) {
+	    		//don't wait for more than is available
+	    		int waitCount = Math.min(count, this.maxReserveBatchKB);
+		    	while (waitCount > 0 && waitCount > this.reserveBatchKB) {
 		    		try {
 						batchesFreed.await(100, TimeUnit.MILLISECONDS);
 					} catch (InterruptedException e) {
 						throw new TeiidRuntimeException(e);
 					}
-					waitCount++;
+					waitCount /= 2;
 		    	}	
 	    	}
 	    	if (this.reserveBatchKB >= count || mode == BufferReserveMode.FORCE) {

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -61,7 +61,7 @@
 import org.teiid.query.processor.relational.NestedLoopJoinStrategy;
 import org.teiid.query.processor.relational.NestedTableJoinStrategy;
 import org.teiid.query.processor.relational.NullNode;
-import org.teiid.query.processor.relational.PartitionedSortJoin;
+import org.teiid.query.processor.relational.EnhancedSortMergeJoinStrategy;
 import org.teiid.query.processor.relational.PlanExecutionNode;
 import org.teiid.query.processor.relational.ProjectIntoNode;
 import org.teiid.query.processor.relational.ProjectNode;
@@ -235,10 +235,10 @@
                 List joinCrits = (List) node.getProperty(NodeConstants.Info.JOIN_CRITERIA);
                 String depValueSource = (String) node.getProperty(NodeConstants.Info.DEPENDENT_VALUE_SOURCE);
                 SortOption leftSort = (SortOption)node.getProperty(NodeConstants.Info.SORT_LEFT);
-                if(stype == JoinStrategyType.MERGE || stype == JoinStrategyType.PARTITIONED_SORT) {
+                if(stype == JoinStrategyType.MERGE || stype == JoinStrategyType.ENHANCED_SORT) {
                 	MergeJoinStrategy mjStrategy = null;
-                	if (stype.equals(JoinStrategyType.PARTITIONED_SORT)) { 
-                		mjStrategy = new PartitionedSortJoin(leftSort, (SortOption)node.getProperty(NodeConstants.Info.SORT_RIGHT));
+                	if (stype.equals(JoinStrategyType.ENHANCED_SORT)) { 
+                		mjStrategy = new EnhancedSortMergeJoinStrategy(leftSort, (SortOption)node.getProperty(NodeConstants.Info.SORT_RIGHT));
                 	} else {
                 		mjStrategy = new MergeJoinStrategy(leftSort, (SortOption)node.getProperty(NodeConstants.Info.SORT_RIGHT), false);
                 	}

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleAssignOutputElements.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleAssignOutputElements.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleAssignOutputElements.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -43,6 +43,7 @@
 import org.teiid.query.optimizer.relational.plantree.NodeConstants;
 import org.teiid.query.optimizer.relational.plantree.NodeEditor;
 import org.teiid.query.optimizer.relational.plantree.PlanNode;
+import org.teiid.query.processor.relational.RelationalNode;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.sql.lang.Criteria;
 import org.teiid.query.sql.lang.OrderBy;
@@ -351,7 +352,7 @@
             PlanNode projectNode = allProjects.get(i);
             List<SingleElementSymbol> projectCols = (List<SingleElementSymbol>) projectNode.getProperty(NodeConstants.Info.PROJECT_COLS);
 
-            newCols = filter(filteredIndex, projectCols);
+            newCols = RelationalNode.projectTuple(filteredIndex, projectCols);
             
             projectNode.setProperty(NodeConstants.Info.PROJECT_COLS, newCols);
             if (updateGroups) {
@@ -403,15 +404,6 @@
 		return newCols;
 	}
 
-	static List<SingleElementSymbol> filter(int[] filteredIndex,
-			List<SingleElementSymbol> projectCols) {
-		List<SingleElementSymbol> newCols = new ArrayList<SingleElementSymbol>();
-		for(int j=0; j<filteredIndex.length; j++) {
-		    newCols.add(projectCols.get(filteredIndex[j]));
-		}
-		return newCols;
-	}
-
     /** 
      * Check all branches for either a dup removal or a non all union.
      *

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -41,6 +41,7 @@
 import org.teiid.query.optimizer.relational.plantree.NodeFactory;
 import org.teiid.query.optimizer.relational.plantree.PlanNode;
 import org.teiid.query.optimizer.relational.plantree.NodeConstants.Info;
+import org.teiid.query.processor.relational.RelationalNode;
 import org.teiid.query.processor.relational.JoinNode.JoinStrategyType;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
 import org.teiid.query.sql.lang.CompareCriteria;
@@ -98,7 +99,7 @@
             } 
             
             /**
-             * Don't push sorts for unbalanced inner joins, we prefer to use partitioning 
+             * Don't push sorts for unbalanced inner joins, we prefer to use a processing time cost based decision 
              */
             boolean pushLeft = true;
             boolean pushRight = true;
@@ -163,8 +164,8 @@
 					}
 				}
         		joinNode.setProperty(Info.JOIN_CRITERIA, joinCriteria);
-        		leftExpressions = RuleAssignOutputElements.filter(reorder, leftExpressions);
-            	rightExpressions = RuleAssignOutputElements.filter(reorder, rightExpressions);
+        		leftExpressions = RelationalNode.projectTuple(reorder, leftExpressions);
+            	rightExpressions = RelationalNode.projectTuple(reorder, rightExpressions);
             	joinNode.setProperty(NodeConstants.Info.LEFT_EXPRESSIONS, leftExpressions);
             	joinNode.setProperty(NodeConstants.Info.RIGHT_EXPRESSIONS, rightExpressions);
             }
@@ -173,7 +174,7 @@
 			insertSort(joinNode.getLastChild(), rightExpressions, joinNode, metadata, capabilitiesFinder, pushRight);
         	
         	if (joinNode.getProperty(NodeConstants.Info.JOIN_TYPE) == JoinType.JOIN_INNER && (!pushRight || !pushedLeft)) {
-        		joinNode.setProperty(NodeConstants.Info.JOIN_STRATEGY, JoinStrategyType.PARTITIONED_SORT);
+        		joinNode.setProperty(NodeConstants.Info.JOIN_STRATEGY, JoinStrategyType.ENHANCED_SORT);
         	}
         }
         

Copied: trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java (from rev 3026, trunk/engine/src/main/java/org/teiid/query/processor/relational/PartitionedSortJoin.java)
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -0,0 +1,312 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.processor.relational;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.teiid.common.buffer.IndexedTupleSource;
+import org.teiid.common.buffer.STree;
+import org.teiid.common.buffer.TupleBrowser;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.STree.InsertMode;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.CollectionTupleSource;
+import org.teiid.query.sql.lang.OrderBy;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.sql.symbol.SingleElementSymbol;
+
+
+/**
+ * Extends the basic fully sorted merge join to check for conditions necessary 
+ * to not fully sort one of the sides.
+ * 
+ * Will be used for inner joins and only if both sorts are not required.
+ * Degrades to a normal merge join if the tuples are balanced.
+ * 
+ * Refined in 7.4 to use a full index if it is small enough or a repeated merge, rather than a partitioning approach (which was really just a sinlge level index)
+ */
+public class EnhancedSortMergeJoinStrategy extends MergeJoinStrategy {
+	
+	private TupleSource currentSource;
+	private SourceState sortedSource;
+	private SourceState notSortedSource;
+	private List<?> partitionedTuple;
+	private TupleBrowser tb;
+	private int reserved;
+	private STree index;
+	private int[] reverseIndexes;
+	private List<?> sortedTuple;
+	private boolean repeatedMerge;
+	
+	/**
+	 * Number of index batches we'll allow to marked as prefers memory regardless of buffer space
+	 */
+	private int preferMemCutoff = 8;
+
+	public EnhancedSortMergeJoinStrategy(SortOption sortLeft, SortOption sortRight) {
+		super(sortLeft, sortRight, false);
+	}
+	
+	public void setPreferMemCutoff(int cutoff) {
+		this.preferMemCutoff = cutoff;
+	}
+	
+    @Override
+    public void close() {
+    	if (joinNode == null) {
+    		return;
+    	}
+    	super.close();
+    	if (this.index != null) {
+    		this.index.remove();
+    	}
+    	releaseReserved();
+    	this.index = null;
+    	this.tb = null;
+    	this.currentSource = null;
+    	this.sortedSource = null;
+    	this.notSortedSource = null;
+    	this.sortedTuple = null;
+    	this.reverseIndexes = null;
+    }
+    
+    /**
+     * Create an index of the smaller size
+     *  
+     * TODO: reuse existing temp table indexes
+     */
+    public void createIndex(SourceState state, boolean sorted) throws TeiidComponentException, TeiidProcessingException {
+    	int keyLength = state.getExpressionIndexes().length;
+    	List elements = state.getSource().getOutputElements();
+
+    	//TODO: minimize reordering, or at least detect when it's not necessary
+    	int[] reorderedSortIndex = Arrays.copyOf(state.getExpressionIndexes(), elements.size());
+    	Set<Integer> used = new HashSet<Integer>();
+    	for (int i : state.getExpressionIndexes()) {
+			used.add(i);
+    	}
+    	int j = state.getExpressionIndexes().length;
+    	for (int i = 0; i < elements.size(); i++) {
+    		if (!used.contains(i)) {
+    			reorderedSortIndex[j++] = i;
+    		}
+    	}
+    	List<SingleElementSymbol> reordered = RelationalNode.projectTuple(reorderedSortIndex, elements);
+    	if (!state.isDistinct()) {
+    		//need to add a rowid, just in case
+    		reordered = new ArrayList<SingleElementSymbol>(reordered);
+    		ElementSymbol id = new ElementSymbol("rowId"); //$NON-NLS-1$
+    		id.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+    		reordered.add(keyLength, id);
+    		keyLength++;
+    	}
+    	index = this.joinNode.getBufferManager().createSTree(reordered, this.joinNode.getConnectionID(), keyLength);
+    	index.setPreferMemory(true);
+    	if (!state.isDistinct()) {
+    		index.getComparator().setDistinctIndex(keyLength-2);
+    	}
+    	IndexedTupleSource its = state.getTupleBuffer().createIndexedTupleSource(true);
+    	int rowId = 0;
+    	List<?> lastTuple = null;
+    	boolean sortedDistinct = sorted && !state.isDistinct();
+    	outer: while (its.hasNext()) {
+    		//detect if sorted and distinct
+    		List<?> originalTuple = its.nextTuple();
+    		//remove the tuple if it has null
+    		for (int i : state.getExpressionIndexes()) {
+    			if (originalTuple.get(i) == null) {
+    				continue outer;
+    			}
+    		}
+    		if (sortedDistinct && lastTuple != null && this.compare(lastTuple, originalTuple, state.getExpressionIndexes(), state.getExpressionIndexes()) == 0) {
+    			sortedDistinct = false;
+    		}
+    		lastTuple = originalTuple;
+    		List<Object> tuple = (List<Object>) RelationalNode.projectTuple(reorderedSortIndex, originalTuple);
+    		if (!state.isDistinct()) {
+    			tuple.add(keyLength - 1, rowId++);
+    		}
+    		index.insert(tuple, sorted?InsertMode.ORDERED:InsertMode.NEW, state.getTupleBuffer().getRowCount());
+    	}
+    	if (!sorted) {
+    		index.compact();
+    	}
+    	its.closeSource();
+    	this.reverseIndexes = new int[elements.size()];
+    	for (int i = 0; i < reverseIndexes.length; i++) {
+    		int oldIndex = reorderedSortIndex[i];
+    		this.reverseIndexes[oldIndex] = i + (!state.isDistinct()&&i>=keyLength-1?1:0); 
+    	}
+    	if (!state.isDistinct() 
+    			&& ((!sorted && index.getComparator().isDistinct()) || (sorted && sortedDistinct))) {
+    		this.index.removeRowIdFromKey();
+    		state.markDistinct(true);
+    	}
+    }
+    
+    @Override
+    protected void loadLeft() throws TeiidComponentException,
+    		TeiidProcessingException {
+    	//always buffer to determine row counts
+    	this.leftSource.getTupleBuffer();
+    }
+    
+    @Override
+    protected void loadRight() throws TeiidComponentException,
+    		TeiidProcessingException {
+    	this.rightSource.getTupleBuffer();
+    	
+    	if (processingSortRight == SortOption.SORT && shouldIndex(this.leftSource, this.rightSource)) {
+    		this.processingSortRight = SortOption.NOT_SORTED;
+    	} else if (processingSortLeft == SortOption.SORT && shouldIndex(this.rightSource, this.leftSource)) {
+    		this.processingSortLeft = SortOption.NOT_SORTED;
+    	} 
+    	if (this.processingSortLeft != SortOption.NOT_SORTED && this.processingSortRight != SortOption.NOT_SORTED) {
+    		super.loadRight();
+    		super.loadLeft();
+    		return; //degrade to merge join
+    	}
+        if (this.processingSortLeft == SortOption.NOT_SORTED) {
+        	this.sortedSource = this.rightSource;
+        	this.notSortedSource = this.leftSource;
+
+        	if (!repeatedMerge) {
+        		createIndex(this.rightSource, this.processingSortRight == SortOption.ALREADY_SORTED);
+        	} else {
+        		super.loadRight(); //sort if needed
+        		this.notSortedSource.sort(SortOption.NOT_SORTED); //do a single sort pass
+        	}
+        } else if (this.processingSortRight == SortOption.NOT_SORTED) {
+        	this.sortedSource = this.leftSource;
+        	this.notSortedSource = this.rightSource;
+
+        	if (!repeatedMerge) {
+        		createIndex(this.leftSource, this.processingSortLeft == SortOption.ALREADY_SORTED);
+        	} else {
+        		super.loadLeft(); //sort if needed
+        		this.notSortedSource.sort(SortOption.NOT_SORTED); //do a single sort pass
+        	}
+        }
+    }
+    
+    private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws TeiidComponentException, TeiidProcessingException {
+    	if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
+    		return false; //index is too large
+    	}
+    	int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
+    	int toReserve = this.joinNode.getBufferManager().getMaxProcessingKB();
+    	//check if the other side can be sorted in memory
+    	if (other.getRowCount()/this.joinNode.getBatchSize() < toReserve/schemaSize) {
+    		return false;
+    	}
+    	boolean useIndex = false;
+    	int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
+    	//approximate that 1/2 of the index will be memory resident 
+    	toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() / (possibleIndex.getTupleBuffer().getBatchSize() * .5)); 
+    	if (toReserve < this.joinNode.getBufferManager().getMaxProcessingKB()) {
+    		useIndex = true;
+    	} else if (possibleIndex.getTupleBuffer().getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
+    		useIndex = true;
+    	} 
+    	if (useIndex) {
+    		reserved = this.joinNode.getBufferManager().reserveBuffers(toReserve, BufferReserveMode.FORCE);
+    		return true;
+    	} 
+    	this.repeatedMerge = true;
+    	return true;
+    }
+    
+	private void releaseReserved() {
+		this.joinNode.getBufferManager().releaseBuffers(this.reserved);
+		this.reserved = 0;
+	}
+        
+    @Override
+    protected void process() throws TeiidComponentException,
+    		TeiidProcessingException {
+    	if (this.processingSortLeft != SortOption.NOT_SORTED && this.processingSortRight != SortOption.NOT_SORTED) {
+    		super.process();
+    		return;
+    	}
+    	if (this.rightSource.getTupleBuffer().getRowCount() == 0) {
+    		return;
+    	}
+    	if (repeatedMerge) {
+    		while (this.notSortedSource.hasBuffer()) {
+    			super.process();
+    			resetMatchState();
+    			this.sortedSource.resetState();
+    			this.notSortedSource.nextBuffer();
+    		}
+    		return;
+    	}
+    	//else this is a single scan against the index
+    	if (currentSource == null) {
+    		currentSource = this.notSortedSource.getTupleBuffer().createIndexedTupleSource();
+    	}
+    	while (true) {
+	    	if (this.partitionedTuple == null) {
+	    		partitionedTuple = this.currentSource.nextTuple();
+	    		if (partitionedTuple == null) {
+	    			return;
+	    		}
+	        	List<?> key = RelationalNode.projectTuple(this.notSortedSource.getExpressionIndexes(), this.partitionedTuple);
+	        	tb = new TupleBrowser(this.index, new CollectionTupleSource(Arrays.asList(key).iterator()), OrderBy.ASC);
+	    	}
+	    	if (sortedTuple == null) {
+	    		sortedTuple = tb.nextTuple();
+	    	
+		    	if (sortedTuple == null) {
+		    		partitionedTuple = null;
+		    		continue;
+		    	}
+	    	}
+	    	List<?> reorderedTuple = RelationalNode.projectTuple(reverseIndexes, sortedTuple);
+			List outputTuple = outputTuple(this.processingSortLeft==SortOption.NOT_SORTED?partitionedTuple:reorderedTuple, 
+					this.processingSortLeft==SortOption.NOT_SORTED?reorderedTuple:partitionedTuple);
+			boolean matches = this.joinNode.matchesCriteria(outputTuple);
+	        if (matches) {
+	        	this.joinNode.addBatchRow(outputTuple);
+	        }
+	        this.sortedTuple = null;
+    	}
+    }
+    
+    @Override
+    public EnhancedSortMergeJoinStrategy clone() {
+    	return new EnhancedSortMergeJoinStrategy(this.sortLeft, this.sortRight);
+    }
+    
+    @Override
+    public String getName() {
+    	return "ENHANCED SORT JOIN"; //$NON-NLS-1$
+    }
+       	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -54,7 +54,7 @@
 	
 	public enum JoinStrategyType {    
 	    MERGE,
-	    PARTITIONED_SORT,
+	    ENHANCED_SORT,
 	    NESTED_LOOP,
 	    NESTED_TABLE
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ListNestedSortComparator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ListNestedSortComparator.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ListNestedSortComparator.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -192,5 +192,13 @@
     	return null;	
     }
     
+    public int[] getSortParameters() {
+		return sortParameters;
+	}
+    
+    public void setSortParameters(int[] sortParameters) {
+		this.sortParameters = sortParameters;
+	}
+    
 } // END CLASS    
 

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -73,7 +73,7 @@
     
     //planning time information
     public enum SortOption {
-        ALREADY_SORTED, SORT, SORT_DISTINCT, PARTITION
+        ALREADY_SORTED, SORT, SORT_DISTINCT, NOT_SORTED
     }
     
     protected SortOption sortLeft;
@@ -112,7 +112,13 @@
     @Override
     public void initialize(JoinNode joinNode) {
         super.initialize(joinNode);
-        this.outerState = this.leftSource;
+        resetMatchState();
+        this.processingSortRight = this.sortRight;
+        this.processingSortLeft = this.sortLeft;
+    }
+
+	protected void resetMatchState() {
+		this.outerState = this.leftSource;
         this.innerState = this.rightSource;
         this.mergeState = MergeState.SCAN;
         this.matchState = MatchState.MATCH_LEFT;
@@ -120,9 +126,7 @@
         this.leftScanState = ScanState.READ;
         this.rightScanState = ScanState.READ;
         this.outerMatched = false;
-        this.processingSortRight = this.sortRight;
-        this.processingSortLeft = this.sortLeft;
-    }
+	}
 
     /**
      * @see org.teiid.query.processor.relational.JoinStrategy#close()

Deleted: trunk/engine/src/main/java/org/teiid/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/PartitionedSortJoin.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/PartitionedSortJoin.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -1,318 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.query.processor.relational;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-import org.teiid.common.buffer.IndexedTupleSource;
-import org.teiid.common.buffer.TupleBatch;
-import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidProcessingException;
-
-
-/**
- * Extends the basic fully sorted merge join to check for conditions necessary 
- * to not fully sort one of the sides
- * 
- * Will be used for inner joins and only if both sorts are not required.
- * Degrades to a normal merge join if the tuples are balanced. 
- */
-public class PartitionedSortJoin extends MergeJoinStrategy {
-	
-	private List[] endTuples;
-	private List<Boolean> overlap = new ArrayList<Boolean>();
-	private List<Integer> endRows = new ArrayList<Integer>();
-	private List<TupleBuffer> partitions = new ArrayList<TupleBuffer>();
-	private int currentPartition;
-	private IndexedTupleSource currentSource;
-	private SourceState sortedSource;
-	private SourceState partitionedSource;
-	private boolean partitioned;
-	private List<?> partitionedTuple;
-	private int matchBegin = -1;
-	private int matchEnd = -1;
-	private int reserved;
-
-	public PartitionedSortJoin(SortOption sortLeft, SortOption sortRight) {
-		super(sortLeft, sortRight, false);
-	}
-	
-    @Override
-    public void close() {
-    	if (joinNode == null) {
-    		return;
-    	}
-    	super.close();
-    	for (TupleBuffer tupleSourceID : this.partitions) {
-			tupleSourceID.remove();
-		}
-    	releaseReserved();
-    	this.endTuples = null;
-    	this.overlap.clear();
-    	this.endRows.clear();
-    	this.partitions.clear();
-    	this.currentSource = null;
-    	this.sortedSource = null;
-    	this.partitionedSource = null;
-    	this.partitionedTuple = null;
-    }
-    
-    @Override
-    public void initialize(JoinNode joinNode) {
-    	super.initialize(joinNode);
-    	this.currentPartition = 0;
-    	this.partitioned = false;
-    	this.matchBegin = -1;
-    	this.matchEnd = -1;
-    }
-	
-    public void computeBatchBounds(SourceState state) throws TeiidComponentException, TeiidProcessingException {
-    	if (endTuples != null) {
-    		return;
-    	}
-    	Comparator comp = new ListNestedSortComparator(state.getExpressionIndexes(), true);
-    	ArrayList<List<?>> bounds = new ArrayList<List<?>>();
-        int beginRow = 1;
-        while (beginRow <= state.getRowCount()) {
-        	TupleBatch batch = state.getTupleBuffer().getBatch(beginRow);
-    		if (batch.getRowCount() == 0) {
-    			break;
-    		}
-    		beginRow = batch.getEndRow() + 1; 
-    		if (!bounds.isEmpty()) {
-    			overlap.add(comp.compare(bounds.get(bounds.size() - 1), batch.getTuple(batch.getBeginRow())) == 0);
-    		}
-    		bounds.add(batch.getTuple(batch.getEndRow()));
-    		endRows.add(batch.getEndRow());
-        }
-        this.endTuples = bounds.toArray(new List[bounds.size()]);
-    }
-    
-    @Override
-    protected void loadLeft() throws TeiidComponentException,
-    		TeiidProcessingException {
-    	//always buffer to determine row counts
-    	this.leftSource.getTupleBuffer();
-    }
-    
-    @Override
-    protected void loadRight() throws TeiidComponentException,
-    		TeiidProcessingException {
-    	this.rightSource.getTupleBuffer();
-    	if (processingSortRight == SortOption.SORT
-    			&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()
-    			&& testAndSetPartitions(this.rightSource.getRowCount(), this.rightSource.getSource().getOutputElements())) {
-    		this.processingSortRight = SortOption.PARTITION;
-    	} else if (processingSortLeft == SortOption.SORT
-    			&& this.rightSource.getRowCount() * 4 < this.leftSource.getRowCount()
-    			&& testAndSetPartitions(this.leftSource.getRowCount(), this.leftSource.getSource().getOutputElements())) {
-    		this.processingSortLeft = SortOption.PARTITION;
-    	} 
-        super.loadRight(); //sort right if needed
-        if (this.processingSortLeft == SortOption.PARTITION) {
-        	computeBatchBounds(this.rightSource);
-        	this.sortedSource = this.rightSource;
-        	this.partitionedSource = this.leftSource;
-        }
-        super.loadLeft(); //sort left if needed
-        if (this.processingSortRight == SortOption.PARTITION) {
-        	computeBatchBounds(this.leftSource);
-        	this.sortedSource = this.leftSource;
-        	this.partitionedSource = this.rightSource;
-        }
-        if (this.processingSortLeft == SortOption.PARTITION) {
-        	partitionSource();
-        } 
-        if (this.processingSortRight == SortOption.PARTITION) {
-    		partitionSource();
-    	}
-    }
-
-    /**
-     * Since the source to be partitioned is already loaded, then there's no
-     * chance of a blocked exception during partitioning, so reserve some batches.
-     * 
-     * TODO: partition at the same time as the load to determine size
-     * 
-     * @return
-     */
-	private boolean testAndSetPartitions(int rowCount, List elements) {
-		int partitionCount = (rowCount / this.joinNode.getBatchSize() + rowCount % this.joinNode.getBatchSize() == 0 ? 0:1) 
-			* this.joinNode.getBufferManager().getSchemaSize(elements);
-		if (partitionCount > this.joinNode.getBufferManager().getMaxProcessingKB() * 8) {
-			return false; 
-		}
-		int toReserve = Math.max(1, (int)(partitionCount * .75));
-		int excess = Math.max(0, toReserve - this.joinNode.getBufferManager().getMaxProcessingKB());
-		reserved = this.joinNode.getBufferManager().reserveBuffers(toReserve - excess, BufferReserveMode.FORCE);
-		if (excess > 0) {
-			reserved += this.joinNode.getBufferManager().reserveBuffers(toReserve, BufferReserveMode.NO_WAIT);
-		}
-		if (reserved == toReserve) {
-			return true;
-		}
-		releaseReserved();
-		return false;
-	}
-    
-	private void partitionSource() throws TeiidComponentException,
-			TeiidProcessingException {
-		if (partitioned) {
-			return;
-		}
-		if (endTuples.length < 2) {
-			partitions.add(this.partitionedSource.getTupleBuffer());
-		} else {
-			if (partitions.isEmpty()) {
-				for (int i = 0; i < endTuples.length; i++) {
-					TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
-					tc.setForwardOnly(true);
-					this.partitions.add(tc);
-				}
-			}
-			while (this.partitionedSource.getIterator().hasNext()) {
-				List<?> tuple = this.partitionedSource.getIterator().nextTuple();
-				int index = binarySearch(tuple, this.endTuples, this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes());
-				if (index < 0) {
-					index = -index - 1;
-				}
-				if (index > this.partitions.size() -1) {
-					continue;
-				}
-				while (index > 0 && this.overlap.get(index - 1) 
-						&& compare(tuple, this.endTuples[index - 1], this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes()) == 0) {
-					index--;
-				}
-				this.partitions.get(index).addTuple(tuple);
-			}
-			for (TupleBuffer partition : this.partitions) {
-				partition.close();
-			}
-			releaseReserved();
-		}
-		partitioned = true;
-	}
-
-	private void releaseReserved() {
-		this.joinNode.getBufferManager().releaseBuffers(this.reserved);
-		this.reserved = 0;
-	}
-        
-    @Override
-    protected void process() throws TeiidComponentException,
-    		TeiidProcessingException {
-    	if (this.processingSortLeft != SortOption.PARTITION && this.processingSortRight != SortOption.PARTITION) {
-    		super.process();
-    	}
-    	if (endRows.isEmpty()) {
-    		return; //no rows on the sorted side
-    	}
-    	while (currentPartition < partitions.size()) {
-    		if (currentSource == null) {
-    			currentSource = partitions.get(currentPartition).createIndexedTupleSource();
-    		}
-    		
-    		int beginIndex = currentPartition>0?endRows.get(currentPartition - 1)+1:1;
-    		
-			List[] batch = this.sortedSource.getTupleBuffer().getBatch(beginIndex).getAllTuples();
-						
-    		while (partitionedTuple != null || currentSource.hasNext()) {
-    			if (partitionedTuple == null) {
-    				partitionedTuple = currentSource.nextTuple();
-	    			int index = binarySearch(partitionedTuple, batch, this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes());
-	    			if (index < 0) {
-	            		partitionedTuple = null;
-	    				continue;
-	    			}
-	    			matchBegin = index;
-	    			matchEnd = index;
-	    			if (!this.sortedSource.isDistinct()) {
-		    			while (matchBegin > 0) {
-		    				if (compare(partitionedTuple, batch[matchBegin - 1], this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes()) != 0) {
-		    					break;
-		    				}
-		    				matchBegin--;
-		    			}
-		    			while (matchEnd < batch.length - 1) {
-		    				if (compare(partitionedTuple, batch[matchEnd + 1], this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes()) != 0) {
-		    					break;
-		    				}
-		    				matchEnd++;
-		    			}
-	    			}
-	    			if (matchEnd == batch.length - 1 && currentPartition < overlap.size() && overlap.get(currentPartition)) {
-	    				this.partitions.get(currentPartition + 1).addTuple(partitionedTuple);
-	    			}
-    			}
-    			while (matchBegin <= matchEnd) {
-    				List outputTuple = outputTuple(this.processingSortLeft==SortOption.PARTITION?partitionedTuple:batch[matchBegin], 
-    						this.processingSortLeft==SortOption.PARTITION?batch[matchBegin]:partitionedTuple);
-    				boolean matches = this.joinNode.matchesCriteria(outputTuple);
-    				matchBegin++;
-                    if (matches) {
-                    	this.joinNode.addBatchRow(outputTuple);
-                    }
-    			}
-    			matchBegin = -1;
-    			matchEnd = -1;
-        		partitionedTuple = null;
-    		}
-    		currentSource.closeSource();
-    		currentSource = null;
-    		currentPartition++;
-    	}
-    }
-    
-    public int binarySearch(List<?> tuple, List[] tuples, int[] leftIndexes, int[] rightIndexes) {
-    	int begin = 0;
-    	int end = tuples.length - 1;
-    	while (begin <= end) {
-	    	int mid = (begin + end)/2;
-	    	int compare = compare(tuples[mid], tuple, rightIndexes, leftIndexes);
-	    	if (compare == 0) {
-	    		return mid;
-	    	}
-	    	if (compare < 0) {
-	    		end = mid - 1;
-	    	} else {
-	    		begin = mid + 1;
-	    	}
-    	}
-    	return -begin -1;
-    }
-    
-    @Override
-    public PartitionedSortJoin clone() {
-    	return new PartitionedSortJoin(this.sortLeft, this.sortRight);
-    }
-    
-    @Override
-    public String getName() {
-    	return "PARTITIONED SORT JOIN"; //$NON-NLS-1$
-    }
-       	
-}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -371,9 +371,9 @@
 		return result;
 	}
 	
-	public static List<?> projectTuple(int[] indexes, List<?> tupleValues) {
+	public static <T> List<T> projectTuple(int[] indexes, List<T> tupleValues) {
 	
-		List<Object> projectedTuple = new ArrayList<Object>(indexes.length);
+		List<T> projectedTuple = new ArrayList<T>(indexes.length);
 	
 		for (int index : indexes) {
 			projectedTuple.add(tupleValues.get(index));

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -176,6 +176,16 @@
         }
         return this.activeTupleBuffers.get(0);
     }
+    
+    public List<TupleBuffer> onePassSort() throws TeiidComponentException, TeiidProcessingException {
+    	assert this.mode != Mode.DUP_REMOVE;
+    	
+    	if(this.phase == INITIAL_SORT) {
+            initialSort();
+        }
+    	
+    	return activeTupleBuffers;
+    }
 
 	private TupleBuffer createTupleBuffer() throws TeiidComponentException {
 		TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -48,6 +48,7 @@
     private List expressions;
     private BatchCollector collector;
     private TupleBuffer buffer;
+    private List<TupleBuffer> buffers;
     private List<Object> outerVals;
     private IndexedTupleSource iterator;
     private int[] expressionIndexes;
@@ -75,7 +76,7 @@
 		this.implicitBuffer = implicitBuffer;
 	}
     
-    private int[] getExpressionIndecies(List expressions,
+    static int[] getExpressionIndecies(List expressions,
                                         List elements) {
         if (expressions == null) {
             return new int[0];
@@ -104,7 +105,14 @@
     }
     
     public void close() {
-    	if (this.buffer != null) {
+    	while (nextBuffer()) {
+    		//do nothing
+    	}
+        this.open = false;
+    }
+
+	private void closeBuffer() {
+		if (this.buffer != null) {
             this.buffer.remove();
             this.buffer = null;
         }
@@ -113,8 +121,7 @@
         	this.iterator = null;
         }
         this.currentTuple = null;
-        this.open = false;
-    }
+	}
 
     public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
     	return this.getTupleBuffer().getRowCount();
@@ -178,22 +185,58 @@
     }
     
     public void sort(SortOption sortOption) throws TeiidComponentException, TeiidProcessingException {
-    	if (sortOption == SortOption.SORT || sortOption == SortOption.SORT_DISTINCT) {
-	    	if (this.sortUtility == null) {
-	    		TupleSource ts = null;
-	    		if (this.buffer != null) {
-	    			this.buffer.setForwardOnly(true);
-	    			ts = this.buffer.createIndexedTupleSource();
-	    		} else {
-	    			ts = new BatchIterator(this.source);
-	    		}
-			    this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), 
-			    		sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
-			    this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
-			}
-			this.buffer = sortUtility.sort();
-	        this.markDistinct(sortUtility.isDistinct());
+    	if (sortOption == SortOption.ALREADY_SORTED) {
+    		return;
     	}
+    	if (this.sortUtility == null) {
+    		TupleSource ts = null;
+    		if (this.buffer != null) {
+    			this.buffer.setForwardOnly(true);
+    			ts = this.buffer.createIndexedTupleSource();
+    		} else {
+    			ts = new BatchIterator(this.source);
+    		}
+		    this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), 
+		    		sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
+		    this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
+		}
+    	if (sortOption == SortOption.NOT_SORTED) {
+    		this.buffers = sortUtility.onePassSort();
+    		if (this.buffers.size() == 1) {
+    			this.markDistinct(sortUtility.isDistinct());
+    		}
+    		nextBuffer();
+    		return;
+    	} 
+		this.buffer = sortUtility.sort();
+        this.markDistinct(sortUtility.isDistinct());
     }
+    
+    public boolean hasBuffer() {
+    	return this.buffer != null;
+    }
+    
+    public boolean nextBuffer() {
+    	this.closeBuffer();
+    	if (this.buffers == null || this.buffers.isEmpty()) {
+    		return false;
+    	}
+    	this.buffer = this.buffers.remove(this.buffers.size() - 1);
+    	this.buffer.setForwardOnly(false);
+    	this.resetState();
+    	return true;
+    }
 
+    /**
+     * return the iterator to a fresh state
+     */
+	public void resetState() {
+		if (this.iterator != null) {
+			this.iterator.reset();
+			this.iterator.setPosition(1);
+		}
+		this.currentTuple = null;
+		this.maxProbeMatch = 1;
+	}
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -345,10 +345,11 @@
 		//TODO: ordered insert optimization
 		TupleSource ts = createTupleSource(allColumns, null, null);
 		indexTable.insert(ts, allColumns);
+		indexTable.getTree().compact();
 	}
 	
 	private int reserveBuffers() {
-		return bm.reserveBuffers(leafBatchSize + (tree.getHeight() - 1)*keyBatchSize, BufferReserveMode.WAIT);
+		return bm.reserveBuffers(leafBatchSize + (tree.getHeight() - 1)*keyBatchSize, BufferReserveMode.FORCE);
 	}
 
 	public TupleSource createTupleSource(final List<? extends SingleElementSymbol> projectedCols, final Criteria condition, OrderBy orderBy) throws TeiidComponentException, TeiidProcessingException {
@@ -612,7 +613,7 @@
 	}
 	
 	private void insertTuple(List<?> list, boolean ordered) throws TeiidComponentException, TeiidProcessingException {
-		if (tree.insert(list, ordered?InsertMode.ORDERED:InsertMode.NEW) != null) {
+		if (tree.insert(list, ordered?InsertMode.ORDERED:InsertMode.NEW, -1) != null) {
 			throw new TeiidProcessingException(QueryPlugin.Util.getString("TempTable.duplicate_key")); //$NON-NLS-1$
 		}
 	}
@@ -639,11 +640,11 @@
 				}
 				return result;
 			} 
-			List<?> result = tree.insert(tuple, InsertMode.UPDATE);
+			List<?> result = tree.insert(tuple, InsertMode.UPDATE, -1);
 			if (indexTables != null) {
 				for (TempTable index : this.indexTables.values()) {
 					tuple = RelationalNode.projectTuple(RelationalNode.getProjectionIndexes(index.getColumnMap(), index.columns), tuple);
-					index.tree.insert(tuple, InsertMode.UPDATE);
+					index.tree.insert(tuple, InsertMode.UPDATE, -1);
 				}
 			}
 			return result;
@@ -653,7 +654,7 @@
 	}
 	
 	private void updateTuple(List<?> tuple) throws TeiidComponentException {
-		if (tree.insert(tuple, InsertMode.UPDATE) == null) {
+		if (tree.insert(tuple, InsertMode.UPDATE, -1) == null) {
 			throw new AssertionError("Update failed"); //$NON-NLS-1$
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -519,6 +519,7 @@
 			
 			//TODO: if this insert fails, it's unnecessary to do the undo processing
 			table.insert(ts, variables);
+			table.getTree().compact();
 			rowCount = table.getRowCount();
 			//TODO: could pre-process indexes to remove overlap
 			for (Object index : metadata.getIndexesInGroup(group.getMetadataID())) {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -29,6 +29,7 @@
 
 import org.junit.Test;
 import org.teiid.common.buffer.STree.InsertMode;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.query.sql.symbol.ElementSymbol;
 
@@ -45,7 +46,7 @@
 		STree map = bm.createSTree(elements, "1", 1);
 		
 		for (int i = 20000; i > 0; i--) {
-			assertNull(map.insert(Arrays.asList(i, String.valueOf(i)), InsertMode.NEW));
+			assertNull(map.insert(Arrays.asList(i, String.valueOf(i)), InsertMode.NEW, -1));
 			assertEquals(20000 - i + 1, map.getRowCount());
 		}
 		
@@ -54,7 +55,31 @@
 		}
 		
 		assertEquals(0, map.getRowCount());
-		assertNull(map.insert(Arrays.asList(1, String.valueOf(1)), InsertMode.NEW));
+		assertNull(map.insert(Arrays.asList(1, String.valueOf(1)), InsertMode.NEW, -1));
 	}
 	
+	@Test public void testOrderedInsert() throws TeiidComponentException {
+		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+		bm.setProcessorBatchSize(16);
+		
+		ElementSymbol e1 = new ElementSymbol("x");
+		e1.setType(Integer.class);
+		List elements = Arrays.asList(e1);
+		STree map = bm.createSTree(elements, "1", 1);
+		
+		int size = (1<<16)+(1<<4)+1;
+		
+		for (int i = 0; i < size; i++) {
+			assertNull(map.insert(Arrays.asList(i), InsertMode.ORDERED, size));
+			assertEquals(i + 1, map.getRowCount());
+		}
+		
+		assertEquals(4, map.getHeight());
+
+		for (int i = 0; i < size; i++) {
+			assertNotNull(map.remove(Arrays.asList(i)));
+		}
+				
+	}
+
 }

Modified: trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -63,7 +63,7 @@
 import org.teiid.query.processor.relational.NestedLoopJoinStrategy;
 import org.teiid.query.processor.relational.NestedTableJoinStrategy;
 import org.teiid.query.processor.relational.NullNode;
-import org.teiid.query.processor.relational.PartitionedSortJoin;
+import org.teiid.query.processor.relational.EnhancedSortMergeJoinStrategy;
 import org.teiid.query.processor.relational.PlanExecutionNode;
 import org.teiid.query.processor.relational.ProjectIntoNode;
 import org.teiid.query.processor.relational.ProjectNode;
@@ -396,8 +396,8 @@
                 updateCounts(NestedLoopJoinStrategy.class, counts, types);
             } else if (strategy instanceof MergeJoinStrategy) {
                 updateCounts(MergeJoinStrategy.class, counts, types);
-                if (strategy instanceof PartitionedSortJoin) {
-                    updateCounts(PartitionedSortJoin.class, counts, types);
+                if (strategy instanceof EnhancedSortMergeJoinStrategy) {
+                    updateCounts(EnhancedSortMergeJoinStrategy.class, counts, types);
                 } 
             } else if (strategy instanceof NestedTableJoinStrategy) {
             	updateCounts(NestedTableJoinStrategy.class, counts, types);

Modified: trunk/engine/src/test/java/org/teiid/query/optimizer/TestPartitionedJoinPlanning.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/optimizer/TestPartitionedJoinPlanning.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/optimizer/TestPartitionedJoinPlanning.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -31,7 +31,7 @@
 import org.teiid.query.optimizer.capabilities.FakeCapabilitiesFinder;
 import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
 import org.teiid.query.processor.ProcessorPlan;
-import org.teiid.query.processor.relational.PartitionedSortJoin;
+import org.teiid.query.processor.relational.EnhancedSortMergeJoinStrategy;
 import org.teiid.query.unittest.FakeMetadataFacade;
 import org.teiid.query.unittest.FakeMetadataFactory;
 import org.teiid.query.unittest.FakeMetadataObject;
@@ -76,7 +76,7 @@
             0,      // Sort
             0       // UnionAll
         });  
-        checkNodeTypes(plan, new int[] {1}, new Class[] {PartitionedSortJoin.class});
+        checkNodeTypes(plan, new int[] {1}, new Class[] {EnhancedSortMergeJoinStrategy.class});
     }    
 
 

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -309,6 +309,12 @@
 		TestOptimizer.helpPlan("select * from x, x1 where x.e2 = x1.e2 and x.e1 = x1.e1", this.metadata, new String[] {"SELECT x1.e2, x1.e1 FROM x1 ORDER BY x1.e1, x1.e2", "SELECT x.e2, x.e1 FROM x ORDER BY x.e1, x.e2"}, ComparisonMode.EXACT_COMMAND_STRING);
 	}
 	
+	@Test public void testUnneededMergePredicate() throws Exception {
+		execute("create local temporary table x (e1 string, e2 integer, primary key (e1))", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		execute("create local temporary table x1 (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+		TestOptimizer.helpPlan("select x.e1 from x, x1 where x.e2 = x1.e2 and x.e1 = x1.e1", this.metadata, new String[] {"SELECT x.e2, x.e1 FROM x ORDER BY x.e1", "SELECT x1.e2, x1.e1 FROM x1 ORDER BY x1.e1"}, ComparisonMode.EXACT_COMMAND_STRING);
+	}
+	
 	private void sampleTable() throws Exception {
 		execute("create local temporary table x (e1 string, e2 integer, primary key (e1, e2))", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
 		execute("insert into x (e2, e1) values (3, 'b')", new List[] {Arrays.asList(1)}); //$NON-NLS-1$

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestVirtualDepJoin.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestVirtualDepJoin.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestVirtualDepJoin.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -580,12 +580,11 @@
         elementIDs = metadata.getElementIDsInGroupID(groupID);
         elementSymbols = FakeDataStore.createElements(elementIDs);
     
-        TimestampUtil ts = new TimestampUtil();
         dataMgr.registerTuples(groupID, elementSymbols,                               
                                new List[] { 
-                                   Arrays.asList(new Object[] { new Long(100), "Miles", "Davis", ts.createDate(1926, 4, 25) }), //$NON-NLS-1$ //$NON-NLS-2$
-                                   Arrays.asList(new Object[] { new Long(200), "John", "Coltrane", ts.createDate(1926, 8, 23) }), //$NON-NLS-1$ //$NON-NLS-2$
-                                   Arrays.asList(new Object[] { new Long(300), "Thelonious", "Monk", ts.createDate(1917, 9, 10) }), //$NON-NLS-1$ //$NON-NLS-2$
+                                   Arrays.asList(new Object[] { new Long(100), "Miles", "Davis", TimestampUtil.createDate(1926, 4, 25) }), //$NON-NLS-1$ //$NON-NLS-2$
+                                   Arrays.asList(new Object[] { new Long(200), "John", "Coltrane", TimestampUtil.createDate(1926, 8, 23) }), //$NON-NLS-1$ //$NON-NLS-2$
+                                   Arrays.asList(new Object[] { new Long(300), "Thelonious", "Monk", TimestampUtil.createDate(1917, 9, 10) }), //$NON-NLS-1$ //$NON-NLS-2$
                                    } );    
 
         // Group CustomerMaster.Locations

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/NodeTestUtil.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/NodeTestUtil.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/NodeTestUtil.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -22,16 +22,12 @@
 
 package org.teiid.query.processor.relational;
 
-import java.util.Properties;
-
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.StorageManager;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.common.buffer.impl.MemoryStorageManager;
 import org.teiid.core.TeiidComponentException;
 
-
-
 /** 
  * @since 4.2
  */
@@ -41,6 +37,8 @@
     	BufferManagerImpl bufferManager = new BufferManagerImpl();
     	bufferManager.setProcessorBatchSize(procBatchSize);
     	bufferManager.setConnectorBatchSize(connectorBatchSize);
+    	bufferManager.setMaxProcessingBatchColumns((int)(bytesAvailable/procBatchSize/BufferManagerImpl.KB_PER_VALUE/1024));
+    	bufferManager.setMaxReserveBatchColumns((int)(bytesAvailable/procBatchSize/BufferManagerImpl.KB_PER_VALUE/1024));
         // Get the properties for BufferManager
         return createBufferManager(bufferManager);
     }
@@ -48,6 +46,8 @@
     static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize) {
     	BufferManagerImpl bufferManager = new BufferManagerImpl();
     	bufferManager.setProcessorBatchSize(procBatchSize);
+    	bufferManager.setMaxProcessingBatchColumns((int)(bytesAvailable/procBatchSize/BufferManagerImpl.KB_PER_VALUE/1024));
+    	bufferManager.setMaxReserveBatchColumns((int)(bytesAvailable/procBatchSize/BufferManagerImpl.KB_PER_VALUE/1024));
         // Get the properties for BufferManager
         return createBufferManager(bufferManager);
     }

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2011-03-31 04:11:52 UTC (rev 3050)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2011-04-02 11:57:44 UTC (rev 3051)
@@ -22,7 +22,7 @@
 
 package org.teiid.query.processor.relational;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,7 +50,7 @@
 import org.teiid.query.unittest.FakeMetadataFactory;
 import org.teiid.query.util.CommandContext;
 
-
+ at SuppressWarnings("unchecked")
 public class TestJoinNode {
     private static final int NO_CRITERIA = 0;
     private static final int EQUAL_CRITERIA = 1;
@@ -218,20 +218,20 @@
 	        if (batchSize == 0) {
 	        	continue;
 	        }
-	        helpTestJoinDirect(expected, batchSize);
+	        helpTestJoinDirect(expected, batchSize, 100000);
 	        List[] temp = leftTuples;
 	        leftTuples = rightTuples;
 	        rightTuples = temp;
 	        helpCreateJoin();                
-	        helpTestJoinDirect(expectedReversed, batchSize);
+	        helpTestJoinDirect(expectedReversed, batchSize, 100000);
 	        temp = leftTuples;
 	        leftTuples = rightTuples;
 	        rightTuples = temp;
     	}
     }
     
-    public void helpTestJoinDirect(List[] expectedResults, int batchSize) throws TeiidComponentException, TeiidProcessingException {
-        BufferManager mgr = NodeTestUtil.getTestBufferManager(1, batchSize);
+    public void helpTestJoinDirect(List[] expectedResults, int batchSize, int processingBytes) throws TeiidComponentException, TeiidProcessingException {
+        BufferManager mgr = NodeTestUtil.getTestBufferManager(processingBytes, batchSize);
         CommandContext context = new CommandContext("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
         
         join.addChild(leftNode);
@@ -630,9 +630,9 @@
            Arrays.asList(new Object[] { 4, 4 }),
         };
         helpCreateJoin();               
-        this.joinStrategy = new PartitionedSortJoin(SortOption.SORT, SortOption.SORT);
+        this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
         this.join.setJoinStrategy(joinStrategy);
-        helpTestJoinDirect(expected, 100);
+        helpTestJoinDirect(expected, 100, 1);
     }
     
     @Test public void testMergeJoinOptimizationNoRows() throws Exception {
@@ -641,9 +641,9 @@
         this.rightTuples = new List[] {};
         expected = new List[] {};
         helpCreateJoin();               
-        this.joinStrategy = new PartitionedSortJoin(SortOption.SORT, SortOption.SORT);
+        this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
         this.join.setJoinStrategy(joinStrategy);
-        helpTestJoinDirect(expected, 100);
+        helpTestJoinDirect(expected, 100, 1);
     }
     
     @Test public void testMergeJoinOptimizationWithDistinct() throws Exception {
@@ -673,51 +673,94 @@
            Arrays.asList(new Object[] { 1, 1 })
         };
         helpCreateJoin();               
-        this.joinStrategy = new PartitionedSortJoin(SortOption.SORT, SortOption.SORT);
+        this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
         this.join.setJoinStrategy(joinStrategy);
-        this.join.setRightDistinct(true);
-        helpTestJoinDirect(expected, 100);
+        //this.join.setRightDistinct(true);
+        helpTestJoinDirect(expected, 100, 1);
     }
     
-    @Test public void testMergeJoinOptimizationWithMultiplePartitions() throws Exception {
+    @Test public void testMergeJoinOptimizationWithDistinctAlreadySorted() throws Exception {
         this.joinType = JoinType.JOIN_INNER;
-        int rows = 30;
+        int rows = 50;
         List[] data = new List[rows];
         for(int i=0; i<rows; i++) { 
             data[i] = new ArrayList();
-            Integer value = new Integer(i % 17);
+            Integer value = new Integer((i*17) % 47);
             data[i].add(value);
         }
-        this.rightTuples = data;
-        this.leftTuples = new List[] {
+        this.leftTuples = data;
+        this.rightTuples = new List[] {
+            Arrays.asList(1),  
+            Arrays.asList(2),
             Arrays.asList(4),
+            Arrays.asList(6),
             Arrays.asList(7),
-            Arrays.asList(2),
-            Arrays.asList(6),
-            Arrays.asList(6),
-            Arrays.asList(1),  
             Arrays.asList(8),
         };
         expected = new List[] {
-           Arrays.asList(new Object[] { 1, 1 }),
-           Arrays.asList(new Object[] { 2, 2 }),
            Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 1, 1 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 7, 7 }),
            Arrays.asList(new Object[] { 8, 8 }),
            Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 8, 8 }),
+           Arrays.asList(new Object[] { 2, 2 }),
            Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 6, 6 }),           
+           Arrays.asList(new Object[] { 1, 1 })
         };
         helpCreateJoin();               
-        this.joinStrategy = new PartitionedSortJoin(SortOption.SORT, SortOption.SORT);
+        this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.ALREADY_SORTED);
         this.join.setJoinStrategy(joinStrategy);
-        helpTestJoinDirect(expected, 4);
+        helpTestJoinDirect(expected, 100, 1);
     }
 
+    @Test public void testRepeatedMerge() throws Exception {
+    	helpTestRepeatedMerge(false);
+    }
+
+    @Test public void testRepeatedMergeWithDistinct() throws Exception {
+    	helpTestRepeatedMerge(true);
+    }
+    
+    public void helpTestRepeatedMerge(boolean indexDistinct) throws Exception {
+        this.joinType = JoinType.JOIN_INNER;
+        int rows = 69;
+        List[] data = new List[rows];
+        for(int i=0; i<rows; i++) { 
+            data[i]=Arrays.asList((i*17) % 91);
+        }
+        if (indexDistinct) {
+        	data[2] = Arrays.asList(0);
+        }
+        data[6] = Arrays.asList((Integer)null);
+        this.rightTuples = data;
+        this.leftTuples = new List[17];
+        for (int i = 0; i < this.leftTuples.length; i++) {
+        	this.leftTuples[i] = Arrays.asList(i);
+        }
+        if (!indexDistinct) {
+        	this.leftTuples[1] = Arrays.asList(0);
+        }
+        this.leftTuples[11] = Arrays.asList((Integer)null);
+        
+        expected = new List[] {
+        		Arrays.asList(13, 13),
+        		Arrays.asList(2, 2),
+        		Arrays.asList(8, 8),
+        		Arrays.asList(14, 14),
+        		Arrays.asList(3, 3),
+        		Arrays.asList(9, 9),
+        		Arrays.asList(15, 15),
+        		Arrays.asList(4, 4),
+        		Arrays.asList(10, 10),
+        		Arrays.asList(16, 16),
+        		Arrays.asList(5, 5),
+        		Arrays.asList(0, 0),
+        		Arrays.asList(0, 0),
+        };
+        helpCreateJoin();               
+        EnhancedSortMergeJoinStrategy psj = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
+        psj.setPreferMemCutoff(1);
+        this.joinStrategy = psj;
+        this.join.setJoinStrategy(joinStrategy);
+        helpTestJoinDirect(expected, 4, 1000);
+    }
+
 }



More information about the teiid-commits mailing list