[teiid-commits] teiid SVN: r847 - in trunk/engine/src: main/java/com/metamatrix/common/buffer/impl and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Apr 28 13:36:47 EDT 2009


Author: shawkins
Date: 2009-04-28 13:36:46 -0400 (Tue, 28 Apr 2009)
New Revision: 847

Added:
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
Removed:
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java
Modified:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java
   trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
   trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
Log:
TEIID-178,  TEIID-175 adding support for a non blocking sort and general planning time sort optimization.

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -22,12 +22,12 @@
 
 package com.metamatrix.common.buffer;
 
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.api.exception.MetaMatrixException;
 
 /**
  * Indicates memory was not available for the requested operation.
  */
-public class MemoryNotAvailableException extends MetaMatrixProcessingException {
+public class MemoryNotAvailableException extends MetaMatrixException {
 
     /**
      * No-arg costructor required by Externalizable semantics

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -470,7 +470,11 @@
         if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
             LogManager.logTrace(LogCommonConstants.CTX_BUFFER_MGR, new Object[]{"AddTupleBatch for", tupleSourceID, "with " + tupleBatch.getRowCount() + " rows"}); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
         }
-
+        
+        if (tupleBatch.getRowCount() == 0 && !tupleBatch.getTerminationFlag()) {
+        	return;
+        }
+        
         // Look up info
         TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
         

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -49,7 +49,6 @@
 import com.metamatrix.query.processor.relational.DependentAccessNode;
 import com.metamatrix.query.processor.relational.DependentProcedureAccessNode;
 import com.metamatrix.query.processor.relational.DependentProcedureExecutionNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
 import com.metamatrix.query.processor.relational.GroupingNode;
 import com.metamatrix.query.processor.relational.JoinNode;
 import com.metamatrix.query.processor.relational.LimitNode;
@@ -65,6 +64,7 @@
 import com.metamatrix.query.processor.relational.SortNode;
 import com.metamatrix.query.processor.relational.UnionAllNode;
 import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.resolver.util.ResolverUtil;
 import com.metamatrix.query.sql.LanguageObject;
 import com.metamatrix.query.sql.lang.Command;
@@ -201,7 +201,7 @@
                 List joinCrits = (List) node.getProperty(NodeConstants.Info.JOIN_CRITERIA);
                 
                 if(stype.equals(JoinStrategyType.MERGE)) {
-                    MergeJoinStrategy mjStrategy = new MergeJoinStrategy(node.hasBooleanProperty(NodeConstants.Info.SORT_LEFT), node.hasBooleanProperty(NodeConstants.Info.SORT_RIGHT));
+                    MergeJoinStrategy mjStrategy = new MergeJoinStrategy((SortOption)node.getProperty(NodeConstants.Info.SORT_LEFT), (SortOption)node.getProperty(NodeConstants.Info.SORT_RIGHT), false);
                     jnode.setJoinStrategy(mjStrategy);
                     List leftExpressions = (List) node.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS);
                     List rightExpressions = (List) node.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS);
@@ -319,21 +319,21 @@
 				break;
 
 			case NodeConstants.Types.SORT:
+			case NodeConstants.Types.DUP_REMOVE:
                 SortNode sortNode = new SortNode(getID());
                 
 				List elements = (List) node.getProperty(NodeConstants.Info.SORT_ORDER);
 				List sortTypes = (List) node.getProperty(NodeConstants.Info.ORDER_TYPES);
 				
 				sortNode.setSortElements(elements, sortTypes);
+				if (node.getType() == NodeConstants.Types.DUP_REMOVE) {
+					sortNode.setMode(Mode.DUP_REMOVE);
+				} else if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
+					sortNode.setMode(Mode.DUP_REMOVE_SORT);
+				}
 
 				processNode = sortNode;
 				break;
-
-			case NodeConstants.Types.DUP_REMOVE:
-				processNode = new DupRemoveNode(getID());
-
-				break;
-
 			case NodeConstants.Types.GROUP:
 				GroupingNode gnode = new GroupingNode(getID());
 				gnode.setGroupingElements( (List) node.getProperty(NodeConstants.Info.GROUP_COLS) );
@@ -362,7 +362,11 @@
                     if(useAll) {
                         processNode = unionAllNode;
                     } else {
-                        processNode = new DupRemoveNode(getID());
+                    	SortNode sNode = new SortNode(getID());
+                    	boolean onlyDupRemoval = node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL);
+                    	sNode.setMode(onlyDupRemoval?Mode.DUP_REMOVE:Mode.DUP_REMOVE_SORT);
+                        processNode = sNode;
+                        
                         unionAllNode.setElements( (List) node.getProperty(NodeConstants.Info.OUTPUT_COLS) );
                         processNode.addChild(unionAllNode);
                     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -293,6 +293,8 @@
         RuleStack rules = new RuleStack();
 
         rules.push(RuleConstants.COLLAPSE_SOURCE);
+        
+        rules.push(RuleConstants.PLAN_SORTS);
 
         if(hints.hasJoin) {
             rules.push(RuleConstants.IMPLEMENT_JOIN_STRATEGY);

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -87,8 +87,8 @@
         RIGHT_EXPRESSIONS,  // List <SingleElementSymbol> 
         DEPENDENT_VALUE_SOURCE, // String
         NON_EQUI_JOIN_CRITERIA,      // List <CompareCriteria>
-        SORT_LEFT,  // Boolean
-        SORT_RIGHT,     // Boolean
+        SORT_LEFT,  // SortOption
+        SORT_RIGHT,     // SortOption
         REMOVED_JOIN_GROUPS, //Set<GroupSymbol>
         
         IS_OPTIONAL,          // Boolean
@@ -110,7 +110,7 @@
         // Sort node properties
         ORDER_TYPES,        // List <Boolean>
         SORT_ORDER,         // List <SingleElementSymbol>
-        SORT_CONTROLLER,         // Boolean
+        IS_DUP_REMOVAL,		// Boolean
 
         // Source node properties
         SYMBOL_MAP,         // SymbolMap

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -49,18 +49,18 @@
 		}
 	}
 	
-    public static final PlanNode findNodePreOrder(PlanNode root, int type) {
-        return findNodePreOrder(root, type, NodeConstants.Types.NO_TYPE);
+    public static final PlanNode findNodePreOrder(PlanNode root, int types) {
+        return findNodePreOrder(root, types, NodeConstants.Types.NO_TYPE);
     }
 
-	public static final PlanNode findNodePreOrder(PlanNode root, int type, int stopTypes) {
-		if(root.getType() == type) {
+	public static final PlanNode findNodePreOrder(PlanNode root, int types, int stopTypes) {
+		if((types & root.getType()) == root.getType()) {
 			return root;
 		} else if((stopTypes & root.getType()) == root.getType()) {
 		    return null;
 		} else if(root.getChildCount() > 0) {
 		    for (PlanNode child : root.getChildren()) {
-				PlanNode found = findNodePreOrder(child, type, stopTypes);
+				PlanNode found = findNodePreOrder(child, types, stopTypes);
 				if(found != null) {
 					return found;
 				}

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -52,4 +52,5 @@
     public static final OptimizerRule PLAN_UNIONS = new RulePlanUnions();
     public static final OptimizerRule PLAN_PROCEDURES = new RulePlanProcedures();
     public static final OptimizerRule CALCULATE_COST = new RuleCalculateCost();
+    public static final OptimizerRule PLAN_SORTS = new RulePlanSorts();
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -42,6 +42,7 @@
 import com.metamatrix.query.optimizer.relational.plantree.NodeEditor;
 import com.metamatrix.query.optimizer.relational.plantree.NodeFactory;
 import com.metamatrix.query.optimizer.relational.plantree.PlanNode;
+import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
 import com.metamatrix.query.sql.lang.OrderBy;
 import com.metamatrix.query.sql.symbol.SingleElementSymbol;
 import com.metamatrix.query.util.CommandContext;
@@ -111,7 +112,7 @@
             return;
         }
         
-        joinNode.setProperty(joinNode.getFirstChild() == childNode ? NodeConstants.Info.SORT_LEFT : NodeConstants.Info.SORT_RIGHT, Boolean.TRUE);
+        joinNode.setProperty(joinNode.getFirstChild() == childNode ? NodeConstants.Info.SORT_LEFT : NodeConstants.Info.SORT_RIGHT, SortOption.SORT);
         
         if (needsCorrection) {
             PlanNode projectNode = NodeFactory.getNewNode(NodeConstants.Types.PROJECT);

Added: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java	                        (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.optimizer.relational.rules;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.api.exception.query.QueryMetadataException;
+import com.metamatrix.api.exception.query.QueryPlannerException;
+import com.metamatrix.query.analysis.AnalysisRecord;
+import com.metamatrix.query.metadata.QueryMetadataInterface;
+import com.metamatrix.query.optimizer.capabilities.CapabilitiesFinder;
+import com.metamatrix.query.optimizer.relational.OptimizerRule;
+import com.metamatrix.query.optimizer.relational.RuleStack;
+import com.metamatrix.query.optimizer.relational.plantree.JoinStrategyType;
+import com.metamatrix.query.optimizer.relational.plantree.NodeConstants;
+import com.metamatrix.query.optimizer.relational.plantree.NodeEditor;
+import com.metamatrix.query.optimizer.relational.plantree.PlanNode;
+import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
+import com.metamatrix.query.sql.lang.SetQuery;
+import com.metamatrix.query.util.CommandContext;
+
+/**
+ * Attempts to minimize the cost of sorting operations across the plan.
+ */
+public class RulePlanSorts implements OptimizerRule {
+	
+	@Override
+	public PlanNode execute(PlanNode plan, QueryMetadataInterface metadata,
+			CapabilitiesFinder capabilitiesFinder, RuleStack rules,
+			AnalysisRecord analysisRecord, CommandContext context)
+			throws QueryPlannerException, QueryMetadataException,
+			MetaMatrixComponentException {
+		optimizeSorts(false, plan);
+		return plan;
+	}
+
+	private void optimizeSorts(boolean parentBlocking, PlanNode node) {
+		node = NodeEditor.findNodePreOrder(node, 
+				NodeConstants.Types.SORT 
+				| NodeConstants.Types.DUP_REMOVE 
+				| NodeConstants.Types.GROUP 
+				| NodeConstants.Types.JOIN 
+				| NodeConstants.Types.SET_OP, NodeConstants.Types.ACCESS);
+		if (node == null) {
+			return;
+		}
+		switch (node.getType()) {
+		case NodeConstants.Types.SORT:
+			parentBlocking = true;
+			if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
+				break;
+			}
+			if (mergeSortWithDupRemoval(node)) {
+				node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+			}
+			break;
+		case NodeConstants.Types.DUP_REMOVE:
+			if (parentBlocking) {
+				node.setType(NodeConstants.Types.SORT);
+				node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+			} 
+			break;
+		case NodeConstants.Types.GROUP:
+			//TODO: check the join interesting order
+			parentBlocking = true;
+			break;
+		case NodeConstants.Types.JOIN:
+			if (node.getProperty(NodeConstants.Info.JOIN_STRATEGY) != JoinStrategyType.MERGE) {
+				break;
+			}
+			parentBlocking = true;
+			PlanNode toTest = node.getFirstChild();
+			if (mergeSortWithDupRemovalForJoin(toTest)) {
+				node.setProperty(NodeConstants.Info.SORT_LEFT, SortOption.SORT_DISTINCT);
+			}
+			toTest = node.getLastChild();
+			if (mergeSortWithDupRemovalForJoin(toTest)) {
+				node.setProperty(NodeConstants.Info.SORT_RIGHT, SortOption.SORT_DISTINCT);
+			}
+			break;
+		case NodeConstants.Types.SET_OP:
+			// assumes the use of the merge algorithm
+			if (node.getProperty(NodeConstants.Info.SET_OPERATION) != SetQuery.Operation.UNION) {
+				parentBlocking = true;
+			} else if (!node.hasBooleanProperty(NodeConstants.Info.USE_ALL) && !parentBlocking) {
+				node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+			}
+			break;
+		}
+		for (PlanNode child : node.getChildren()) {
+			optimizeSorts(parentBlocking, child);
+		}
+	}
+
+	/**
+	 * Look under the left and the right sources for a dup removal operation
+	 *  join
+	 *   [project]
+	 *     source
+	 *       dup remove | union not all
+	 */
+	private boolean mergeSortWithDupRemovalForJoin(PlanNode toTest) {
+		PlanNode source = NodeEditor.findNodePreOrder(toTest, NodeConstants.Types.SOURCE, NodeConstants.Types.ACCESS | NodeConstants.Types.JOIN);
+		return source != null && mergeSortWithDupRemoval(source);
+	}
+
+	private boolean mergeSortWithDupRemoval(PlanNode node) {
+		switch (node.getFirstChild().getType()) {
+		case NodeConstants.Types.SET_OP:
+			if (node.getFirstChild().getProperty(NodeConstants.Info.SET_OPERATION) == SetQuery.Operation.UNION && !node.getFirstChild().hasBooleanProperty(NodeConstants.Info.USE_ALL)) {
+				node.getFirstChild().setProperty(NodeConstants.Info.USE_ALL, true);
+				return true;
+			}
+			break;
+		case NodeConstants.Types.DUP_REMOVE:
+			NodeEditor.removeChildNode(node, node.getFirstChild());
+			return true;
+		}
+		return false;
+	}
+	
+	@Override
+	public String toString() {
+		return "PlanSorts"; //$NON-NLS-1$
+	}
+
+}


Property changes on: trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -40,6 +40,7 @@
     private boolean done = false;
     private TupleSourceID tsID;
     private int rowCount = 0;
+    private boolean collectedAny;
     
     public BatchCollector(RelationalNode sourceNode) throws MetaMatrixComponentException {
         this.sourceNode = sourceNode;
@@ -61,6 +62,7 @@
             if(batch.getRowCount() > 0) {
                 this.rowCount = batch.getEndRow();
                 sourceNode.getBufferManager().addTupleBatch(tsID, batch);
+                collectedAny = true;
             }
 
             // Check for termination condition - batch ending with null row
@@ -77,6 +79,12 @@
         return tsID;
     }
     
+    public boolean collectedAny() {
+		boolean result = collectedAny;
+		collectedAny = false;
+		return result;
+	}
+    
     public int getRowCount() {
         return rowCount;
     }
@@ -92,5 +100,9 @@
     public TupleSourceID getTupleSourceID() {
         return this.tsID;
     }
+    
+    public boolean isDone() {
+		return done;
+	}
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -23,7 +23,6 @@
 package com.metamatrix.query.processor.relational;
 
 import java.util.List;
-import java.util.NoSuchElementException;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -82,7 +81,7 @@
     public List<?> nextTuple() throws MetaMatrixComponentException,
     		MetaMatrixProcessingException {
         if (currentTuple == null && !hasNext()) {
-            throw new NoSuchElementException();
+            return null;
         }
         List result = currentTuple;
         currentTuple = null;
@@ -103,13 +102,17 @@
     }
 
     public void setPosition(int position) {
+    	if (position == this.currentRow) {
+    		return;
+    	}
+		if (position < this.currentRow && (this.currentBatch == null || position < this.currentBatch.getBeginRow())) {
+			throw new UnsupportedOperationException("Backwards positioning is not allowed"); //$NON-NLS-1$
+		}
         this.currentRow = position;
+        this.currentTuple = null;
+        if (currentBatch.getEndRow() < currentRow) {
+        	this.currentBatch = null;
+        }
     }
 
-    public int available() {
-        if (currentBatch == null) {
-            return 0;
-        }
-        return currentBatch.getEndRow() - currentRow;
-    }
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -94,8 +94,7 @@
 					} catch (TupleSourceNotFoundException e) {
 						throw new MetaMatrixComponentException(e);
 					}
-	                this.sortUtility = new SortUtility(originalVs.getTupleSourceID(), ts.getSchema(), sortSymbols, sortDirection, true, dependentNode.getBufferManager(),
-	                                                   dependentNode.getConnectionID());
+	                this.sortUtility = new SortUtility(originalVs.getTupleSourceID(), sortSymbols, sortDirection, true, dependentNode.getBufferManager(), dependentNode.getConnectionID());
             	}
             	dvs = new DependentValueSource(sortUtility.sort(), dependentNode.getBufferManager());
             	for (SetState setState : dependentSetStates) {

Deleted: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -1,77 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.query.processor.relational;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
-import com.metamatrix.query.sql.lang.OrderBy;
-
-public class DupRemoveNode extends SortNode {
-
-	public DupRemoveNode(int nodeID) {
-		super(nodeID);
-	}
-	
-    public void reset() {
-        super.reset();
-    }
-    
-	public void open() 
-		throws MetaMatrixComponentException, MetaMatrixProcessingException {
-
-        // Set up duplicate removal
-        super.setRemoveDuplicates(true);
-        
-        // Set up all elements for sorting
-        List sourceElements = getChildren()[0].getElements();
-        List sortTypes = new ArrayList(sourceElements.size());
-        for(int i=0; i<sourceElements.size(); i++) {
-            sortTypes.add(Boolean.valueOf(OrderBy.ASC));
-        }
-        super.setSortElements(sourceElements, sortTypes);
-
-        super.open();
-	}
-	
-	public Object clone(){
-		DupRemoveNode clonedNode = new DupRemoveNode(super.getID());
-		super.copy(this, clonedNode);
-		
-		return clonedNode;
-	}
-    
-    /* 
-     * @see com.metamatrix.query.processor.Describable#getDescriptionProperties()
-     */
-    public Map getDescriptionProperties() {   
-        // Default implementation - should be overridden     
-        Map props = super.getDescriptionProperties();
-        props.put(PROP_TYPE, "Duplicate Removal"); //$NON-NLS-1$
-        return props;
-    }
-    
-}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -29,8 +29,10 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.api.exception.query.ExpressionEvaluationException;
 import com.metamatrix.api.exception.query.FunctionExecutionException;
-import com.metamatrix.common.buffer.*;
-import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
+import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.TupleSource;
+import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.query.function.aggregate.AggregateFunction;
 import com.metamatrix.query.sql.lang.OrderBy;
@@ -45,7 +47,6 @@
     private AggregateFunction proxy;
     private BufferManager mgr;
     private String groupName;
-    private int batchSize;
 
     // Derived and static - can be reused
     private List elements;
@@ -54,12 +55,10 @@
 
     // Temporary state - should be reset
     private TupleSourceID collectionID = null;
-    private List collectionRows;
-    private int collectionRow = 1;
     private SortUtility sortUtility = null;
     private TupleSourceID sortedID = null;
+    private TupleCollector tupleCollector;
 
-
     /**
      * Constructor for DuplicateFilter.
      */
@@ -69,7 +68,6 @@
         this.proxy = proxy;
         this.mgr = mgr;
         this.groupName = groupName;
-        this.batchSize = batchSize;
     }
 
     /**
@@ -93,8 +91,7 @@
         this.proxy.reset();
 
         this.collectionID = null;
-        this.collectionRows = null;
-        this.collectionRow = 1;
+        this.tupleCollector = null;
         this.sortUtility = null;
         this.sortedID = null;
     }
@@ -108,24 +105,12 @@
         try {
             if(collectionID == null) {
                 collectionID = mgr.createTupleSource(elements, elementTypes, groupName, TupleSourceType.PROCESSOR);
+                this.tupleCollector = new TupleCollector(collectionID, mgr);
             }
 
-            if(collectionRows == null) {
-                collectionRows = new ArrayList(batchSize);
-            }
-
             List row = new ArrayList(1);
             row.add(input);
-            collectionRows.add(row);
-            if(collectionRows.size() == batchSize) {
-                TupleBatch batch = new TupleBatch(collectionRow, collectionRows);
-                mgr.addTupleBatch(collectionID, batch);
-
-                // Reset state for next batch
-                collectionRow = collectionRow + batch.getRowCount();
-                collectionRows = new ArrayList(batchSize);
-            }
-
+            this.tupleCollector.addTuple(row);
         } catch(TupleSourceNotFoundException e) {
             throw new MetaMatrixComponentException(e, e.getMessage());
         }
@@ -140,15 +125,10 @@
 
         try {
             if(collectionID != null) {
-                // First save any hanging collection rows
-                if(collectionRows.size() > 0) {
-                    TupleBatch batch = new TupleBatch(collectionRow, collectionRows);
-                    mgr.addTupleBatch(collectionID, batch);
-                }
-                mgr.setStatus(collectionID, TupleSourceStatus.FULL);
+                this.tupleCollector.close();
 
                 // Sort
-                sortUtility = new SortUtility(collectionID, elements, elements, sortTypes, true, mgr, groupName);
+                sortUtility = new SortUtility(collectionID, elements, sortTypes, true, mgr, groupName);
                 this.sortedID = sortUtility.sort();
 
                 // Add all input to proxy

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -297,21 +297,22 @@
             this.sourceBatch = null;
         }
 
+    	this.getBufferManager().setStatus(this.collectionID, TupleSourceStatus.FULL);
+
         if(this.sortElements == null || this.rowCount == 0) {
             // No need to sort
-        	this.getBufferManager().setStatus(this.collectionID, TupleSourceStatus.FULL);
             this.sortedID = this.collectionID;
             this.collectionID = null;
             this.phase = GROUP;
         } else {
-            this.sortUtility = new SortUtility(collectionID, collectedExpressions,
-                                                sortElements, sortTypes, false,
-                                                getBufferManager(), getConnectionID());
+            this.sortUtility = new SortUtility(collectionID, sortElements,
+                                                sortTypes, false, getBufferManager(),
+                                                getConnectionID());
             this.phase = SORT;
         }
     }
 
-    private void sortPhase() throws BlockedException, MetaMatrixComponentException {
+    private void sortPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
         this.sortedID = this.sortUtility.sort();
         this.phase = GROUP;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -24,6 +24,8 @@
 
 import java.util.*;
 
+import com.metamatrix.core.util.Assertion;
+
 /**
  * This class can be used for comparing lists of elements, when the fields to
  * be sorted on and the comparison mechanism are dynamically specified. <p>
@@ -61,7 +63,7 @@
     int[] sortParameters;
 
     /**
-     * Indicates whether comparision should be based on ascending or descending
+     * Indicates whether comparison should be based on ascending or descending
      * order.
      */
     boolean ascendingOrder = false;
@@ -146,7 +148,7 @@
 			} else if ( param1 instanceof Comparable ) {
                 compare = ((Comparable)param1).compareTo(param2);
             } else {
-                compare = param1.toString().compareTo(param2.toString());
+            	Assertion.failed("Expected comparable types"); //$NON-NLS-1$
             }
             k++;
         }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -29,6 +29,7 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.api.exception.query.CriteriaEvaluationException;
 import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.sql.lang.JoinType;
 import com.metamatrix.query.sql.lang.OrderBy;
 
@@ -89,11 +90,13 @@
     private SortUtility rightSort;
     private SortOption processingSortRight;   
     
-    public MergeJoinStrategy(boolean sortLeft, boolean sortRight) {
-        this(sortLeft?SortOption.SORT:SortOption.SKIP_SORT, sortRight?SortOption.SORT:SortOption.SKIP_SORT, false);
-    }
-    
     public MergeJoinStrategy(SortOption sortLeft, SortOption sortRight, boolean grouping) {
+    	if (sortLeft == null) {
+    		sortLeft = SortOption.SKIP_SORT;
+    	}
+    	if (sortRight == null) {
+    		sortRight = SortOption.SKIP_SORT;
+    	}
         this.sortLeft = sortLeft;
         this.sortRight = sortRight;
         this.grouping = grouping;
@@ -260,13 +263,12 @@
                 }
 
                 if (!outerMatched) {
-                    if (matchState == MatchState.MATCH_LEFT) {
-                        if (this.joinNode.getJoinType().isOuter()) {
-                            return outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals());
-                        }
-                    } else if (joinNode.getJoinType() == JoinType.JOIN_FULL_OUTER) {
-                        return outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple());
+                    if (matchState == MatchState.MATCH_RIGHT) {
+                    	return outputTuple(this.leftSource.getOuterVals(), this.rightSource.getCurrentTuple());
                     }
+                    if (this.joinNode.getJoinType().isOuter()) {
+                        return outputTuple(this.leftSource.getCurrentTuple(), this.rightSource.getOuterVals());
+                    }
                 }
             }
         }
@@ -334,12 +336,11 @@
                              MetaMatrixProcessingException {
         if (sortLeft != SortOption.SKIP_SORT) { 
             if (this.leftSort == null) {
-                List sourceElements = joinNode.getChildren()[0].getElements();
                 List expressions = this.joinNode.getLeftExpressions();
-                this.leftSort = new SortUtility(this.leftSource.collectTuples(), sourceElements,
-                                                    expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), sortLeft == SortOption.SORT_DISTINCT,
-                                                    this.joinNode.getBufferManager(), this.joinNode.getConnectionID());         
-                this.leftSource.setDistinct(sortLeft == SortOption.SORT_DISTINCT);
+                this.leftSort = new SortUtility(this.leftSource.collectTuples(),
+                                                    expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), sortLeft == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
+                                                    this.joinNode.getBufferManager(), this.joinNode.getConnectionID(), true);         
+                this.leftSource.setDistinct(sortLeft == SortOption.SORT_DISTINCT && expressions.size() == this.leftSource.getOuterVals().size());
             }
             this.leftSource.setTupleSource(leftSort.sort());
         } else if (this.joinNode.isDependent() || JoinType.JOIN_FULL_OUTER.equals(joinNode.getJoinType())) {
@@ -356,19 +357,20 @@
         super.loadRight();
         if (processingSortRight != SortOption.SKIP_SORT) { 
             if (this.rightSort == null) {
-                List sourceElements = joinNode.getChildren()[1].getElements();
                 List expressions = this.joinNode.getRightExpressions();
-                this.rightSort = new SortUtility(this.rightSource.getTupleSourceID(), sourceElements,
-                                                    expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), processingSortRight == SortOption.SORT_DISTINCT,
-                                                    this.joinNode.getBufferManager(), this.joinNode.getConnectionID());
-                this.rightSource.setDistinct(processingSortRight == SortOption.SORT_DISTINCT);
+                this.rightSort = new SortUtility(this.rightSource.getTupleSourceID(), 
+                                                    expressions, Collections.nCopies(expressions.size(), OrderBy.ASC), processingSortRight == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
+                                                    this.joinNode.getBufferManager(), this.joinNode.getConnectionID(), true);
+                this.rightSource.setDistinct(processingSortRight == SortOption.SORT_DISTINCT && expressions.size() == this.rightSource.getOuterVals().size());
             }
             this.rightSource.setTupleSource(rightSort.sort());
         } 
     }
     
     public void setProcessingSortRight(boolean processingSortRight) {
-        this.processingSortRight = processingSortRight?SortOption.SORT:SortOption.SKIP_SORT;
+    	if (processingSortRight && this.processingSortRight == SortOption.SKIP_SORT) {
+    		this.processingSortRight = SortOption.SORT;
+    	}
     }
 
     /**

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -33,7 +33,7 @@
 public class NestedLoopJoinStrategy extends MergeJoinStrategy {
 
     public NestedLoopJoinStrategy() {
-        super(false, false);
+        super(SortOption.SKIP_SORT, SortOption.SKIP_SORT, false);
     }
     
     /** 

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -35,18 +35,19 @@
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleSourceID;
 import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.sql.lang.OrderBy;
 
 public class SortNode extends RelationalNode {
 
 	private List sortElements;
-	private List sortTypes;
-    private boolean removeDuplicates;
+	private List<Boolean> sortTypes;
+    private Mode mode = Mode.SORT;
 
     private SortUtility sortUtility;
     private int phase = COLLECTION;
     private TupleSourceID outputID;
-    private int rowCount;
+    private int rowCount = -1;
     private int outputBeginRow = 1;
     private BatchCollector collector;
 
@@ -63,12 +64,12 @@
         sortUtility = null;
         phase = COLLECTION;
         outputID = null;
-        rowCount = 0;
+        rowCount = -1;
         outputBeginRow = 1;
         this.collector = null;
     }
 
-	public void setSortElements(List sortElements, List sortTypes) {
+	public void setSortElements(List sortElements, List<Boolean> sortTypes) {
 		this.sortElements = sortElements;
 		this.sortTypes = sortTypes;
 	}
@@ -76,10 +77,14 @@
 	public List getSortElements() {
 		return this.sortElements;
 	}
+	
+	public Mode getMode() {
+		return mode;
+	}
 
-    protected void setRemoveDuplicates(boolean removeDuplicates) {
-        this.removeDuplicates = removeDuplicates;
-    }
+	public void setMode(Mode mode) {
+		this.mode = mode;
+	}
 
 	public void open()
 		throws MetaMatrixComponentException, MetaMatrixProcessingException {
@@ -88,81 +93,54 @@
 		this.collector = new BatchCollector(this.getChildren()[0]);
 	}
 
-    /**
-     * 1ST PHASE - COLLECTION
-     *  Collect all batches from child node, save in collected tuple source
-     *
-     * 2ND PHASE - SORT INITIAL SUBLISTS
-     *  Repeat until all batches from collection TS have been read
-     *      Get and pin batches from collection TS until MemoryNotAvailableException
-     *      Sort batches
-     *      Write batches into new sorted TS
-     *      Unpin all batches
-     *  Remove collection TS
-     *
-     * 3RD PHASE - MERGE SORTED SUBLISTS
-     *  Repeat until there is one sublist
-     *      Repeat until all sorted sublists have been merged
-     *          For each sorted sublist S
-     *              Load and pin a batch until memory not available
-     *          Merge from pinned batches
-     *              As batch is done, unpin and load next
-     *          Output merge into new sublist T
-     *          Remove merged sublists
-     *      Let sublists = set of T's
-     *
-     * 4TH PHASE - OUTPUT
-     *  Return batches from single sublist from T
-     */
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
 
         try {
 
             if(this.phase == COLLECTION) {
-                collectionPhase(null);
+                collectionPhase();
             }
 
             if(this.phase == SORT) {
                 sortPhase();
             }
 
-            if(this.phase == OUTPUT) {
-                return outputPhase();
-            }
+            return outputPhase();
         } catch(TupleSourceNotFoundException e) {
             throw new MetaMatrixComponentException(e, e.getMessage());
         }
-
-        TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
-        terminationBatch.setTerminationFlag(true);
-        return terminationBatch;
     }
 
-    protected void collectionPhase(TupleBatch batch) throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException, MetaMatrixProcessingException {
-        RelationalNode sourceNode = this.getChildren()[0];
-        TupleSourceID collectionID = collector.collectTuples(batch);
-        this.rowCount = collector.getRowCount();
-        if(this.rowCount == 0) {
-            this.phase = OUTPUT;
-        } else {
-            List sourceElements = sourceNode.getElements();
-            this.sortUtility = new SortUtility(collectionID, sourceElements,
-                                                sortElements, sortTypes, this.removeDuplicates,
-                                                getBufferManager(), getConnectionID());
-            this.phase = SORT;
-        }
+    private void collectionPhase() throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException, MetaMatrixProcessingException {
+		try {
+			collector.collectTuples();
+		} catch (BlockedException e) {
+			if (mode != Mode.DUP_REMOVE || !collector.collectedAny()) {
+				throw e;
+			}
+		}
+		if (this.sortUtility == null) {
+	        this.sortUtility = new SortUtility(collector.getTupleSourceID(), sortElements,
+	                                            sortTypes, this.mode, getBufferManager(),
+	                                            getConnectionID(), true);
+		}
+        this.phase = SORT;
     }
 
-    private void sortPhase() throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException {
-        this.outputID = this.sortUtility.sort();
-        this.rowCount = getBufferManager().getRowCount(outputID);
+    private void sortPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
+		this.outputID = this.sortUtility.sort();
         this.phase = OUTPUT;
-
     }
 
     private TupleBatch outputPhase() throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException {
-        if(this.rowCount == 0 || this.outputBeginRow > this.rowCount) {
+    	if (this.rowCount == -1) {
+    		this.rowCount = getBufferManager().getFinalRowCount(outputID);
+    		if (this.rowCount == -1) {
+    			this.phase = this.collector.isDone()?SORT:COLLECTION;
+    		}
+    	}
+        if(this.rowCount == 0 || (this.rowCount != -1 && this.outputBeginRow > this.rowCount)) {
             TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
             terminationBatch.setTerminationFlag(true);
             return terminationBatch;
@@ -171,13 +149,13 @@
         int endPinned = this.outputBeginRow+getBatchSize()-1;
         try {
             TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID, beginPinned, endPinned);
-
+            
             this.outputBeginRow += outputBatch.getRowCount();
 
-            if(outputBeginRow > rowCount) {
+            if(rowCount != -1 && outputBeginRow > rowCount) {
                 outputBatch.setTerminationFlag(true);
             }
-
+            
             return outputBatch;
         } catch(MemoryNotAvailableException e) {
             throw BlockedOnMemoryException.INSTANCE;
@@ -206,18 +184,17 @@
 
 	protected void getNodeString(StringBuffer str) {
 		super.getNodeString(str);
-		str.append(sortElements);
+		str.append("[").append(mode).append("] "); //$NON-NLS-1$ //$NON-NLS-2$
+		if (this.mode != Mode.DUP_REMOVE) {
+			str.append(sortElements);
+		}
 	}
 
 	protected void copy(SortNode source, SortNode target){
 		super.copy(source, target);
-		if(source.sortElements != null){
-			target.sortElements = new ArrayList(source.sortElements);
-		}
-		if(sortTypes != null){
-			target.sortTypes = new ArrayList(source.sortTypes);
-		}
-		target.removeDuplicates = source.removeDuplicates;
+		target.sortElements = source.sortElements;
+		target.sortTypes = source.sortTypes;
+		target.mode = source.mode;
 	}
 
 	public Object clone(){
@@ -230,9 +207,19 @@
     public Map getDescriptionProperties() {
         // Default implementation - should be overridden
         Map props = super.getDescriptionProperties();
-        props.put(PROP_TYPE, "Sort"); //$NON-NLS-1$
+        switch (mode) {
+        case SORT:
+            props.put(PROP_TYPE, "Sort"); //$NON-NLS-1$
+        	break;
+        case DUP_REMOVE:
+        	props.put(PROP_TYPE, "Duplicate Removal"); //$NON-NLS-1$
+        	break;
+        case DUP_REMOVE_SORT:
+        	props.put(PROP_TYPE, "Duplicate Removal And Sort"); //$NON-NLS-1$
+        	break;
+        }
         
-        if(this.sortElements != null) {
+        if(this.mode != Mode.DUP_REMOVE && this.sortElements != null) {
             Boolean ASC_B = Boolean.valueOf(OrderBy.ASC);
             List cols = new ArrayList(this.sortElements.size());
             for(int i=0; i<this.sortElements.size(); i++) {
@@ -246,7 +233,7 @@
             props.put(PROP_SORT_COLS, cols);
         }
         
-        props.put(PROP_REMOVE_DUPS, "" + this.removeDuplicates); //$NON-NLS-1$
+        props.put(PROP_REMOVE_DUPS, this.mode);
         
         return props;
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -30,9 +30,10 @@
 import java.util.List;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.BlockedException;
+import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.BlockedOnMemoryException;
 import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.IndexedTupleSource;
 import com.metamatrix.common.buffer.MemoryNotAvailableException;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleSourceID;
@@ -40,58 +41,74 @@
 import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.core.util.Assertion;
+import com.metamatrix.query.sql.lang.OrderBy;
 import com.metamatrix.query.sql.symbol.SingleElementSymbol;
 import com.metamatrix.query.util.TypeRetrievalUtil;
 
 /**
  */
 public class SortUtility {
+	
+	public enum Mode {
+		SORT,
+		DUP_REMOVE,
+		DUP_REMOVE_SORT
+	}
 
+	//constructor state
     private TupleSourceID sourceID;
-    private List sourceElements;
-    private List sortElements;
-    private List sortTypes;
-    private boolean removeDuplicates;
-
-    private BufferManager bufferManager;
+    protected List sortElements;
+    protected List<Boolean> sortTypes;
+    private Mode mode;
+    protected BufferManager bufferManager;
     private String groupName;
+    private boolean useAllColumns;
+    
+    //init state
     private int batchSize;
-    private List schema;
+    protected List schema;
     private String[] schemaTypes;
+    protected int[] sortCols;
+	private Comparator comparator;
 
-    // For collecting tuples to be sorted
+    private TupleSourceID outputID;
+    private boolean doneReading;
     private int sortPhaseRow = 1;
-    private int phase = SORT;
-    private int rowCount;
-    private int[] sortCols;
-    private List activeTupleIDs = new ArrayList();
-    private List workingBatches;
+    private int phase = INITIAL_SORT;
+    protected List<TupleSourceID> activeTupleIDs = new ArrayList<TupleSourceID>();
+    private List<TupleBatch> workingBatches;
     private int[] workingPointers;
+    private int masterSortIndex;
+	private TupleSourceID mergedID;
+	private TupleSourceID tempOutId;
 
     // Phase constants for readability
-    private static final int SORT = 1;
+    private static final int INITIAL_SORT = 1;
     private static final int MERGE = 2;
     private static final int DONE = 3;
-
-    /**
-     * Constructor for SortUtility.
-     */
-    public SortUtility(TupleSourceID sourceID, List sourceElements, List sortElements, List sortTypes, boolean removeDuplicates,
-                        BufferManager bufferMgr, String groupName) {
-
+    
+    public SortUtility(TupleSourceID sourceID, List sortElements, List<Boolean> sortTypes, boolean removeDups, BufferManager bufferMgr,
+            String groupName) {
+    	this(sourceID, sortElements, sortTypes, removeDups?Mode.DUP_REMOVE_SORT:Mode.SORT, bufferMgr, groupName, false);
+    }
+    
+    public SortUtility(TupleSourceID sourceID, List sortElements, List<Boolean> sortTypes, Mode mode, BufferManager bufferMgr,
+                        String groupName, boolean useAllColumns) {
         this.sourceID = sourceID;
-        this.sourceElements = sourceElements;
         this.sortElements = sortElements;
         this.sortTypes = sortTypes;
-        this.removeDuplicates = removeDuplicates;
+        this.mode = mode;
         this.bufferManager = bufferMgr;
         this.groupName = groupName;
+        this.useAllColumns = useAllColumns;
     }
-
-    /**
-     */
+    
+    public boolean isDone() {
+    	return this.doneReading && this.phase == DONE;
+    }
+    
     public TupleSourceID sort()
-        throws BlockedException, MetaMatrixComponentException {
+        throws BlockedOnMemoryException, MetaMatrixComponentException {
 
         try {
             // One time setup
@@ -99,23 +116,25 @@
                 initialize();
             }
             
-            if (rowCount == 0) {
-                TupleSourceID mergedID = bufferManager.createTupleSource(this.schema, this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
-                activeTupleIDs.add(mergedID);
-                phase = DONE;
+            if(this.phase == INITIAL_SORT) {
+                initialSort();
             }
             
-            if(this.phase == SORT) {
-                sortPhase();
-            }
-
             if(this.phase == MERGE) {
                 try {
                     mergePhase();
                 } finally {
+                	if (this.mergedID != null) {
+	                	this.bufferManager.removeTupleSource(mergedID);
+	                	this.mergedID = null;
+                	}
+                	if (this.tempOutId != null) {
+	                	this.bufferManager.removeTupleSource(tempOutId);
+	                	this.tempOutId = null;
+                	}
                     if (workingBatches != null) {
                         for (int i = 0; i < workingBatches.size(); i++) {
-                            TupleBatch tupleBatch = (TupleBatch)workingBatches.get(i);
+                            TupleBatch tupleBatch = workingBatches.get(i);
                             if (tupleBatch != null) {
                                 unpinWorkingBatch(i, tupleBatch);
                             }
@@ -124,191 +143,216 @@
                     workingBatches = null;
                 }
             }
-            
-            TupleSourceID result = (TupleSourceID) this.activeTupleIDs.get(0);
-            this.bufferManager.setStatus(result, TupleSourceStatus.FULL);
-            return result;
+            if (this.outputID != null) {
+            	return this.outputID;
+            }
+            return this.activeTupleIDs.get(0);
         } catch(TupleSourceNotFoundException e) {
             throw new MetaMatrixComponentException(e, e.getMessage());
         }
 
     }
 
-    protected void sortPhase() throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException {
-        ArrayList pinned = new ArrayList();     // of int[2] representing begin, end
+	private TupleSourceID createTupleSource() throws MetaMatrixComponentException {
+		return bufferManager.createTupleSource(this.schema, this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
+	}
 
-        // Loop until all data has been read and sorted
-        while(sortPhaseRow <= this.rowCount) {
-            List workingTuples = new ArrayList();
+    protected void initialSort() throws BlockedOnMemoryException, TupleSourceNotFoundException, MetaMatrixComponentException {
+    	while(!doneReading) {
+        	ArrayList<int[]> pinned = new ArrayList<int[]>();     // of int[2] representing begin, end
+            List<List<Object>> workingTuples = new ArrayList<List<Object>>();
+	        // Load data until out of memory
+	        while(!doneReading) {
+	            try {
+	                // Load and pin batch
+	                TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow, sortPhaseRow + batchSize - 1);
+	
+	                if (batch.getRowCount() == 0) {
+	                	if (bufferManager.getStatus(sourceID) == TupleSourceStatus.FULL) {
+	                		doneReading = true;
+		                }
+	                	break;
+	                }
+                    // Remember pinned rows
+                    pinned.add(new int[] { sortPhaseRow, batch.getEndRow() });
 
-            // Load data until out of memory
-            while(sortPhaseRow <= this.rowCount) {
-                try {
-                    // Load and pin batch
-                    TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow, sortPhaseRow + batchSize - 1);
+                    addTuples(workingTuples, batch);
+                    
+                    sortPhaseRow += batch.getRowCount();
+	            } catch(MemoryNotAvailableException e) {
+	                break;
+	            }
+	        }
+	
+	        if(workingTuples.isEmpty()) {
+        		break;
+	        }
+		
+	        TupleSourceID activeID = createTupleSource();
+	        activeTupleIDs.add(activeID);
+	        int sortedThisPass = workingTuples.size();
+	        if (this.mode == Mode.SORT) {
+	        	//perform a stable sort
+	    		Collections.sort(workingTuples, comparator);
+	        }
+	        int writeBegin = 1;
+	        while(writeBegin <= sortedThisPass) {
+	            int writeEnd = Math.min(sortedThisPass, writeBegin + batchSize - 1);
+	
+	            TupleBatch writeBatch = new TupleBatch(writeBegin, workingTuples.subList(writeBegin-1, writeEnd));
+	            bufferManager.addTupleBatch(activeID, writeBatch);
+	            writeBegin += writeBatch.getRowCount();
+	        }
+	
+	        // Clean up - unpin rows
+	        for (int[] bounds : pinned) {
+	            bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
+	        }
+        }
 
-                    if(batch.getRowCount() > 0) {
-                        // Remember pinned rows
-                        pinned.add(new int[] { sortPhaseRow, batch.getEndRow() });
+    	if (!doneReading && (mode != Mode.DUP_REMOVE || this.activeTupleIDs.isEmpty())) {
+    		throw BlockedOnMemoryException.INSTANCE;
+    	}
+    	
+    	if (this.activeTupleIDs.isEmpty()) {
+            activeTupleIDs.add(createTupleSource());
+        }  
 
-                        // Add to previous batches for sorting
-                        workingTuples.addAll(Arrays.asList(batch.getAllTuples()));
+        // Clean up
+        this.phase = MERGE;
+    }
 
-                        // Adjust beginning of next batch
-                        sortPhaseRow = batch.getEndRow() + 1;
-                    }
-
+	protected void addTuples(List workingTuples, TupleBatch batch) {
+		if (this.mode == Mode.SORT) {
+			workingTuples.addAll(Arrays.asList(batch.getAllTuples()));
+			return;
+		}
+		for (List<Object> list : batch.getAllTuples()) {
+			int index = Collections.binarySearch(workingTuples, list, comparator);
+			if (index >= 0) {
+				continue;
+			}
+			workingTuples.add(-index - 1, list);
+		}
+	}
+	
+    protected void mergePhase() throws BlockedOnMemoryException, MetaMatrixComponentException, TupleSourceNotFoundException {
+    	TupleCollector outCollector = null;
+    	while(this.activeTupleIDs.size() > 1) {    		
+            // Load and pin batch from sorted sublists while memory available
+            this.workingBatches = new ArrayList<TupleBatch>(activeTupleIDs.size());
+            int sortedIndex = 0;
+            for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
+                TupleSourceID activeID = activeTupleIDs.get(sortedIndex);
+                try {
+                    TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1, this.batchSize);
+                    workingBatches.add(sortedBatch);
                 } catch(MemoryNotAvailableException e) {
                     break;
                 }
             }
-
-            // Check for no memory available and block
-            if(workingTuples.isEmpty()) {
-                /* Defect 19087: We couldn't load any batches in memory, and we need to re-enqueue the work,
-                 * so this should be a BlockedOnMemoryException instead of a BlockedException
-                 */
+            
+            //if we cannot make progress, just block for now
+            if (workingBatches.size() < 2) {
                 throw BlockedOnMemoryException.INSTANCE;
             }
+            
+            // Initialize pointers into working batches
+            this.workingPointers = new int[workingBatches.size()];
+            Arrays.fill(this.workingPointers, 1);
 
-            // Sort whatever is in memory
-            Comparator comp = new ListNestedSortComparator(sortCols, sortTypes);
-            Collections.sort( workingTuples, comp );
+            mergedID = createTupleSource();
 
-            // Write to temporary tuple source
-            TupleSourceID sortedID = bufferManager.createTupleSource(this.schema, this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
-            activeTupleIDs.add(sortedID);
-            int sortedThisPass = workingTuples.size();
-            int writeBegin = 1;
-            while(writeBegin <= sortedThisPass) {
-                int writeEnd = Math.min(sortedThisPass, writeBegin + batchSize - 1);
-
-                TupleBatch writeBatch = new TupleBatch(writeBegin, workingTuples.subList(writeBegin-1, writeEnd));
-                bufferManager.addTupleBatch(sortedID, writeBatch);
-                writeBegin += writeBatch.getRowCount();
-            }
-
-            // Clean up - unpin rows
-            Iterator iter = pinned.iterator();
-            while(iter.hasNext()) {
-                int[] bounds = (int[]) iter.next();
-                bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
-            }
-            pinned.clear();
-        }
-
-        // Clean up
-        this.phase = MERGE;
-    }
-
-    protected void mergePhase() throws BlockedException, TupleSourceNotFoundException, MetaMatrixComponentException {
-        // In the case where there is a single activeTupleID but removeDuplicates == true,
-        // we need to execute this loop exactly once.  We also need to execute any time
-        // the number of active tuple IDs is > 1
-
-        if(this.activeTupleIDs.size() > 1 || this.removeDuplicates) {
-            do {
-                // Load and pin batch from sorted sublists while memory available
-                this.workingBatches = new ArrayList(activeTupleIDs.size());
-                int sortedIndex = 0;
-                for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
-                    TupleSourceID activeID = (TupleSourceID) activeTupleIDs.get(sortedIndex);
-                    try {
-                        TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1, this.batchSize);
-                        workingBatches.add(sortedBatch);
-                    } catch(MemoryNotAvailableException e) {
-                        break;
-                    }
-                }
-                
-                //if we cannot make progress, just block for now
-                if (workingBatches.size() < 2 && !(workingBatches.size() == 1 && this.removeDuplicates && activeTupleIDs.size() == 1)) {
-                    throw BlockedOnMemoryException.INSTANCE;
-                }
-                
-                // Initialize pointers into working batches
-                this.workingPointers = new int[workingBatches.size()];
+            // Merge from working sorted batches
+            TupleCollector collector = new TupleCollector(mergedID, this.bufferManager);
+            while(true) {
+                // Find least valued row among working batches
+                List<?> currentRow = null;
+                int chosenBatchIndex = -1;
+                TupleBatch chosenBatch = null;
                 for(int i=0; i<workingBatches.size(); i++) {
-                    this.workingPointers[i] = 1;
-                }
-
-                // Initialize merge output
-                TupleSourceID mergedID = bufferManager.createTupleSource(this.schema, this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
-                int mergedRowBegin = 1;
-
-                // Merge from working sorted batches
-                List currentRows = new ArrayList(this.batchSize);
-                ListNestedSortComparator comparator = new ListNestedSortComparator(sortCols, sortTypes);
-                while(true) {
-                    // Find least valued row among working batches
-                    List currentRow = null;
-                    int chosenBatchIndex = -1;
-                    TupleBatch chosenBatch = null;
-                    for(int i=0; i<workingBatches.size(); i++) {
-                        TupleBatch batch = (TupleBatch) workingBatches.get(i);
-                        if(batch != null) {
-                            List testRow = batch.getTuple(workingPointers[i]);
-                            if(currentRow == null || comparator.compare(testRow, currentRow) < 0) {
-                                // Found lower row
-                                currentRow = testRow;
-                                chosenBatchIndex = i;
-                                chosenBatch = batch;
-                            }
-                        }
+                    TupleBatch batch = workingBatches.get(i);
+                    if(batch == null) {
+                    	continue;
                     }
-
-                    // Check for termination condition - all batches must have been null
-                    if(currentRow == null) {
-                        break;
+                    List<?> testRow = batch.getTuple(workingPointers[i]);
+                    int compare = -1;
+                    if (currentRow != null) {
+                    	compare = comparator.compare(testRow, currentRow);
                     }
-                    // Output the row and update pointers
-                    currentRows.add(currentRow);
-                    incrementWorkingBatch(chosenBatchIndex, chosenBatch);
-
-                    // Move past this same row on all batches if dup removal is on
-                    if(this.removeDuplicates) {
-                        for(int i=0; i<workingBatches.size(); i++) {
-                            TupleBatch batch = (TupleBatch) workingBatches.get(i);
-                            while(batch != null) {
-                                List testRow = batch.getTuple(workingPointers[i]);
-                                if(comparator.compare(testRow, currentRow) == 0) {
-                                    if(incrementWorkingBatch(i, batch)) {
-                                        batch = (TupleBatch) workingBatches.get(i);
-                                    }
-                                } else {
-                                    break;
-                                }
-                            }
-                        }
+                    if(compare < 0) {
+                        // Found lower row
+                        currentRow = testRow;
+                        chosenBatchIndex = i;
+                        chosenBatch = batch;
+                    } else if (compare == 0 && this.mode != Mode.SORT) {
+                    	incrementWorkingBatch(i, batch);
                     }
-
-
-                    // Check for full batch and store
-                    if(currentRows.size() == this.batchSize) {
-                        bufferManager.addTupleBatch(mergedID, new TupleBatch(mergedRowBegin, currentRows));
-                        mergedRowBegin = mergedRowBegin + this.batchSize;
-                        currentRows = new ArrayList(this.batchSize);
-                    }
                 }
 
-                // Save any remaining partial batch
-                if(currentRows.size() > 0) {
-                    bufferManager.addTupleBatch(mergedID, new TupleBatch(mergedRowBegin, currentRows));
+                // Check for termination condition - all batches must have been null
+                if(currentRow == null) {
+                    break;
                 }
-                
-                // Remove merged sublists
-                for(int i=0; i<sortedIndex; i++) {
-                    bufferManager.removeTupleSource((TupleSourceID)activeTupleIDs.remove(0));
+                // Output the row and update pointers
+                collector.addTuple(currentRow);
+                if (this.outputID != null && chosenBatchIndex != masterSortIndex && sortedIndex > masterSortIndex) {
+                	if (outCollector == null) {
+                		tempOutId = createTupleSource();
+                    	outCollector = new TupleCollector(tempOutId, this.bufferManager);
+                	}
+                    outCollector.addTuple(currentRow);
                 }
+                incrementWorkingBatch(chosenBatchIndex, chosenBatch);
+            }
 
-                this.activeTupleIDs.add(mergedID);
-                
-            } while(this.activeTupleIDs.size() > 1);
+            // Save without closing
+            collector.saveBatch(false);
+            
+            // Remove merged sublists
+            for(int i=0; i<sortedIndex; i++) {
+            	TupleSourceID id = activeTupleIDs.remove(0);
+            	if (!id.equals(this.outputID)) {
+            		bufferManager.removeTupleSource(id);
+            	}
+            }
+
+            this.activeTupleIDs.add(mergedID);           
+            this.mergedID = null;
+            masterSortIndex = masterSortIndex - sortedIndex + 1;
+            if (masterSortIndex < 0) {
+            	masterSortIndex = this.activeTupleIDs.size() - 1;
+            }
+        } 
+    	
+        if (outCollector != null) {
+        	outCollector.close();
+        	//transfer the new dup removed tuples to the output id
+        	TupleCollector tc = new TupleCollector(outputID, this.bufferManager);
+        	IndexedTupleSource ts = this.bufferManager.getTupleSource(outCollector.getTupleSourceID());
+        	try {
+	        	while (ts.hasNext()) {
+	        		tc.addTuple(ts.nextTuple());
+	        	}
+        	} catch (MetaMatrixProcessingException e) {
+        		throw new MetaMatrixComponentException(e);
+        	}
         }
-
+        
         // Close sorted source (all others have been removed)
-        bufferManager.setStatus((TupleSourceID) activeTupleIDs.get(0), TupleSourceStatus.FULL);
-        this.phase = DONE;
+        if (doneReading) {
+	        bufferManager.setStatus(activeTupleIDs.get(0), TupleSourceStatus.FULL);
+	        if (this.outputID != null) {
+	        	bufferManager.setStatus(outputID, TupleSourceStatus.FULL);
+	        }
+	        this.phase = DONE;
+	        return;
+        }
+    	Assertion.assertTrue(mode == Mode.DUP_REMOVE);
+    	if (this.outputID == null) {
+    		this.outputID = activeTupleIDs.get(0);
+    	}
+    	this.phase = INITIAL_SORT;
     }
 
     /**
@@ -316,7 +360,7 @@
      * for that batchIndex, which we already happen to have.  Return whether the batch
      * was changed or not.  True = changed.
      */
-    private boolean incrementWorkingBatch(int batchIndex, TupleBatch currentBatch) throws BlockedOnMemoryException, TupleSourceNotFoundException, MetaMatrixComponentException {
+    private void incrementWorkingBatch(int batchIndex, TupleBatch currentBatch) throws BlockedOnMemoryException, TupleSourceNotFoundException, MetaMatrixComponentException {
         workingPointers[batchIndex] += 1;
         if(workingPointers[batchIndex] > currentBatch.getEndRow()) {
             TupleSourceID tsID = unpinWorkingBatch(batchIndex, currentBatch);
@@ -332,22 +376,16 @@
                 } else {
                     workingBatches.set(batchIndex, newBatch);
                 }
-
-                // Return true
-                return true;
-
             } catch(MemoryNotAvailableException e) {
                 throw BlockedOnMemoryException.INSTANCE;
             }
-
         }
-        return false;
     }
     
     private TupleSourceID unpinWorkingBatch(int batchIndex,
                                             TupleBatch currentBatch) throws TupleSourceNotFoundException,
                                                                     MetaMatrixComponentException {
-        TupleSourceID tsID = (TupleSourceID)activeTupleIDs.get(batchIndex);
+        TupleSourceID tsID = activeTupleIDs.get(batchIndex);
         int lastBeginRow = currentBatch.getBeginRow();
         int lastEndRow = currentBatch.getEndRow();
         bufferManager.unpinTupleBatch(tsID, lastBeginRow, lastEndRow);
@@ -358,19 +396,33 @@
         this.schema = this.bufferManager.getTupleSchema(this.sourceID);
         this.schemaTypes = TypeRetrievalUtil.getTypeNames(schema);
         this.batchSize = bufferManager.getProcessorBatchSize();
-        this.rowCount = bufferManager.getRowCount(this.sourceID);
         
+        if (useAllColumns && mode != Mode.SORT) {
+	        if (this.sortElements != null) {
+	        	this.sortElements = new ArrayList(this.sortElements);
+	        	List toAdd = new ArrayList(schema);
+	        	toAdd.removeAll(this.sortElements);
+	        	this.sortElements.addAll(toAdd);
+	        	this.sortTypes = new ArrayList<Boolean>(this.sortTypes);
+	        	this.sortTypes.addAll(Collections.nCopies(this.sortElements.size() - this.sortTypes.size(), OrderBy.ASC));
+        	} else {
+	    		this.sortElements = this.schema;
+	    		this.sortTypes = Collections.nCopies(this.sortElements.size(), OrderBy.ASC);
+        	}
+        }
+        
         int[] cols = new int[sortElements.size()];
 
         Iterator iter = sortElements.iterator();
         
         for (int i = 0; i < cols.length; i++) {
-            SingleElementSymbol elem = (SingleElementSymbol) iter.next();
+            SingleElementSymbol elem = (SingleElementSymbol)iter.next();
             
-            cols[i] = sourceElements.indexOf(elem);
+            cols[i] = schema.indexOf(elem);
             Assertion.assertTrue(cols[i] != -1);
         }
-        
         this.sortCols = cols;
+        this.comparator = new ListNestedSortComparator(sortCols, sortTypes);
     }
+    
 }

Added: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java	                        (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.processor.relational;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
+
+public class TupleCollector {
+	
+	private TupleSourceID tsid;
+	
+	private int batchSize;
+	private ArrayList<List<?>> batch;
+	private int index;
+	private BufferManager bm;
+
+	public TupleCollector(TupleSourceID tsid, BufferManager bm) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+		this.tsid = tsid;
+		this.batchSize = bm.getProcessorBatchSize();
+		this.bm = bm;
+		this.index = bm.getRowCount(tsid) + 1;
+	}
+	
+	public TupleSourceID getTupleSourceID() {
+		return tsid;
+	}
+	
+	public void addTuple(List<?> tuple) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+		if (batch == null) {
+			batch = new ArrayList<List<?>>(batchSize/4);
+		}
+		batch.add(tuple);
+		if (batch.size() == batchSize) {
+			saveBatch(false);
+		}
+	}
+
+	public void saveBatch(boolean isLast) throws TupleSourceNotFoundException,
+			MetaMatrixComponentException {
+		ArrayList<List<?>> toSave = batch;
+		if (toSave == null || toSave.isEmpty()) {
+			if (!isLast) {
+				return;
+			}
+			toSave = new ArrayList<List<?>>(0);
+		}
+		TupleBatch tb = new TupleBatch(index, toSave);
+		tb.setTerminationFlag(isLast);
+		this.bm.addTupleBatch(tsid, tb);
+		this.index += toSave.size();
+		batch = null;
+	}
+	
+	public void close() throws TupleSourceNotFoundException, MetaMatrixComponentException {
+		saveBatch(true);
+		this.bm.setStatus(this.tsid, TupleSourceStatus.FULL);
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -31,6 +31,8 @@
 import com.metamatrix.query.mapping.relational.QueryNode;
 import com.metamatrix.query.optimizer.TestOptimizer.DependentProjectNode;
 import com.metamatrix.query.optimizer.TestOptimizer.DependentSelectNode;
+import com.metamatrix.query.optimizer.TestOptimizer.DupRemoveNode;
+import com.metamatrix.query.optimizer.TestOptimizer.DupRemoveSortNode;
 import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
 import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
 import com.metamatrix.query.optimizer.capabilities.SourceCapabilities.Capability;
@@ -39,7 +41,6 @@
 import com.metamatrix.query.processor.TestProcessor;
 import com.metamatrix.query.processor.relational.AccessNode;
 import com.metamatrix.query.processor.relational.DependentAccessNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
 import com.metamatrix.query.processor.relational.GroupingNode;
 import com.metamatrix.query.processor.relational.LimitNode;
 import com.metamatrix.query.processor.relational.MergeJoinStrategy;
@@ -575,7 +576,7 @@
             0,      // DependentAccess
             0,      // DependentSelect
             0,      // DependentProject
-            1,      // DupRemove
+            0,      // DupRemove
             0,      // Grouping
             1,      // Limit
             0,      // NestedLoopJoinStrategy
@@ -584,9 +585,10 @@
             0,      // PlanExecution
             0,      // Project
             0,      // Select
-            1,      // Sort
+            0,      // Sort
             1       // UnionAll
         }, NODE_TYPES);
+        TestOptimizer.checkNodeTypes(plan, new int[] {1}, new Class[]{DupRemoveSortNode.class});
     }
     
     public void testCombinedLimits() throws Exception {

Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -58,7 +58,6 @@
 import com.metamatrix.query.processor.ProcessorPlan;
 import com.metamatrix.query.processor.relational.AccessNode;
 import com.metamatrix.query.processor.relational.DependentAccessNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
 import com.metamatrix.query.processor.relational.GroupingNode;
 import com.metamatrix.query.processor.relational.JoinNode;
 import com.metamatrix.query.processor.relational.MergeJoinStrategy;
@@ -72,6 +71,7 @@
 import com.metamatrix.query.processor.relational.SelectNode;
 import com.metamatrix.query.processor.relational.SortNode;
 import com.metamatrix.query.processor.relational.UnionAllNode;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.resolver.QueryResolver;
 import com.metamatrix.query.resolver.util.BindVariableVisitor;
 import com.metamatrix.query.rewriter.QueryRewriter;
@@ -92,6 +92,8 @@
     public interface DependentJoin {}
     public interface DependentSelectNode {}
     public interface DependentProjectNode {}
+    public interface DupRemoveNode {}
+    public interface DupRemoveSortNode {}
     
     public static final int[] FULL_PUSHDOWN = new int[] {
                                             1,      // Access
@@ -441,6 +443,19 @@
         	} else {
         		updateCounts(DependentSelectNode.class, counts, types);
         	}
+        } else if (nodeType.equals(SortNode.class)) {
+        	Mode mode = ((SortNode)relationalNode).getMode();
+        	switch(mode) {
+        	case DUP_REMOVE:
+                updateCounts(DupRemoveNode.class, counts, types);
+        		break;
+        	case DUP_REMOVE_SORT:
+                updateCounts(DupRemoveSortNode.class, counts, types);
+        		break;
+        	case SORT:
+                updateCounts(SortNode.class, counts, types);
+        		break;
+        	}
         } else {
             updateCounts(nodeType, counts, types);
         }
@@ -7145,7 +7160,7 @@
         // Create query 
         String sql = "select IntKey from bqt1.smalla where stringkey not like '2%'"; //$NON-NLS-1$
 
-        ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.exampleBQT(), null, capFinder, 
+        ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.exampleBQTCached(), null, capFinder, 
                                       new String[] {"SELECT stringkey, IntKey FROM bqt1.smalla"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$
         
         checkNodeTypes(plan, new int[] {

Added: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java	                        (rev 0)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.optimizer;
+
+import static com.metamatrix.query.optimizer.TestOptimizer.*;
+
+import org.junit.Test;
+
+import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
+import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
+import com.metamatrix.query.processor.ProcessorPlan;
+import com.metamatrix.query.unittest.FakeMetadataFactory;
+
+public class TestSortOptimization {
+
+    @Test public void testSortDupCombination() { 
+        FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+        BasicSourceCapabilities caps = new BasicSourceCapabilities();
+        capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+        // Create query 
+        String sql = "select distinct e1, e2 from pm1.g1 order by e2"; //$NON-NLS-1$
+
+        ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null, capFinder, 
+                                      new String[] {"SELECT e1, e2 FROM pm1.g1"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$
+        
+        checkNodeTypes(plan, FULL_PUSHDOWN); 
+        checkNodeTypes(plan, new int[] {1}, new Class[] {DupRemoveSortNode.class});
+    }
+    
+    @Test public void testSortDupCombination1() { 
+        FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+        BasicSourceCapabilities caps = new BasicSourceCapabilities();
+        capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+        // Create query 
+        String sql = "select e1, e2 from pm1.g1 union select e1, e2 from pm1.g2 order by e2"; //$NON-NLS-1$
+
+        ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null, capFinder, 
+                                      new String[] {"SELECT e1, e2 FROM pm1.g1", "SELECT e1, e2 FROM pm1.g2"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$ //$NON-NLS-2$
+        
+        checkNodeTypes(plan, new int[] {
+                2,      // Access
+                0,      // DependentAccess
+                0,      // DependentSelect
+                0,      // DependentProject
+                0,      // DupRemove
+                0,      // Grouping
+                0,      // NestedLoopJoinStrategy
+                0,      // MergeJoinStrategy
+                0,      // Null
+                0,      // PlanExecution
+                0,      // Project
+                0,      // Select
+                0,      // Sort
+                1       // UnionAll
+            });
+        checkNodeTypes(plan, new int[] {1}, new Class[] {DupRemoveSortNode.class});
+    }
+	
+    @Test public void testSortDupCombination2() { 
+        FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+        BasicSourceCapabilities caps = new BasicSourceCapabilities();
+        capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+        // Create query 
+        String sql = "select x.*, y.* from (select distinct e1, e2 from pm1.g1) x, (select distinct e1, e2 from pm1.g2) y where x.e1 = y.e1"; //$NON-NLS-1$
+
+        ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null, capFinder, 
+                                      new String[] {"SELECT e1, e2 FROM pm1.g1", "SELECT e1, e2 FROM pm1.g2"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$ //$NON-NLS-2$
+        
+        checkNodeTypes(plan, new int[] {
+                2,      // Access
+                0,      // DependentAccess
+                0,      // DependentSelect
+                0,      // DependentProject
+                0,      // DupRemove
+                0,      // Grouping
+                0,      // NestedLoopJoinStrategy
+                1,      // MergeJoinStrategy
+                0,      // Null
+                0,      // PlanExecution
+                1,      // Project
+                0,      // Select
+                0,      // Sort
+                0       // UnionAll
+            });
+        checkNodeTypes(plan, new int[] {0}, new Class[] {DupRemoveSortNode.class});
+    }
+
+}


Property changes on: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -923,8 +923,7 @@
         XMLPlan plan = helpPlan(
             "SELECT * FROM vm1.doc1 ORDER BY vm1.doc1.a0.a1.c1", //$NON-NLS-1$
             example1());
-        System.out.println((plan.getDescriptionProperties().get(Describable.PROP_CHILDREN)).toString());
-        assertEquals("[{formatted=false, encoding=UTF-8, type=START DOCUMENT}, {tag=a0, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {isStaging=true, resultSet=tm1.g1, type=Staging Table}, {sql=VM1.G1_1, isStaging=false, resultSet=VM1.G1_1, type=EXECUTE SQL}, {resultSet=VM1.G1_1, type=BLOCK}, {resultSet=VM1.G1_1, type=NEXT ROW}, {program={children=[{tag=a1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {tag=a1, dataCol=VM1.G1_1.e1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=b1, dataCol=VM1.G1_1.e2, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=c1, dataCol=VM1.G1_1.e3, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=UP IN DOCUMENT}, {resultSet=VM1.G1_1, type=NEXT ROW}], type=XML Program}, resultSet=VM1.G1_1, type=LOOP}, {resultSet=VM1.G1_1, type=CLOSE RESULTSET}, {type=UP IN DOCUMENT}, {type=END DOCUMENT}, {isStaging=true, resultSet=unload_tm1.g1, type=S!
 taging Table}, {children=[{removeDups=false, sortCols=[#TM1_G1.E3 ASC], nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[{nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[{modelName=__TEMP__, sql=SELECT #TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4 FROM #TM1_G1, nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[], type=Access, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], selectCols=[#TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4], type=Project, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], type=Sort, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], type=Relational Plan, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}]", plan.getDescriptionProperties().get(Describable.PROP_CHILDREN).toString());//$NON-NLS-1$
+        assertEquals("[{formatted=false, encoding=UTF-8, type=START DOCUMENT}, {tag=a0, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {isStaging=true, resultSet=tm1.g1, type=Staging Table}, {sql=VM1.G1_1, isStaging=false, resultSet=VM1.G1_1, type=EXECUTE SQL}, {resultSet=VM1.G1_1, type=BLOCK}, {resultSet=VM1.G1_1, type=NEXT ROW}, {program={children=[{tag=a1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {tag=a1, dataCol=VM1.G1_1.e1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=b1, dataCol=VM1.G1_1.e2, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=c1, dataCol=VM1.G1_1.e3, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=UP IN DOCUMENT}, {resultSet=VM1.G1_1, type=NEXT ROW}], type=XML Program}, resultSet=VM1.G1_1, type=LOOP}, {resultSet=VM1.G1_1, type=CLOSE RESULTSET}, {type=UP IN DOCUMENT}, {type=END DOCUMENT}, {isStaging=true, resultSet=unload_tm1.g1, type=S!
 taging Table}, {children=[{removeDups=SORT, sortCols=[#TM1_G1.E3 ASC], nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[{nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[{modelName=__TEMP__, sql=SELECT #TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4 FROM #TM1_G1, nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[], type=Access, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], selectCols=[#TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4], type=Project, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], type=Sort, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}], type=Relational Plan, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}]", plan.getDescriptionProperties().get(Describable.PROP_CHILDREN).toString());//$NON-NLS-1$
     }
     // ################################## TEST SUITE ################################
 

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -7660,7 +7660,7 @@
             0,      // DependentAccess
             0,      // DependentSelect
             0,      // DependentProject
-            1,      // DupRemove
+            0,      // DupRemove
             0,      // Grouping
             0,      // NestedLoopJoinStrategy
             2,      // MergeJoinStrategy

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -26,7 +26,6 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2009-04-28 17:36:46 UTC (rev 847)
@@ -22,6 +22,8 @@
 
 package com.metamatrix.query.processor.relational;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,7 +31,7 @@
 import java.util.List;
 import java.util.Set;
 
-import junit.framework.TestCase;
+import org.junit.Test;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -39,24 +41,17 @@
 import com.metamatrix.common.buffer.impl.SizeUtility;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.query.processor.relational.NodeTestUtil.TestableBufferManagerImpl;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.sql.lang.OrderBy;
 import com.metamatrix.query.sql.symbol.ElementSymbol;
 import com.metamatrix.query.util.CommandContext;
 
-public class TestSortNode extends TestCase {
+public class TestSortNode {
     
     public static final int BATCH_SIZE = 100;
     public static final int INT_BATCH_SIZE = TestSortNode.getIntBatchSize(); //the size of 100 integers    
     
-    /**
-     * Constructor for TestSortNode.
-     * @param arg0
-     */
-    public TestSortNode(String arg0) {
-        super(arg0);
-    }
-    
-    private void helpTestSort(long bytesInMemory, List elements, List[] data, List sortElements, List sortTypes, List[] expected, Set blockOn, boolean removeDups) throws MetaMatrixComponentException, MetaMatrixProcessingException {
+    private void helpTestSort(long bytesInMemory, List elements, List[] data, List sortElements, List sortTypes, List[] expected, Set blockOn, Mode mode) throws MetaMatrixComponentException, MetaMatrixProcessingException {
         BufferManager mgr = NodeTestUtil.getTestBufferManager(bytesInMemory);
         TestableBufferManagerImpl impl = (TestableBufferManagerImpl) mgr;
         impl.setBlockOn(blockOn);
@@ -69,14 +64,15 @@
         dataNode.setElements(elements);
         dataNode.initialize(context, mgr, null);    
         
-        SortNode sortNode = null;
-        if (removeDups) {
-            sortNode = new DupRemoveNode(1);
+        SortNode sortNode = new SortNode(1);
+        if (mode == Mode.DUP_REMOVE) {
+        	sortTypes = Arrays.asList(new Boolean[elements.size()]);
+        	Collections.fill(sortTypes, OrderBy.ASC);
+            sortNode.setSortElements(elements, sortTypes);
         } else {
-            sortNode = new SortNode(1);
+        	sortNode.setSortElements(sortElements, sortTypes);
         }
-        
-        sortNode.setSortElements(sortElements, sortTypes);
+        sortNode.setMode(mode);
         sortNode.setElements(elements);
         sortNode.addChild(dataNode);        
         sortNode.initialize(context, mgr, null);    
@@ -87,26 +83,27 @@
         while(true) {
             try {
                 TupleBatch batch = sortNode.nextBatch();
-                
-                for(int row = currentRow; row <= batch.getEndRow(); row++) {
-                    assertEquals("Rows don't match at " + row, expected[row-1], batch.getTuple(row)); //$NON-NLS-1$
+                if (mode != Mode.DUP_REMOVE) {
+	                for(int row = currentRow; row <= batch.getEndRow(); row++) {
+	                    assertEquals("Rows don't match at " + row, expected[row-1], batch.getTuple(row)); //$NON-NLS-1$
+	                }
                 }
-                
+                currentRow += batch.getRowCount();    
                 if(batch.getTerminationFlag()) {
                     break;
                 }
-                currentRow += batch.getRowCount();    
             } catch (BlockedOnMemoryException e) {
                 if (!impl.wasBlocked()) {
                     throw new BlockedOnMemoryException();
                 }
             }
         }
+        assertEquals(expected.length, currentRow - 1);
     }
 
     public static int getIntBatchSize() {
         List[] expected = new List[] { 
-                Arrays.asList(new Object[] { new Integer(0) }), //$NON-NLS-1$ //$NON-NLS-2$
+                Arrays.asList(new Object[] { new Integer(0) }), 
            };     
         
         String[] types = { "integer" };     //$NON-NLS-1$
@@ -118,7 +115,7 @@
     /*
      * 1 batch all in memory
      */
-    private void helpTestBasicSort(List[] expected, boolean removeDups) throws Exception {
+    private void helpTestBasicSort(List[] expected, Mode mode) throws Exception {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
@@ -157,11 +154,11 @@
             if (i > 0) {
                 blockedOn.add(new Integer(i));
             }
-            helpTestSort(INT_BATCH_SIZE*2, elements, data, sortElements, sortTypes, expected, blockedOn, removeDups);
+            helpTestSort(INT_BATCH_SIZE*2, elements, data, sortElements, sortTypes, expected, blockedOn, mode);
         }
     }
     
-    private void helpTestBiggerSort(int batches, int inMemoryBatches, boolean removeDups) throws Exception {
+    private void helpTestBiggerSort(int batches, int inMemoryBatches) throws Exception {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
@@ -193,30 +190,31 @@
             expected[i].add(unsortedNumbers.get(i));
         }
         
-        /*
-         * the following code will do four tests, blocking in a variety of places
-         */
-        for (int i = 0; i < 3; i++) {
-            Set blockedOn = new HashSet();
-            if (i > 0) {
-                //block on a variety of positions
-                blockedOn.add(new Integer(i));
-                blockedOn.add(new Integer(inMemoryBatches*i));
-                blockedOn.add(new Integer(batches*i));
-                blockedOn.add(new Integer(batches*(i+1)));
-            }
-            //5 batches in memory out of 10 total
-            helpTestSort(INT_BATCH_SIZE * inMemoryBatches, elements, data, sortElements, sortTypes, expected, null, removeDups);
+        
+        for (Mode mode : Mode.values()) {
+	        /*
+	         * the following code will do four tests, blocking in a variety of places
+	         */
+	        for (int i = 0; i < 3; i++) {
+	            Set blockedOn = new HashSet();
+	            if (i > 0) {
+	                //block on a variety of positions
+	                blockedOn.add(new Integer(i));
+	                blockedOn.add(new Integer(inMemoryBatches*i));
+	                blockedOn.add(new Integer(batches*i));
+	                blockedOn.add(new Integer(batches*(i+1)));
+	            }
+	            //5 batches in memory out of 10 total
+	            helpTestSort(INT_BATCH_SIZE * inMemoryBatches, elements, data, sortElements, sortTypes, expected, blockedOn, mode);
+	        }
         }
     }
         
-    public void testNoSort() throws Exception {
-        helpTestBiggerSort(0, 2, false);
-        
-        helpTestBiggerSort(0, 2, true);
+    @Test public void testNoSort() throws Exception {
+        helpTestBiggerSort(0, 2);
     }    
     
-    public void testBasicSort() throws Exception {
+    @Test public void testBasicSort() throws Exception {
         List[] expected = new List[] { 
             Arrays.asList(new Object[] { new Integer(0), "0" }),    //$NON-NLS-1$
             Arrays.asList(new Object[] { new Integer(0), "3" }),    //$NON-NLS-1$
@@ -240,10 +238,14 @@
             Arrays.asList(new Object[] { new Integer(10), "4" })                //$NON-NLS-1$
         };
         
-        helpTestBasicSort(expected, false);
+        helpTestBasicSort(expected, Mode.SORT);
     }
     
-    public void testBasicSortRemoveDup() throws Exception {
+    /**
+     * Note the ordering here is not stable
+     * @throws Exception
+     */
+    @Test public void testBasicSortRemoveDup() throws Exception {
         List[] expected = new List[] { 
             Arrays.asList(new Object[] { new Integer(0), "0" }),    //$NON-NLS-1$
             Arrays.asList(new Object[] { new Integer(0), "3" }),    //$NON-NLS-1$
@@ -266,30 +268,41 @@
             Arrays.asList(new Object[] { new Integer(10), "9" })                //$NON-NLS-1$
         };
 
-        
-        helpTestBasicSort(expected, true);
-    }    
+        helpTestBasicSort(expected, Mode.DUP_REMOVE);
+    }   
     
-    public void testBiggerSort() throws Exception {
-        helpTestBiggerSort(10, 5, false);
-        
-        helpTestBiggerSort(10, 5, true);
+    @Test public void testBasicSortRemoveDupSort() throws Exception {
+    	List[] expected = new List[] { 
+                Arrays.asList(new Object[] { new Integer(0), "0" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(0), "3" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(1), "2" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(1), "5" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(2), "1" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(2), "4" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(3), "3" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(3), "6" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(4), "3" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(5), "2" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(5), "5" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(6), "1" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(6), "4" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(7), "3" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(8), "2" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(9), "1" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(9), "5" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(10), "4" }),    //$NON-NLS-1$
+                Arrays.asList(new Object[] { new Integer(10), "9" })                //$NON-NLS-1$
+            };
+
+        helpTestBasicSort(expected, Mode.DUP_REMOVE_SORT);
+    }   
+    
+    @Test public void testBiggerSort() throws Exception {
+        helpTestBiggerSort(10, 5);
     }
  
-    public void testBiggerSortLowMemory() throws Exception {
-        try {
-            helpTestBiggerSort(5, 1, false);
-            fail("Expected exception"); //$NON-NLS-1$
-        } catch (BlockedOnMemoryException e) {
-            //expected
-        } 
-        
-        try {
-            helpTestBiggerSort(5, 1, true);
-            fail("Expected exception"); //$NON-NLS-1$
-        } catch (BlockedOnMemoryException e) {
-            //expected
-        }
+    @Test(expected=BlockedOnMemoryException.class) public void testBiggerSortLowMemory() throws Exception {
+        helpTestBiggerSort(5, 1);
     }       
 
     /**
@@ -297,10 +310,8 @@
      * 
      * This is also a test of the multi-pass merge
      */
-    public void testBiggerSortLowMemory2() throws Exception {
-        helpTestBiggerSort(5, 2, false);
-        
-        helpTestBiggerSort(5, 2, true);
-    }       
-
+    @Test public void testBiggerSortLowMemory2() throws Exception {
+        helpTestBiggerSort(5, 2);
+    }
+    
 }




More information about the teiid-commits mailing list