[teiid-commits] teiid SVN: r1728 - in trunk: engine/src/main/java/com/metamatrix/query/processor and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Jan 10 00:42:03 EST 2010


Author: shawkins
Date: 2010-01-10 00:42:02 -0500 (Sun, 10 Jan 2010)
New Revision: 1728

Modified:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SelectNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/BlockingFakeRelationalNode.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestGroupingNode.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
   trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedConfigurationService.java
Log:
TEIID-926 TEIID-925 simplifying projection to simple indexing and correcting the dup removal logic in sort utility/sort node.  also modifying the dup removal algorithm to force progressively more sublists to be present before performing a merge.

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -204,8 +204,16 @@
 			saveBatch(false);
 		}
 	}
+	
+	/**
+	 * Force the persistence of any rows held in memory.
+	 * @throws MetaMatrixComponentException
+	 */
+	public void saveBatch() throws MetaMatrixComponentException {
+		this.saveBatch(false);
+	}
 
-	public void saveBatch(boolean finalBatch) throws MetaMatrixComponentException {
+	void saveBatch(boolean finalBatch) throws MetaMatrixComponentException {
 		Assertion.assertTrue(!this.isRemoved());
 		if (batchBuffer == null || batchBuffer.isEmpty()) {
 			return;

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -117,8 +117,4 @@
         return buffer.getRowCount();
     }
     
-    public boolean isDone() {
-		return done;
-	}
-
 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -54,7 +54,6 @@
     private volatile boolean requestCanceled = false;
     private static final int DEFAULT_WAIT = 50;       
     private boolean processorClosed = false;
-    private volatile int highestRow;
     
     private boolean nonBlocking = false;
          
@@ -132,7 +131,6 @@
 	        		throw new MetaMatrixProcessingException("Query timed out"); //$NON-NLS-1$
 	        	}
 	            result = processPlan.nextBatch();
-	            this.highestRow = result.getEndRow();
 	
 	        	if(result.getTerminationFlag()) {
 	        		done = true;
@@ -202,10 +200,6 @@
         this.requestCanceled = true;
     }
     
-	public int getHighestRow() {
-		return highestRow;
-	}    
-	
 	public BatchCollector createBatchCollector() throws MetaMatrixComponentException {
 		return new BatchCollector(this, this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(), context.getConnectionID(), TupleSourceType.PROCESSOR));
 	}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -23,8 +23,8 @@
 package com.metamatrix.query.processor.relational;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +32,7 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.api.exception.query.ExpressionEvaluationException;
 import com.metamatrix.common.buffer.BlockedException;
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
 import com.metamatrix.common.buffer.TupleSource;
@@ -44,12 +45,14 @@
 import com.metamatrix.query.function.aggregate.Min;
 import com.metamatrix.query.function.aggregate.NullFilter;
 import com.metamatrix.query.function.aggregate.Sum;
+import com.metamatrix.query.processor.ProcessorDataManager;
 import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.sql.ReservedWords;
 import com.metamatrix.query.sql.lang.OrderBy;
 import com.metamatrix.query.sql.symbol.AggregateSymbol;
 import com.metamatrix.query.sql.symbol.Expression;
 import com.metamatrix.query.sql.symbol.SingleElementSymbol;
+import com.metamatrix.query.util.CommandContext;
 
 public class GroupingNode extends RelationalNode {
 
@@ -65,12 +68,12 @@
        
     // Sort phase
     private SortUtility sortUtility;
-    private TupleBuffer sortedID;
+    private TupleBuffer sortBuffer;
     private TupleSource groupTupleSource;
-
+    
     // Group phase
-    private Map expressionMap;                 // Index map for all collected expressions (Expression -> index in collectedExpressions)
     private AggregateFunction[] functions;
+    private int[] aggProjectionIndexes;
     private List lastRow;
 	private List currentGroupTuple;
 
@@ -86,16 +89,18 @@
         super.reset();
 
         phase = COLLECTION;
-        elementMap = null;
-        collectedExpressions = null;
                 
         sortUtility = null;
-        sortedID = null;
+        sortBuffer = null;
         
-        expressionMap = null;
-        functions = null;
         lastRow = null;
         currentGroupTuple = null;
+        
+        if (this.functions != null) {
+	    	for (AggregateFunction function : this.functions) {
+				function.reset();
+			}
+        }
     }
     
     public void setRemoveDuplicates(boolean removeDuplicates) {
@@ -112,37 +117,24 @@
     public void setGroupingElements(List groupingElements) {
         this.sortElements = groupingElements;
         if(groupingElements != null) {
-            sortTypes = new ArrayList(groupingElements.size());
-            for(int i=0; i<groupingElements.size(); i++) {
-                sortTypes.add(Boolean.valueOf(OrderBy.ASC));
-            }
+            sortTypes = Collections.nCopies(groupingElements.size(), Boolean.valueOf(OrderBy.ASC));
         }
     }
 
-	public void open()
-		throws MetaMatrixComponentException, MetaMatrixProcessingException {
-
-		super.open();
-
+	@Override
+	public void initialize(CommandContext context, BufferManager bufferManager,
+			ProcessorDataManager dataMgr) {
+		super.initialize(context, bufferManager, dataMgr);
+		
+		if (this.functions != null) {
+			return;
+		}
+		
         // Incoming elements and lookup map for evaluating expressions
         List sourceElements = this.getChildren()[0].getElements();
         this.elementMap = createLookupMap(sourceElements);
-        
-        // Determine expressions to build (all grouping expressions + expressions used by aggregates)   
-        collectExpressions();
 
-        initializeFunctionAccumulators();
-	}
-
-    /** 
-     * Collect a list of all expressions that must be evaluated during collection.  This
-     * will include all the expressions being sorted on AND all expressions used within 
-     * aggregate functions.
-     * 
-     * @since 4.2
-     */
-    private void collectExpressions() {
-        // List should contain all grouping columns / expressions as we need those for sorting
+    	// List should contain all grouping columns / expressions as we need those for sorting
         if(this.sortElements != null) {
             this.collectedExpressions = new ArrayList(this.sortElements.size() + getElements().size());
             this.collectedExpressions.addAll(sortElements);
@@ -150,27 +142,10 @@
             this.collectedExpressions = new ArrayList(getElements().size());
         }
         
-        // Also need to include all expressions used within aggregates so that we can evaluate 
-        // them once up front during collection rather than repeatedly during aggregate evaluation
-        Iterator outputIter = getElements().iterator();
-        while(outputIter.hasNext()) {
-            Object outputSymbol = outputIter.next();
-            if(outputSymbol instanceof AggregateSymbol) {
-                AggregateSymbol agg = (AggregateSymbol) outputSymbol;
-                Expression expr = agg.getExpression();
-                if(expr != null && ! this.collectedExpressions.contains(expr)) {
-                    this.collectedExpressions.add(expr);
-                }
-            }
-        }
-        
-        // Build lookup map for evaluating aggregates later
-        this.expressionMap = createLookupMap(collectedExpressions);
-    }
-
-    private void initializeFunctionAccumulators() {
         // Construct aggregate function state accumulators
         functions = new AggregateFunction[getElements().size()];
+        aggProjectionIndexes = new int[getElements().size()];
+        Arrays.fill(aggProjectionIndexes, -1);
         for(int i=0; i<getElements().size(); i++) {
             SingleElementSymbol symbol = (SingleElementSymbol)getElements().get(i);
             Class<?> outputType = symbol.getType();
@@ -181,6 +156,12 @@
                 if(aggSymbol.getExpression() == null) {
                     functions[i] = new Count();
                 } else {
+                	int index = this.collectedExpressions.indexOf(aggSymbol.getExpression());
+                	if(index == -1) {
+                        index = this.collectedExpressions.size();
+                        this.collectedExpressions.add(aggSymbol.getExpression());
+                    }
+                	aggProjectionIndexes[i] = index;
                     String function = aggSymbol.getAggregateFunction();
                     if(function.equals(ReservedWords.COUNT)) {
                         functions[i] = new Count();
@@ -203,6 +184,7 @@
                 }
             } else {
                 functions[i] = new ConstantFunction();
+                aggProjectionIndexes[i] = this.collectedExpressions.indexOf(symbol);
             }
             functions[i].initialize(outputType, inputType);
         }
@@ -304,8 +286,8 @@
     }
 
     private void sortPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
-        this.sortedID = this.sortUtility.sort();
-        this.groupTupleSource = this.sortedID.createIndexedTupleSource();
+        this.sortBuffer = this.sortUtility.sort();
+        this.groupTupleSource = this.sortBuffer.createIndexedTupleSource();
         this.phase = GROUP;
     }
 
@@ -328,10 +310,6 @@
                 List row = new ArrayList(functions.length);
                 for(int i=0; i<functions.length; i++) {
                     row.add( functions[i].getResult() );
-                }
-
-                // Start a new group
-                for(int i=0; i<functions.length; i++) {
                     functions[i].reset();
                 }
 
@@ -395,15 +373,9 @@
     throws MetaMatrixComponentException, ExpressionEvaluationException {
 
         for(int i=0; i<functions.length; i++) {
-            Expression expression = (SingleElementSymbol) getElements().get(i);
-            if(expression instanceof AggregateSymbol) {
-                expression = ((AggregateSymbol)expression).getExpression();
-            }
-
             Object value = null;
-            if(expression != null) {
-                Integer exprIndex = (Integer)expressionMap.get(expression);
-                value = tuple.get(exprIndex.intValue());
+            if(aggProjectionIndexes[i] != -1) {
+                value = tuple.get(aggProjectionIndexes[i]);
             }
             functions[i].addInput(value);
         }
@@ -411,6 +383,10 @@
 
     public void close() throws MetaMatrixComponentException {
         if (!isClosed()) {
+        	if (this.sortBuffer != null) {
+        		this.sortBuffer.remove();
+        		this.sortBuffer = null;
+        	}
             super.close();
         }
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -70,6 +70,7 @@
     private Criteria joinCriteria;
     
     private Map combinedElementMap;
+    private int[] projectionIndexes;
     
     public JoinNode(int nodeID) {
         super(nodeID);
@@ -127,10 +128,13 @@
     		ProcessorDataManager dataMgr) {
     	super.initialize(context, bufferManager, dataMgr);
     	
-        // Create element lookup map for evaluating project expressions
-        List combinedElements = new ArrayList(getChildren()[0].getElements());
-        combinedElements.addAll(getChildren()[1].getElements());
-        this.combinedElementMap = createLookupMap(combinedElements);
+    	if (this.combinedElementMap == null) {
+	        // Create element lookup map for evaluating project expressions
+	        List combinedElements = new ArrayList(getChildren()[0].getElements());
+	        combinedElements.addAll(getChildren()[1].getElements());
+	        this.combinedElementMap = createLookupMap(combinedElements);
+	        this.projectionIndexes = getProjectionIndexes(combinedElementMap, getElements());
+    	}
     }
     
     public void open()
@@ -211,7 +215,7 @@
             }
             List outputTuple = this.joinStrategy.nextTuple();
             if(outputTuple != null) {
-                List projectTuple = projectTuple(this.combinedElementMap, outputTuple, getElements());
+                List projectTuple = projectTuple(this.projectionIndexes, outputTuple);
                 super.addBatchRow(projectTuple);
             } else {
                 super.terminateBatches();

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -26,7 +26,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.core.log.MessageLevel;
+import com.metamatrix.core.util.Assertion;
 import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.query.execution.QueryExecPlugin;
 import com.metamatrix.query.processor.Describable;
@@ -338,23 +338,27 @@
 	/**
 	 * Helper method for all the node that will filter the elements needed for the next node.
 	 */
-	protected List projectTuple(Map tupleElements, List tupleValues, List projectElements)
-		throws MetaMatrixComponentException {
+	protected int[] getProjectionIndexes(Map<SingleElementSymbol, Integer> tupleElements, List<SingleElementSymbol> projectElements) {
+		int[] result = new int[projectElements.size()];
 
-		List projectedTuple = new ArrayList(projectElements.size());
+		int i = 0;
+		for (SingleElementSymbol symbol : projectElements) {
+			Integer index = tupleElements.get(symbol);
+			Assertion.isNotNull(index);
+			result[i++] = index;
+		}
 
-		Iterator projectIter = projectElements.iterator();
-		while(projectIter.hasNext()) {
-			SingleElementSymbol symbol = (SingleElementSymbol) projectIter.next();
-
-			Integer index = (Integer) tupleElements.get(symbol);
-            if(index == null) {
-                throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString(ErrorMessageKeys.PROCESSOR_0035, new Object[]{symbol, tupleElements}));
-			}
-
-			projectedTuple.add(tupleValues.get(index.intValue()));
+		return result;
+	}
+	
+	protected List<?> projectTuple(int[] indexes, List<?> tupleValues) {
+	
+		List<Object> projectedTuple = new ArrayList<Object>(indexes.length);
+	
+		for (int index : indexes) {
+			projectedTuple.add(tupleValues.get(index));
 		}
-
+	
 		return projectedTuple;
 	}
 

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SelectNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SelectNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SelectNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -42,7 +42,8 @@
 	private Criteria criteria;
     
     // Derived element lookup map
-    private Map elementMap;    
+    private Map elementMap; 
+    private int[] projectionIndexes;
 	
     // State if blocked on evaluating a criteria
     private boolean blockedOnCriteria = false;
@@ -78,6 +79,7 @@
         // Create element lookup map for evaluating project expressions
         if(this.elementMap == null) {
             this.elementMap = createLookupMap(this.getChildren()[0].getElements());
+            this.projectionIndexes = getProjectionIndexes(this.elementMap, getElements());
         }
 	}
 	
@@ -109,7 +111,7 @@
             // Evaluate criteria with tuple
             try {
                 if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
-                    addBatchRow( projectTuple(elementMap, tuple, getElements()) );
+                    addBatchRow(projectTuple(this.projectionIndexes, tuple));
                 }
             } catch(BlockedException e) {
                 // Save state and rethrow
@@ -142,6 +144,7 @@
 		super.copy(source, target);
 		target.criteria = criteria;
 		target.elementMap = source.elementMap;
+		target.projectionIndexes = source.projectionIndexes;
 	}
     
     /* 

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -23,7 +23,6 @@
 package com.metamatrix.query.processor.relational;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +31,7 @@
 import com.metamatrix.common.buffer.BlockedException;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.query.processor.BatchIterator;
 import com.metamatrix.query.processor.relational.SortUtility.Mode;
 import com.metamatrix.query.sql.lang.OrderBy;
@@ -44,9 +44,8 @@
 
     private SortUtility sortUtility;
     private int phase = SORT;
-    private TupleBuffer outputID;
-    private int rowCount = -1;
-    private int outputBeginRow = 1;
+    private TupleBuffer output;
+    private TupleSource outputTs;
 
     private static final int SORT = 2;
     private static final int OUTPUT = 3;
@@ -59,9 +58,8 @@
         super.reset();
         sortUtility = null;
         phase = SORT;
-        outputID = null;
-        rowCount = -1;
-        outputBeginRow = 1;
+        output = null;
+        outputTs = null;
     }
 
 	public void setSortElements(List sortElements, List<Boolean> sortTypes) {
@@ -81,12 +79,6 @@
 		this.mode = mode;
 	}
 
-	public void open()
-		throws MetaMatrixComponentException, MetaMatrixProcessingException {
-
-		super.open();
-	}
-
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
         if(this.phase == SORT) {
@@ -102,55 +94,43 @@
 	                                            sortTypes, this.mode, getBufferManager(),
 	                                            getConnectionID());
 		}
-		this.outputID = this.sortUtility.sort();
+		this.output = this.sortUtility.sort();
+		if (this.outputTs == null) {
+			this.outputTs = this.output.createIndexedTupleSource();
+		}
         this.phase = OUTPUT;
     }
 
-    private TupleBatch outputPhase() throws BlockedException, MetaMatrixComponentException {
-    	if (this.rowCount == -1) {
-    		if (this.outputID.isFinal()) {
-    			this.rowCount = this.outputID.getRowCount();
-    		} else {
-    			this.phase = SORT;
-    		}
-    	}
-        if(this.rowCount == 0 || (this.rowCount != -1 && this.outputBeginRow > this.rowCount)) {
-            TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
-            terminationBatch.setTerminationFlag(true);
-            return terminationBatch;
-        }
-        int beginPinned = this.outputBeginRow;
-        TupleBatch outputBatch = this.outputID.getBatch(beginPinned);
-        
-        this.outputBeginRow += outputBatch.getRowCount();
-
-        outputBatch = removeUnrelatedColumns(outputBatch);
-
-        if(rowCount != -1 && outputBeginRow > rowCount) {
-            outputBatch.setTerminationFlag(true);
-        }
-
-        return outputBatch;
-    }
-
-	private TupleBatch removeUnrelatedColumns(TupleBatch outputBatch) {
-		int extraColumns = this.getChildren()[0].getElements().size() - this.getElements().size();
-		
-		if (extraColumns > 0) {
-			for (List tuple : outputBatch.getAllTuples()) {
+    private TupleBatch outputPhase() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
+		if (!this.output.isFinal()) {
+			this.phase = SORT;
+		}
+		List<?> tuple = null;
+		try {
+			while ((tuple = this.outputTs.nextTuple()) != null) {
+				//resize to remove unrelated columns
 				addBatchRow(tuple.subList(0, this.getElements().size()));
+				if (this.isBatchFull()) {
+					return pullBatch();
+				}
 			}
-			outputBatch = pullBatch();
+		} catch (BlockedException e) {
+			if (this.hasPendingRows()) {
+				return this.pullBatch();
+			}
+			throw e;
 		}
-		return outputBatch;
-	}
+		this.terminateBatches();
+		return this.pullBatch();
+    }
 
     public void close() throws MetaMatrixComponentException {
         if (!isClosed()) {
             super.close();
-            if(this.outputID != null) {
-                this.outputID.remove();
-                this.outputID = null;
+            if(this.output != null) {
+                this.output.remove();
+                this.output = null;
+                this.outputTs = null;
             }
         }
     }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -35,7 +35,10 @@
 import com.metamatrix.common.buffer.TupleBuffer;
 import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.Assertion;
+import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.query.sql.lang.OrderBy;
 import com.metamatrix.query.sql.symbol.SingleElementSymbol;
 
@@ -63,11 +66,17 @@
 		int index;
 		boolean duplicate;
 		IndexedTupleSource its;
+		int limit = Integer.MAX_VALUE;
 		
 		@Override
 		public int compareTo(SortedSublist o) {
 			return comparator.compare(this.tuple, o.tuple);
 		}
+		
+		@Override
+		public String toString() {
+			return index + " " + tuple + " " + duplicate; //$NON-NLS-1$ //$NON-NLS-2$
+		}
 	}
 
 	//constructor state
@@ -83,6 +92,8 @@
     private int phase = INITIAL_SORT;
     private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
     private int masterSortIndex;
+    
+    private int initialSortPass = 1;     //used to track the number of times through the initial sort method
 
     // Phase constants for readability
     private static final int INITIAL_SORT = 1;
@@ -165,8 +176,8 @@
 	            	
                     addTuple(workingTuples, tuple);
 	            } catch(BlockedException e) {
-	            	if (workingTuples.isEmpty() && (mode != Mode.DUP_REMOVE || activeTupleBuffers.isEmpty())) {
-	            		throw e; //block if no work can be performed
+	            	if (workingTuples.isEmpty() && (mode != Mode.DUP_REMOVE || activeTupleBuffers.size() < initialSortPass)) {
+            			throw e; //block if no work can be performed
 	            	}
 	            	break;
 	            } 
@@ -185,13 +196,13 @@
 	        for (List<?> list : workingTuples) {
 				sublist.addTuple(list);
 			}
-	        sublist.saveBatch(false);
+	        sublist.saveBatch();
         }
     	
     	if (this.activeTupleBuffers.isEmpty()) {
             activeTupleBuffers.add(createTupleBuffer());
         }  
-
+    	this.initialSortPass++;
         this.phase = MERGE;
     }
 
@@ -209,42 +220,49 @@
 		
     protected void mergePhase() throws MetaMatrixComponentException, MetaMatrixProcessingException {
     	while(this.activeTupleBuffers.size() > 1) {    		
-    		ArrayList<SortedSublist> workingTuples = new ArrayList<SortedSublist>(activeTupleBuffers.size());
+    		ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
             
             TupleBuffer merged = createTupleBuffer();
 
-            int sortedIndex = 0;
             int maxSortIndex = Math.min(this.bufferManager.getMaxProcessingBatches() * 2, activeTupleBuffers.size());
-            for(; sortedIndex<maxSortIndex; sortedIndex++) { 
-            	TupleBuffer activeID = activeTupleBuffers.get(sortedIndex);
+        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+            	LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
+            }
+        	
+        	// initialize the sublists with the min value
+            for(int i = 0; i<maxSortIndex; i++) { 
+             	TupleBuffer activeID = activeTupleBuffers.get(i);
             	SortedSublist sortedSublist = new SortedSublist();
             	sortedSublist.its = activeID.createIndexedTupleSource();
-            	sortedSublist.index = sortedIndex;
-            	addWorkingTuple(workingTuples, sortedSublist);
+            	sortedSublist.index = i;
+            	if (activeID == output) {
+            		sortedSublist.limit = output.getRowCount();
+            	}
+            	incrementWorkingTuple(sublists, sortedSublist);
             }
             
             // iteratively process the lowest tuple
-            while (workingTuples.size() > 0) {
-            	SortedSublist sortedSublist = workingTuples.remove(0);
-            	if (!sortedSublist.duplicate) {
+            while (sublists.size() > 0) {
+            	SortedSublist sortedSublist = sublists.remove(0);
+            	if (this.mode == Mode.SORT || !sortedSublist.duplicate) {
                 	merged.addTuple(sortedSublist.tuple);
-                    if (this.output != null && sortedSublist.index != masterSortIndex && sortedIndex > masterSortIndex) {
+                    if (this.output != null && sortedSublist.index > masterSortIndex) {
                     	this.output.addTuple(sortedSublist.tuple); //a new distinct row
                     }
             	}
-            	addWorkingTuple(workingTuples, sortedSublist);
+            	incrementWorkingTuple(sublists, sortedSublist);
             }                
 
             // Remove merged sublists
-            for(int i=0; i<sortedIndex; i++) {
+            for(int i=0; i<maxSortIndex; i++) {
             	TupleBuffer id = activeTupleBuffers.remove(0);
             	if (id != this.output) {
             		id.remove();
             	}
             }
-            merged.saveBatch(false);
+            merged.saveBatch();
             this.activeTupleBuffers.add(merged);           
-            masterSortIndex = masterSortIndex - sortedIndex + 1;
+            masterSortIndex = masterSortIndex - maxSortIndex + 1;
             if (masterSortIndex < 0) {
             	masterSortIndex = this.activeTupleBuffers.size() - 1;
             }
@@ -253,7 +271,7 @@
         // Close sorted source (all others have been removed)
         if (doneReading) {
         	activeTupleBuffers.get(0).close();
-	        if (this.output != null) {
+        	if (this.output != null) {
 	        	this.output.close();
 	        }
 	        this.phase = DONE;
@@ -266,8 +284,11 @@
     	this.phase = INITIAL_SORT;
     }
 
-	private void addWorkingTuple(ArrayList<SortedSublist> workingTuples, SortedSublist sortedSublist) throws MetaMatrixComponentException, MetaMatrixProcessingException {
+	private void incrementWorkingTuple(ArrayList<SortedSublist> subLists, SortedSublist sortedSublist) throws MetaMatrixComponentException, MetaMatrixProcessingException {
 		sortedSublist.tuple = null;
+		if (sortedSublist.limit < sortedSublist.its.getCurrentIndex()) {
+			return; //special case for still reading the output tuplebuffer
+		}
 		try {
 			sortedSublist.tuple = sortedSublist.its.nextTuple();
         } catch (BlockedException e) {
@@ -277,12 +298,39 @@
         	return; // done with this sublist
         }
         sortedSublist.duplicate = false;
-		int index = Collections.binarySearch(workingTuples, sortedSublist);
+		int index = Collections.binarySearch(subLists, sortedSublist);
 		if (index >= 0) {
-			sortedSublist.duplicate = mode != Mode.SORT;
-			workingTuples.add(index, sortedSublist);
+			/* In dup removal mode we need to ensure that a sublist other than the master
+			 * gets marked as duplicate
+			 */
+			if (mode == Mode.DUP_REMOVE && this.output != null && sortedSublist.index == masterSortIndex) {
+				if (!subLists.get(index).duplicate) {
+					subLists.get(index).duplicate = true;
+				} else {
+					//there's an evil twin somewhere, search before and after the index found
+					for (int i = 1; i < subLists.size(); i++) {
+						int actualIndex = i;
+						if (i <= index) {
+							actualIndex = index - i;
+						} 
+						SortedSublist sublist = subLists.get(actualIndex);
+						if (sublist.compareTo(sortedSublist) != 0) {
+							Assertion.assertTrue(actualIndex < index);
+							i = index; 
+							continue;
+						}
+						if (!sublist.duplicate) {
+							sublist.duplicate = true;
+							break;
+						}
+					}
+				}
+			} else {
+				sortedSublist.duplicate = true;
+			}
+			subLists.add(index, sortedSublist);
 		} else {
-			workingTuples.add(-index - 1, sortedSublist);
+			subLists.add(-index - 1, sortedSublist);
 		}
 	} 
 

Modified: trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -299,7 +299,7 @@
 				tsId.addTuple(list);
 			}
             //TODO: this leads to fragmented batches, which may require recreating the table
-            tsId.saveBatch(false);
+            tsId.saveBatch();
             
             tuplesAdded = tuples.size();
         } catch (QueryMetadataException err) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -229,7 +229,7 @@
 		if (!doneProducingBatches) {
 			sendResultsIfNeeded(null);
 			collector.collectTuples();
-		    doneProducingBatches = collector.isDone();
+		    doneProducingBatches = this.resultsBuffer.isFinal();
 		}
 		if (doneProducingBatches) {
             /*if(rsCache != null && requestMsg.useResultSetCache() && originalCommand.areResultsCachable()){
@@ -274,6 +274,7 @@
 	 * Any errors that occur will not make it to the client, instead we just log them here.
 	 */
 	protected void attemptClose() {
+		int rowcount = -1;
 		if (this.resultsBuffer != null) {
 			try {
 				this.processor.closeProcessing();
@@ -286,6 +287,7 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 		        LogManager.logDetail(LogConstants.CTX_DQP, "Removing tuplesource for the request " + requestID); //$NON-NLS-1$
 		    }
+			rowcount = resultsBuffer.getRowCount();
 		    resultsBuffer.remove();
 			
 			for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
@@ -326,7 +328,7 @@
 		if (this.processingException != null) {
 			sendError();			
 		} else {
-	        dqpCore.logMMCommand(this, false, false, this.processor.getHighestRow());
+	        dqpCore.logMMCommand(this, false, false, rowcount);
 		}
 	}
 
@@ -369,7 +371,7 @@
 		
 		synchronized (this) {
 			if (this.resultsReceiver == null
-					|| (this.begin > this.processor.getHighestRow() && !doneProducingBatches)
+					|| (this.begin > this.resultsBuffer.getRowCount() && !doneProducingBatches)
 					|| (this.transactionState == TransactionState.ACTIVE)) {
 				return;
 			}
@@ -392,7 +394,7 @@
                 batch = new TupleBatch(beginRow, rows);
     		}
     	}
-        int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
+        int finalRowCount = this.resultsBuffer.isFinal()?this.resultsBuffer.getRowCount():-1;
         
         response = createResultsMessage(requestMsg, batch.getAllTuples(), this.processor.getProcessorPlan().getOutputElements(), analysisRecord);
         response.setFirstRow(batch.getBeginRow());
@@ -431,9 +433,7 @@
             receiver = this.resultsReceiver;
             this.resultsReceiver = null;    
 		}
-
         receiver.receiveResults(response);
-        
 	}
     
     public static ResultsMessage createResultsMessage(RequestMessage message, List[] batch, List columnSymbols, AnalysisRecord analysisRecord) {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/BlockingFakeRelationalNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/BlockingFakeRelationalNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/BlockingFakeRelationalNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -30,7 +30,10 @@
 
 public class BlockingFakeRelationalNode extends FakeRelationalNode {
 
-    private boolean blocked = false;
+    private int count = 1;
+    
+    private int returnPeriod = 2;
+    
     /**
      * @param nodeID
      * @param data
@@ -51,13 +54,15 @@
     public BlockingFakeRelationalNode(int nodeID, TupleSource source, int batchSize) {
         super(nodeID, source, batchSize);
     }
+    
+    public void setReturnPeriod(int returnPeriod) {
+		this.returnPeriod = returnPeriod;
+	}
 
     public TupleBatch nextBatchDirect() throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
-        if(! blocked) {
-            blocked = true;
-            throw BlockedException.INSTANCE;            
+        if (count++%returnPeriod != 0) {
+        	throw BlockedException.INSTANCE;
         }
-        blocked = false;
         return super.nextBatchDirect();
     }
 

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestGroupingNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestGroupingNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestGroupingNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -22,6 +22,8 @@
 
 package com.metamatrix.query.processor.relational;
 
+import static org.junit.Assert.*;
+
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,7 +31,7 @@
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.TestCase;
+import org.junit.Test;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -46,6 +48,7 @@
 import com.metamatrix.query.function.aggregate.NullFilter;
 import com.metamatrix.query.processor.FakeDataManager;
 import com.metamatrix.query.processor.FakeTupleSource;
+import com.metamatrix.query.processor.ProcessorDataManager;
 import com.metamatrix.query.sql.symbol.AggregateSymbol;
 import com.metamatrix.query.sql.symbol.Constant;
 import com.metamatrix.query.sql.symbol.ElementSymbol;
@@ -53,16 +56,8 @@
 import com.metamatrix.query.sql.symbol.Function;
 import com.metamatrix.query.util.CommandContext;
 
-public class TestGroupingNode extends TestCase {
+public class TestGroupingNode {
 
-	// ################################## FRAMEWORK ################################
-	
-	public TestGroupingNode(String name) { 
-		super(name);
-	}	
-	
-	// ################################## TEST HELPERS ################################
-
 	public static TupleSource createTupleSource1() { 
 		List<ElementSymbol> symbols = new ArrayList<ElementSymbol>();
 		symbols.add(new ElementSymbol("col1")); //$NON-NLS-1$
@@ -93,24 +88,24 @@
 	private void helpProcess(BufferManager mgr,
                              GroupingNode node,
                              CommandContext context,
-                             List[] expected) throws MetaMatrixComponentException,
+                             List[] expected, ProcessorDataManager dataMgr) throws MetaMatrixComponentException,
                                              BlockedException,
                                              MetaMatrixProcessingException {
         TupleSource dataSource = createTupleSource1();
-        helpProcess(mgr, node, context, expected, dataSource);
+        helpProcess(mgr, node, context, expected, dataSource, dataMgr);
     }
     
     private void helpProcess(BufferManager mgr,
                              GroupingNode node,
                              CommandContext context,
                              List[] expected,
-                             TupleSource dataSource) throws MetaMatrixComponentException,
+                             TupleSource dataSource, ProcessorDataManager dataMgr) throws MetaMatrixComponentException,
                                                     BlockedException,
                                                     MetaMatrixProcessingException {
         RelationalNode dataNode = new FakeRelationalNode(0, dataSource, mgr.getProcessorBatchSize());
         dataNode.setElements(dataSource.getSchema());            
-        dataNode.initialize(context, mgr, null);
-        node.addChild(dataNode);            
+        node.addChild(dataNode);    
+        node.initialize(context, mgr, dataMgr);
         node.open();
         
         int currentRow = 1;
@@ -134,7 +129,7 @@
     
 	// ################################## ACTUAL TESTS ################################
 	
-	public void test1() throws Exception {
+	@Test public void test1() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
 
         // Set up
@@ -159,10 +154,9 @@
 		node.setElements(outputElements);
 		
 		List groupingElements = new ArrayList();
-		groupingElements.add(col1); //$NON-NLS-1$
+		groupingElements.add(col1);
 		node.setGroupingElements(groupingElements);	  
         CommandContext context = new CommandContext("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] expected = new List[] {
             Arrays.asList(new Object[] { null, new Integer(2), new Integer(1), new Integer(1), new Long(3), new Long(3), new Double(3.0), new Double(3.0), new Integer(3), new Integer(3), new Integer(3), new Integer(3) }),
@@ -175,7 +169,7 @@
             Arrays.asList(new Object[] { new Integer(6), new Integer(2), new Integer(2), new Integer(2), new Long(7), new Long(7), new Double(3.5), new Double(3.5), new Integer(3), new Integer(3), new Integer(4), new Integer(4) })
         };
         
-        helpProcess(mgr, node, context, expected);
+        helpProcess(mgr, node, context, expected, null);
         
         //ensure that the distinct input type is correct
         AggregateFunction[] functions = node.getFunctions();
@@ -184,12 +178,11 @@
         assertEquals(DataTypeManager.DefaultDataClasses.INTEGER, ((ElementSymbol)dup.getElements().get(0)).getType());
 	}
 
-    public void test2() throws Exception {
+    @Test public void test2() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
 
         GroupingNode node = getExampleGroupingNode();         
         CommandContext context = new CommandContext("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] expected = new List[] {
             Arrays.asList(new Object[] { null, new Integer(1) }),
@@ -202,17 +195,16 @@
             Arrays.asList(new Object[] { new Integer(6), new Integer(2) })
         };
                 
-        helpProcess(mgr, node, context, expected);
+        helpProcess(mgr, node, context, expected, null);
     }
 
     // Same as test2, but uses processor batch size smaller than number of groups
-    public void test3() throws Exception {
+    @Test public void test3() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
         ((BufferManagerImpl)mgr).setProcessorBatchSize(5);
 
         GroupingNode node = getExampleGroupingNode();         
         CommandContext context = new CommandContext("pid", "test", null, null,  null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] expected = new List[] {
             Arrays.asList(new Object[] { null, new Integer(1) }),
@@ -225,10 +217,10 @@
             Arrays.asList(new Object[] { new Integer(6), new Integer(2) })
         };
                 
-        helpProcess(mgr, node, context, expected);
+        helpProcess(mgr, node, context, expected, null);
     }
     
-    public void testDefect5769() throws Exception {
+    @Test public void testDefect5769() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
 
         ElementSymbol bigDecimal = new ElementSymbol("value"); //$NON-NLS-1$
@@ -244,7 +236,6 @@
         // Set grouping elements to null 
         node.setGroupingElements(null);         
         CommandContext context = new CommandContext("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] data = new List[] {
             Arrays.asList(new Object[] { new BigDecimal("0.0") }),     //$NON-NLS-1$
@@ -261,10 +252,10 @@
         List symbols = new ArrayList();
         symbols.add(bigDecimal);
         TupleSource dataSource = new FakeTupleSource(symbols, data);            
-        helpProcess(mgr, node, context, expected, dataSource);
+        helpProcess(mgr, node, context, expected, dataSource, null);
     }
 
-    public void testdefect9842() throws Exception {
+    @Test public void testdefect9842() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
 
         ElementSymbol col1 = new ElementSymbol("col1"); //$NON-NLS-1$
@@ -285,7 +276,6 @@
         groupingElements.add(col1); //$NON-NLS-1$
         node.setGroupingElements(groupingElements);         
         CommandContext context = new CommandContext("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] data = new List[] {
             Arrays.asList(new Object[] { new Integer(1), new BigDecimal("0.0") }),     //$NON-NLS-1$
@@ -304,7 +294,7 @@
         symbols.add(col1);
         symbols.add(bigDecimal);
         TupleSource dataSource = new FakeTupleSource(symbols, data);            
-        helpProcess(mgr, node, context, expected, dataSource);
+        helpProcess(mgr, node, context, expected, dataSource, null);
     }
 
     private void helpTestLookupFunctionInAggregate(int batchSize) throws Exception {
@@ -345,8 +335,6 @@
         valueMap.put(new Integer(4), new Integer(5));
         dataMgr.defineCodeTable("pm1.g1", "e1", "e2", valueMap); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
                    
-        node.initialize(context, mgr, dataMgr);
-        
         List[] expected = new List[] {
             Arrays.asList(new Object[] { null,           new Integer(1), new Long(4), new Long(4) }),
             Arrays.asList(new Object[] { new Integer(0), new Integer(1), new Long(5), new Long(5) }),
@@ -358,7 +346,7 @@
             Arrays.asList(new Object[] { new Integer(6), new Integer(2), new Long(9), new Long(9) })
         };
         
-        helpProcess(mgr, node, context, expected);
+        helpProcess(mgr, node, context, expected, dataMgr);
     }
     
     public void helpTestEmptyGroup(boolean groupBy) throws Exception {
@@ -383,7 +371,6 @@
             node.setGroupingElements(groupingElements);
         }
         CommandContext context = new CommandContext("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] data = new List[] {
         };
@@ -400,28 +387,27 @@
         symbols.add(col1);
         symbols.add(bigDecimal);
         TupleSource dataSource = new FakeTupleSource(symbols, data);            
-        helpProcess(mgr, node, context, expected, dataSource);
+        helpProcess(mgr, node, context, expected, dataSource, null);
     }
     
-    public void testTestEmptyGroupWithoutGroupBy() throws Exception {
+    @Test public void testTestEmptyGroupWithoutGroupBy() throws Exception {
         helpTestEmptyGroup(false);
     }
     
-    public void testTestEmptyGroupWithGroupBy() throws Exception {
+    @Test public void testTestEmptyGroupWithGroupBy() throws Exception {
         helpTestEmptyGroup(true);
     }
 
-    public void testLookupFunctionMultipleBatches() throws Exception {
+    @Test public void testLookupFunctionMultipleBatches() throws Exception {
         helpTestLookupFunctionInAggregate(3);
     }
     
-    public void testDupSort() throws Exception {
+    @Test public void testDupSort() throws Exception {
         BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
 
         GroupingNode node = getExampleGroupingNode();     
         node.setRemoveDuplicates(true);
         CommandContext context = new CommandContext("pid", "test", null, null,  null);               //$NON-NLS-1$ //$NON-NLS-2$
-        node.initialize(context, mgr, null);
         
         List[] expected = new List[] {
             Arrays.asList(new Object[] { null, new Integer(1) }),
@@ -434,7 +420,7 @@
             Arrays.asList(new Object[] { new Integer(6), new Integer(2) })
         };
                 
-        helpProcess(mgr, node, context, expected);
+        helpProcess(mgr, node, context, expected, null);
     }
 
 	private GroupingNode getExampleGroupingNode() {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.TreeSet;
 
 import org.junit.Test;
 
@@ -53,7 +54,8 @@
         BufferManager mgr = NodeTestUtil.getTestBufferManager(100, BATCH_SIZE, BATCH_SIZE);
         CommandContext context = new CommandContext ("pid", "test", null, null, null);               //$NON-NLS-1$ //$NON-NLS-2$
         
-        FakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
+        BlockingFakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
+        dataNode.setReturnPeriod(3);
         dataNode.setElements(elements);
         dataNode.initialize(context, mgr, null);    
         
@@ -127,38 +129,33 @@
         elements.add(es1);
         
         int rows = batches * BATCH_SIZE;
-        
+
+        ListNestedSortComparator comparator = new ListNestedSortComparator(new int[] {0}, OrderBy.DESC);
+
         List[] expected = new List[rows];
         List[] data = new List[rows];
+        TreeSet<List> distinct = new TreeSet<List>(comparator);
         for(int i=0; i<rows; i++) { 
-            data[i] = new ArrayList();
-            expected[i] = new ArrayList();
-            Integer value = new Integer((i*51) % 12321);
-            data[i].add(value);
-            expected[i].add(value);
+            Integer value = new Integer((i*51) % 11);
+            data[i] = Arrays.asList(value);
+            expected[i] = Arrays.asList(value);
+            distinct.add(Arrays.asList(value));
         }
+        List[] expectedDistinct = distinct.toArray(new List[distinct.size()]);
         
         List sortElements = new ArrayList();
         sortElements.add(es1);
         
         List sortTypes = new ArrayList();
-        sortTypes.add(new Boolean(OrderBy.ASC));
+        sortTypes.add(new Boolean(OrderBy.DESC));
 
-        ListNestedSortComparator comparator = new ListNestedSortComparator(new int[] {0}, OrderBy.ASC);
         Arrays.sort(expected, comparator);
         
         for (Mode mode : Mode.values()) {
-            helpTestSort(elements, data, sortElements, sortTypes, expected, mode);
+        	if (mode == Mode.DUP_REMOVE) {
+        		helpTestSort(elements, data, sortElements, sortTypes, mode==Mode.SORT?expected:expectedDistinct, mode);
+        	}
         }
-        
-        comparator = new ListNestedSortComparator(new int[] {0}, OrderBy.DESC);
-        Arrays.sort(expected, comparator);
-        sortTypes.clear();
-        sortTypes.add(new Boolean(OrderBy.DESC));
-        
-        for (Mode mode : Mode.values()) {
-            helpTestSort(elements, data, sortElements, sortTypes, expected, mode);
-        }
     }
         
     @Test public void testNoSort() throws Exception {
@@ -276,7 +273,7 @@
     }   
     
     @Test public void testBiggerSort() throws Exception {
-        helpTestAllSorts(10);
+        helpTestAllSorts(100);
     }
  
     @Test public void testAllSort() throws Exception {

Modified: trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedConfigurationService.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedConfigurationService.java	2010-01-10 00:19:53 UTC (rev 1727)
+++ trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedConfigurationService.java	2010-01-10 05:42:02 UTC (rev 1728)
@@ -45,6 +45,7 @@
 import com.metamatrix.common.application.ApplicationEnvironment;
 import com.metamatrix.common.application.exception.ApplicationInitializationException;
 import com.metamatrix.common.application.exception.ApplicationLifecycleException;
+import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.config.api.ComponentType;
 import com.metamatrix.common.config.api.ComponentTypeID;
 import com.metamatrix.common.config.api.ConfigurationModelContainer;
@@ -1376,10 +1377,10 @@
     
     
     public String getProcessorBatchSize() {
-        return getUserPreferences().getProperty(DQPEmbeddedProperties.DQP_PROCESSOR_BATCH_SIZE, "2000"); //$NON-NLS-1$
+        return getUserPreferences().getProperty(DQPEmbeddedProperties.DQP_PROCESSOR_BATCH_SIZE, String.valueOf(BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE));
     }
     public String getConnectorBatchSize() {
-        return getUserPreferences().getProperty(DQPEmbeddedProperties.DQP_CONNECTOR_BATCH_SIZE, "2000"); //$NON-NLS-1$
+        return getUserPreferences().getProperty(DQPEmbeddedProperties.DQP_CONNECTOR_BATCH_SIZE, String.valueOf(BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE));
     }
 
 	@Override



More information about the teiid-commits mailing list