Author: shawkins
Date: 2009-04-28 13:36:46 -0400 (Tue, 28 Apr 2009)
New Revision: 847
Added:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
Removed:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
Log:
TEIID-178, TEIID-175 adding support for a non-blocking sort and general planning-time
sort optimization.
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/MemoryNotAvailableException.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -22,12 +22,12 @@
package com.metamatrix.common.buffer;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
+import com.metamatrix.api.exception.MetaMatrixException;
/**
* Indicates memory was not available for the requested operation.
*/
-public class MemoryNotAvailableException extends MetaMatrixProcessingException {
+public class MemoryNotAvailableException extends MetaMatrixException {
/**
* No-arg costructor required by Externalizable semantics
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -470,7 +470,11 @@
if (LogManager.isMessageToBeRecorded(LogCommonConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE)) {
LogManager.logTrace(LogCommonConstants.CTX_BUFFER_MGR, new
Object[]{"AddTupleBatch for", tupleSourceID, "with " +
tupleBatch.getRowCount() + " rows"}); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
-
+
+ if (tupleBatch.getRowCount() == 0 && !tupleBatch.getTerminationFlag()) {
+ return;
+ }
+
// Look up info
TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/PlanToProcessConverter.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -49,7 +49,6 @@
import com.metamatrix.query.processor.relational.DependentAccessNode;
import com.metamatrix.query.processor.relational.DependentProcedureAccessNode;
import com.metamatrix.query.processor.relational.DependentProcedureExecutionNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
import com.metamatrix.query.processor.relational.GroupingNode;
import com.metamatrix.query.processor.relational.JoinNode;
import com.metamatrix.query.processor.relational.LimitNode;
@@ -65,6 +64,7 @@
import com.metamatrix.query.processor.relational.SortNode;
import com.metamatrix.query.processor.relational.UnionAllNode;
import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
import com.metamatrix.query.resolver.util.ResolverUtil;
import com.metamatrix.query.sql.LanguageObject;
import com.metamatrix.query.sql.lang.Command;
@@ -201,7 +201,7 @@
List joinCrits = (List)
node.getProperty(NodeConstants.Info.JOIN_CRITERIA);
if(stype.equals(JoinStrategyType.MERGE)) {
- MergeJoinStrategy mjStrategy = new
MergeJoinStrategy(node.hasBooleanProperty(NodeConstants.Info.SORT_LEFT),
node.hasBooleanProperty(NodeConstants.Info.SORT_RIGHT));
+ MergeJoinStrategy mjStrategy = new
MergeJoinStrategy((SortOption)node.getProperty(NodeConstants.Info.SORT_LEFT),
(SortOption)node.getProperty(NodeConstants.Info.SORT_RIGHT), false);
jnode.setJoinStrategy(mjStrategy);
List leftExpressions = (List)
node.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS);
List rightExpressions = (List)
node.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS);
@@ -319,21 +319,21 @@
break;
case NodeConstants.Types.SORT:
+ case NodeConstants.Types.DUP_REMOVE:
SortNode sortNode = new SortNode(getID());
List elements = (List) node.getProperty(NodeConstants.Info.SORT_ORDER);
List sortTypes = (List) node.getProperty(NodeConstants.Info.ORDER_TYPES);
sortNode.setSortElements(elements, sortTypes);
+ if (node.getType() == NodeConstants.Types.DUP_REMOVE) {
+ sortNode.setMode(Mode.DUP_REMOVE);
+ } else if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
+ sortNode.setMode(Mode.DUP_REMOVE_SORT);
+ }
processNode = sortNode;
break;
-
- case NodeConstants.Types.DUP_REMOVE:
- processNode = new DupRemoveNode(getID());
-
- break;
-
case NodeConstants.Types.GROUP:
GroupingNode gnode = new GroupingNode(getID());
gnode.setGroupingElements( (List) node.getProperty(NodeConstants.Info.GROUP_COLS) );
@@ -362,7 +362,11 @@
if(useAll) {
processNode = unionAllNode;
} else {
- processNode = new DupRemoveNode(getID());
+ SortNode sNode = new SortNode(getID());
+ boolean onlyDupRemoval =
node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL);
+ sNode.setMode(onlyDupRemoval?Mode.DUP_REMOVE:Mode.DUP_REMOVE_SORT);
+ processNode = sNode;
+
unionAllNode.setElements( (List)
node.getProperty(NodeConstants.Info.OUTPUT_COLS) );
processNode.addChild(unionAllNode);
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/RelationalPlanner.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -293,6 +293,8 @@
RuleStack rules = new RuleStack();
rules.push(RuleConstants.COLLAPSE_SOURCE);
+
+ rules.push(RuleConstants.PLAN_SORTS);
if(hints.hasJoin) {
rules.push(RuleConstants.IMPLEMENT_JOIN_STRATEGY);
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeConstants.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -87,8 +87,8 @@
RIGHT_EXPRESSIONS, // List <SingleElementSymbol>
DEPENDENT_VALUE_SOURCE, // String
NON_EQUI_JOIN_CRITERIA, // List <CompareCriteria>
- SORT_LEFT, // Boolean
- SORT_RIGHT, // Boolean
+ SORT_LEFT, // SortOption
+ SORT_RIGHT, // SortOption
REMOVED_JOIN_GROUPS, //Set<GroupSymbol>
IS_OPTIONAL, // Boolean
@@ -110,7 +110,7 @@
// Sort node properties
ORDER_TYPES, // List <Boolean>
SORT_ORDER, // List <SingleElementSymbol>
- SORT_CONTROLLER, // Boolean
+ IS_DUP_REMOVAL, // Boolean
// Source node properties
SYMBOL_MAP, // SymbolMap
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/plantree/NodeEditor.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -49,18 +49,18 @@
}
}
- public static final PlanNode findNodePreOrder(PlanNode root, int type) {
- return findNodePreOrder(root, type, NodeConstants.Types.NO_TYPE);
+ public static final PlanNode findNodePreOrder(PlanNode root, int types) {
+ return findNodePreOrder(root, types, NodeConstants.Types.NO_TYPE);
}
- public static final PlanNode findNodePreOrder(PlanNode root, int type, int stopTypes) {
- if(root.getType() == type) {
+ public static final PlanNode findNodePreOrder(PlanNode root, int types, int stopTypes)
{
+ if((types & root.getType()) == root.getType()) {
return root;
} else if((stopTypes & root.getType()) == root.getType()) {
return null;
} else if(root.getChildCount() > 0) {
for (PlanNode child : root.getChildren()) {
- PlanNode found = findNodePreOrder(child, type, stopTypes);
+ PlanNode found = findNodePreOrder(child, types, stopTypes);
if(found != null) {
return found;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleConstants.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -52,4 +52,5 @@
public static final OptimizerRule PLAN_UNIONS = new RulePlanUnions();
public static final OptimizerRule PLAN_PROCEDURES = new RulePlanProcedures();
public static final OptimizerRule CALCULATE_COST = new RuleCalculateCost();
+ public static final OptimizerRule PLAN_SORTS = new RulePlanSorts();
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -42,6 +42,7 @@
import com.metamatrix.query.optimizer.relational.plantree.NodeEditor;
import com.metamatrix.query.optimizer.relational.plantree.NodeFactory;
import com.metamatrix.query.optimizer.relational.plantree.PlanNode;
+import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
import com.metamatrix.query.sql.lang.OrderBy;
import com.metamatrix.query.sql.symbol.SingleElementSymbol;
import com.metamatrix.query.util.CommandContext;
@@ -111,7 +112,7 @@
return;
}
- joinNode.setProperty(joinNode.getFirstChild() == childNode ?
NodeConstants.Info.SORT_LEFT : NodeConstants.Info.SORT_RIGHT, Boolean.TRUE);
+ joinNode.setProperty(joinNode.getFirstChild() == childNode ?
NodeConstants.Info.SORT_LEFT : NodeConstants.Info.SORT_RIGHT, SortOption.SORT);
if (needsCorrection) {
PlanNode projectNode = NodeFactory.getNewNode(NodeConstants.Types.PROJECT);
Added:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
(rev 0)
+++
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.optimizer.relational.rules;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.api.exception.query.QueryMetadataException;
+import com.metamatrix.api.exception.query.QueryPlannerException;
+import com.metamatrix.query.analysis.AnalysisRecord;
+import com.metamatrix.query.metadata.QueryMetadataInterface;
+import com.metamatrix.query.optimizer.capabilities.CapabilitiesFinder;
+import com.metamatrix.query.optimizer.relational.OptimizerRule;
+import com.metamatrix.query.optimizer.relational.RuleStack;
+import com.metamatrix.query.optimizer.relational.plantree.JoinStrategyType;
+import com.metamatrix.query.optimizer.relational.plantree.NodeConstants;
+import com.metamatrix.query.optimizer.relational.plantree.NodeEditor;
+import com.metamatrix.query.optimizer.relational.plantree.PlanNode;
+import com.metamatrix.query.processor.relational.MergeJoinStrategy.SortOption;
+import com.metamatrix.query.sql.lang.SetQuery;
+import com.metamatrix.query.util.CommandContext;
+
+/**
+ * Attempts to minimize the cost of sorting operations across the plan.
+ */
+public class RulePlanSorts implements OptimizerRule {
+
+ @Override
+ public PlanNode execute(PlanNode plan, QueryMetadataInterface metadata,
+ CapabilitiesFinder capabilitiesFinder, RuleStack rules,
+ AnalysisRecord analysisRecord, CommandContext context)
+ throws QueryPlannerException, QueryMetadataException,
+ MetaMatrixComponentException {
+ optimizeSorts(false, plan);
+ return plan;
+ }
+
+ private void optimizeSorts(boolean parentBlocking, PlanNode node) {
+ node = NodeEditor.findNodePreOrder(node,
+ NodeConstants.Types.SORT
+ | NodeConstants.Types.DUP_REMOVE
+ | NodeConstants.Types.GROUP
+ | NodeConstants.Types.JOIN
+ | NodeConstants.Types.SET_OP, NodeConstants.Types.ACCESS);
+ if (node == null) {
+ return;
+ }
+ switch (node.getType()) {
+ case NodeConstants.Types.SORT:
+ parentBlocking = true;
+ if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
+ break;
+ }
+ if (mergeSortWithDupRemoval(node)) {
+ node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+ }
+ break;
+ case NodeConstants.Types.DUP_REMOVE:
+ if (parentBlocking) {
+ node.setType(NodeConstants.Types.SORT);
+ node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+ }
+ break;
+ case NodeConstants.Types.GROUP:
+ //TODO: check the join interesting order
+ parentBlocking = true;
+ break;
+ case NodeConstants.Types.JOIN:
+ if (node.getProperty(NodeConstants.Info.JOIN_STRATEGY) != JoinStrategyType.MERGE) {
+ break;
+ }
+ parentBlocking = true;
+ PlanNode toTest = node.getFirstChild();
+ if (mergeSortWithDupRemovalForJoin(toTest)) {
+ node.setProperty(NodeConstants.Info.SORT_LEFT, SortOption.SORT_DISTINCT);
+ }
+ toTest = node.getLastChild();
+ if (mergeSortWithDupRemovalForJoin(toTest)) {
+ node.setProperty(NodeConstants.Info.SORT_RIGHT, SortOption.SORT_DISTINCT);
+ }
+ break;
+ case NodeConstants.Types.SET_OP:
+ // assumes the use of the merge algorithm
+ if (node.getProperty(NodeConstants.Info.SET_OPERATION) != SetQuery.Operation.UNION) {
+ parentBlocking = true;
+ } else if (!node.hasBooleanProperty(NodeConstants.Info.USE_ALL) &&
!parentBlocking) {
+ node.setProperty(NodeConstants.Info.IS_DUP_REMOVAL, true);
+ }
+ break;
+ }
+ for (PlanNode child : node.getChildren()) {
+ optimizeSorts(parentBlocking, child);
+ }
+ }
+
+ /**
+ * Look under the left and the right sources for a dup removal operation
+ * join
+ * [project]
+ * source
+ * dup remove | union not all
+ */
+ private boolean mergeSortWithDupRemovalForJoin(PlanNode toTest) {
+ PlanNode source = NodeEditor.findNodePreOrder(toTest, NodeConstants.Types.SOURCE,
NodeConstants.Types.ACCESS | NodeConstants.Types.JOIN);
+ return source != null && mergeSortWithDupRemoval(source);
+ }
+
+ private boolean mergeSortWithDupRemoval(PlanNode node) {
+ switch (node.getFirstChild().getType()) {
+ case NodeConstants.Types.SET_OP:
+ if (node.getFirstChild().getProperty(NodeConstants.Info.SET_OPERATION) ==
SetQuery.Operation.UNION &&
!node.getFirstChild().hasBooleanProperty(NodeConstants.Info.USE_ALL)) {
+ node.getFirstChild().setProperty(NodeConstants.Info.USE_ALL, true);
+ return true;
+ }
+ break;
+ case NodeConstants.Types.DUP_REMOVE:
+ NodeEditor.removeChildNode(node, node.getFirstChild());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "PlanSorts"; //$NON-NLS-1$
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePlanSorts.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchCollector.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -40,6 +40,7 @@
private boolean done = false;
private TupleSourceID tsID;
private int rowCount = 0;
+ private boolean collectedAny;
public BatchCollector(RelationalNode sourceNode) throws MetaMatrixComponentException
{
this.sourceNode = sourceNode;
@@ -61,6 +62,7 @@
if(batch.getRowCount() > 0) {
this.rowCount = batch.getEndRow();
sourceNode.getBufferManager().addTupleBatch(tsID, batch);
+ collectedAny = true;
}
// Check for termination condition - batch ending with null row
@@ -77,6 +79,12 @@
return tsID;
}
+ public boolean collectedAny() {
+ boolean result = collectedAny;
+ collectedAny = false;
+ return result;
+ }
+
public int getRowCount() {
return rowCount;
}
@@ -92,5 +100,9 @@
public TupleSourceID getTupleSourceID() {
return this.tsID;
}
+
+ public boolean isDone() {
+ return done;
+ }
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/BatchIterator.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -23,7 +23,6 @@
package com.metamatrix.query.processor.relational;
import java.util.List;
-import java.util.NoSuchElementException;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -82,7 +81,7 @@
public List<?> nextTuple() throws MetaMatrixComponentException,
MetaMatrixProcessingException {
if (currentTuple == null && !hasNext()) {
- throw new NoSuchElementException();
+ return null;
}
List result = currentTuple;
currentTuple = null;
@@ -103,13 +102,17 @@
}
public void setPosition(int position) {
+ if (position == this.currentRow) {
+ return;
+ }
+ if (position < this.currentRow && (this.currentBatch == null || position
< this.currentBatch.getBeginRow())) {
+ throw new UnsupportedOperationException("Backwards positioning is not
allowed"); //$NON-NLS-1$
+ }
this.currentRow = position;
+ this.currentTuple = null;
+ if (currentBatch.getEndRow() < currentRow) {
+ this.currentBatch = null;
+ }
}
- public int available() {
- if (currentBatch == null) {
- return 0;
- }
- return currentBatch.getEndRow() - currentRow;
- }
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -94,8 +94,7 @@
} catch (TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e);
}
- this.sortUtility = new SortUtility(originalVs.getTupleSourceID(),
ts.getSchema(), sortSymbols, sortDirection, true, dependentNode.getBufferManager(),
- dependentNode.getConnectionID());
+ this.sortUtility = new SortUtility(originalVs.getTupleSourceID(),
sortSymbols, sortDirection, true, dependentNode.getBufferManager(),
dependentNode.getConnectionID());
}
dvs = new DependentValueSource(sortUtility.sort(),
dependentNode.getBufferManager());
for (SetState setState : dependentSetStates) {
Deleted:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DupRemoveNode.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -1,77 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.query.processor.relational;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.MetaMatrixProcessingException;
-import com.metamatrix.query.sql.lang.OrderBy;
-
-public class DupRemoveNode extends SortNode {
-
- public DupRemoveNode(int nodeID) {
- super(nodeID);
- }
-
- public void reset() {
- super.reset();
- }
-
- public void open()
- throws MetaMatrixComponentException, MetaMatrixProcessingException {
-
- // Set up duplicate removal
- super.setRemoveDuplicates(true);
-
- // Set up all elements for sorting
- List sourceElements = getChildren()[0].getElements();
- List sortTypes = new ArrayList(sourceElements.size());
- for(int i=0; i<sourceElements.size(); i++) {
- sortTypes.add(Boolean.valueOf(OrderBy.ASC));
- }
- super.setSortElements(sourceElements, sortTypes);
-
- super.open();
- }
-
- public Object clone(){
- DupRemoveNode clonedNode = new DupRemoveNode(super.getID());
- super.copy(this, clonedNode);
-
- return clonedNode;
- }
-
- /*
- * @see com.metamatrix.query.processor.Describable#getDescriptionProperties()
- */
- public Map getDescriptionProperties() {
- // Default implementation - should be overridden
- Map props = super.getDescriptionProperties();
- props.put(PROP_TYPE, "Duplicate Removal"); //$NON-NLS-1$
- return props;
- }
-
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -29,8 +29,10 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.api.exception.query.ExpressionEvaluationException;
import com.metamatrix.api.exception.query.FunctionExecutionException;
-import com.metamatrix.common.buffer.*;
-import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
+import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.TupleSource;
+import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.query.function.aggregate.AggregateFunction;
import com.metamatrix.query.sql.lang.OrderBy;
@@ -45,7 +47,6 @@
private AggregateFunction proxy;
private BufferManager mgr;
private String groupName;
- private int batchSize;
// Derived and static - can be reused
private List elements;
@@ -54,12 +55,10 @@
// Temporary state - should be reset
private TupleSourceID collectionID = null;
- private List collectionRows;
- private int collectionRow = 1;
private SortUtility sortUtility = null;
private TupleSourceID sortedID = null;
+ private TupleCollector tupleCollector;
-
/**
* Constructor for DuplicateFilter.
*/
@@ -69,7 +68,6 @@
this.proxy = proxy;
this.mgr = mgr;
this.groupName = groupName;
- this.batchSize = batchSize;
}
/**
@@ -93,8 +91,7 @@
this.proxy.reset();
this.collectionID = null;
- this.collectionRows = null;
- this.collectionRow = 1;
+ this.tupleCollector = null;
this.sortUtility = null;
this.sortedID = null;
}
@@ -108,24 +105,12 @@
try {
if(collectionID == null) {
collectionID = mgr.createTupleSource(elements, elementTypes, groupName,
TupleSourceType.PROCESSOR);
+ this.tupleCollector = new TupleCollector(collectionID, mgr);
}
- if(collectionRows == null) {
- collectionRows = new ArrayList(batchSize);
- }
-
List row = new ArrayList(1);
row.add(input);
- collectionRows.add(row);
- if(collectionRows.size() == batchSize) {
- TupleBatch batch = new TupleBatch(collectionRow, collectionRows);
- mgr.addTupleBatch(collectionID, batch);
-
- // Reset state for next batch
- collectionRow = collectionRow + batch.getRowCount();
- collectionRows = new ArrayList(batchSize);
- }
-
+ this.tupleCollector.addTuple(row);
} catch(TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e, e.getMessage());
}
@@ -140,15 +125,10 @@
try {
if(collectionID != null) {
- // First save any hanging collection rows
- if(collectionRows.size() > 0) {
- TupleBatch batch = new TupleBatch(collectionRow, collectionRows);
- mgr.addTupleBatch(collectionID, batch);
- }
- mgr.setStatus(collectionID, TupleSourceStatus.FULL);
+ this.tupleCollector.close();
// Sort
- sortUtility = new SortUtility(collectionID, elements, elements,
sortTypes, true, mgr, groupName);
+ sortUtility = new SortUtility(collectionID, elements, sortTypes, true,
mgr, groupName);
this.sortedID = sortUtility.sort();
// Add all input to proxy
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -297,21 +297,22 @@
this.sourceBatch = null;
}
+ this.getBufferManager().setStatus(this.collectionID, TupleSourceStatus.FULL);
+
if(this.sortElements == null || this.rowCount == 0) {
// No need to sort
- this.getBufferManager().setStatus(this.collectionID, TupleSourceStatus.FULL);
this.sortedID = this.collectionID;
this.collectionID = null;
this.phase = GROUP;
} else {
- this.sortUtility = new SortUtility(collectionID, collectedExpressions,
- sortElements, sortTypes, false,
- getBufferManager(), getConnectionID());
+ this.sortUtility = new SortUtility(collectionID, sortElements,
+ sortTypes, false, getBufferManager(),
+ getConnectionID());
this.phase = SORT;
}
}
- private void sortPhase() throws BlockedException, MetaMatrixComponentException {
+ private void sortPhase() throws BlockedException, MetaMatrixComponentException,
MetaMatrixProcessingException {
this.sortedID = this.sortUtility.sort();
this.phase = GROUP;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/ListNestedSortComparator.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -24,6 +24,8 @@
import java.util.*;
+import com.metamatrix.core.util.Assertion;
+
/**
* This class can be used for comparing lists of elements, when the fields to
* be sorted on and the comparison mechanism are dynamically specified. <p>
@@ -61,7 +63,7 @@
int[] sortParameters;
/**
- * Indicates whether comparision should be based on ascending or descending
+ * Indicates whether comparison should be based on ascending or descending
* order.
*/
boolean ascendingOrder = false;
@@ -146,7 +148,7 @@
} else if ( param1 instanceof Comparable ) {
compare = ((Comparable)param1).compareTo(param2);
} else {
- compare = param1.toString().compareTo(param2.toString());
+ Assertion.failed("Expected comparable types"); //$NON-NLS-1$
}
k++;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -29,6 +29,7 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.api.exception.query.CriteriaEvaluationException;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
import com.metamatrix.query.sql.lang.JoinType;
import com.metamatrix.query.sql.lang.OrderBy;
@@ -89,11 +90,13 @@
private SortUtility rightSort;
private SortOption processingSortRight;
- public MergeJoinStrategy(boolean sortLeft, boolean sortRight) {
- this(sortLeft?SortOption.SORT:SortOption.SKIP_SORT,
sortRight?SortOption.SORT:SortOption.SKIP_SORT, false);
- }
-
public MergeJoinStrategy(SortOption sortLeft, SortOption sortRight, boolean grouping)
{
+ if (sortLeft == null) {
+ sortLeft = SortOption.SKIP_SORT;
+ }
+ if (sortRight == null) {
+ sortRight = SortOption.SKIP_SORT;
+ }
this.sortLeft = sortLeft;
this.sortRight = sortRight;
this.grouping = grouping;
@@ -260,13 +263,12 @@
}
if (!outerMatched) {
- if (matchState == MatchState.MATCH_LEFT) {
- if (this.joinNode.getJoinType().isOuter()) {
- return outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals());
- }
- } else if (joinNode.getJoinType() == JoinType.JOIN_FULL_OUTER) {
- return outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple());
+ if (matchState == MatchState.MATCH_RIGHT) {
+ return outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple());
}
+ if (this.joinNode.getJoinType().isOuter()) {
+ return outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals());
+ }
}
}
}
@@ -334,12 +336,11 @@
MetaMatrixProcessingException {
if (sortLeft != SortOption.SKIP_SORT) {
if (this.leftSort == null) {
- List sourceElements = joinNode.getChildren()[0].getElements();
List expressions = this.joinNode.getLeftExpressions();
- this.leftSort = new SortUtility(this.leftSource.collectTuples(),
sourceElements,
- expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC), sortLeft ==
SortOption.SORT_DISTINCT,
- this.joinNode.getBufferManager(),
this.joinNode.getConnectionID());
- this.leftSource.setDistinct(sortLeft == SortOption.SORT_DISTINCT);
+ this.leftSort = new SortUtility(this.leftSource.collectTuples(),
+ expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC), sortLeft ==
SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
+ this.joinNode.getBufferManager(),
this.joinNode.getConnectionID(), true);
+ this.leftSource.setDistinct(sortLeft == SortOption.SORT_DISTINCT
&& expressions.size() == this.leftSource.getOuterVals().size());
}
this.leftSource.setTupleSource(leftSort.sort());
} else if (this.joinNode.isDependent() ||
JoinType.JOIN_FULL_OUTER.equals(joinNode.getJoinType())) {
@@ -356,19 +357,20 @@
super.loadRight();
if (processingSortRight != SortOption.SKIP_SORT) {
if (this.rightSort == null) {
- List sourceElements = joinNode.getChildren()[1].getElements();
List expressions = this.joinNode.getRightExpressions();
- this.rightSort = new SortUtility(this.rightSource.getTupleSourceID(),
sourceElements,
- expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC), processingSortRight ==
SortOption.SORT_DISTINCT,
- this.joinNode.getBufferManager(),
this.joinNode.getConnectionID());
- this.rightSource.setDistinct(processingSortRight ==
SortOption.SORT_DISTINCT);
+ this.rightSort = new SortUtility(this.rightSource.getTupleSourceID(),
+ expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC), processingSortRight ==
SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
+ this.joinNode.getBufferManager(),
this.joinNode.getConnectionID(), true);
+ this.rightSource.setDistinct(processingSortRight ==
SortOption.SORT_DISTINCT && expressions.size() ==
this.rightSource.getOuterVals().size());
}
this.rightSource.setTupleSource(rightSort.sort());
}
}
public void setProcessingSortRight(boolean processingSortRight) {
- this.processingSortRight =
processingSortRight?SortOption.SORT:SortOption.SKIP_SORT;
+ if (processingSortRight && this.processingSortRight == SortOption.SKIP_SORT)
{
+ this.processingSortRight = SortOption.SORT;
+ }
}
/**
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java 2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/NestedLoopJoinStrategy.java 2009-04-28 17:36:46 UTC (rev 847)
@@ -33,7 +33,7 @@
public class NestedLoopJoinStrategy extends MergeJoinStrategy {
public NestedLoopJoinStrategy() {
- super(false, false);
+ super(SortOption.SKIP_SORT, SortOption.SKIP_SORT, false);
}
/**
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-04-28 17:36:46 UTC (rev 847)
@@ -35,18 +35,19 @@
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
import com.metamatrix.query.sql.lang.OrderBy;
public class SortNode extends RelationalNode {
private List sortElements;
- private List sortTypes;
- private boolean removeDuplicates;
+ private List<Boolean> sortTypes;
+ private Mode mode = Mode.SORT;
private SortUtility sortUtility;
private int phase = COLLECTION;
private TupleSourceID outputID;
- private int rowCount;
+ private int rowCount = -1;
private int outputBeginRow = 1;
private BatchCollector collector;
@@ -63,12 +64,12 @@
sortUtility = null;
phase = COLLECTION;
outputID = null;
- rowCount = 0;
+ rowCount = -1;
outputBeginRow = 1;
this.collector = null;
}
- public void setSortElements(List sortElements, List sortTypes) {
+ public void setSortElements(List sortElements, List<Boolean> sortTypes) {
this.sortElements = sortElements;
this.sortTypes = sortTypes;
}
@@ -76,10 +77,14 @@
public List getSortElements() {
return this.sortElements;
}
+
+ public Mode getMode() {
+ return mode;
+ }
- protected void setRemoveDuplicates(boolean removeDuplicates) {
- this.removeDuplicates = removeDuplicates;
- }
+ public void setMode(Mode mode) {
+ this.mode = mode;
+ }
public void open()
throws MetaMatrixComponentException, MetaMatrixProcessingException {
@@ -88,81 +93,54 @@
this.collector = new BatchCollector(this.getChildren()[0]);
}
- /**
- * 1ST PHASE - COLLECTION
- * Collect all batches from child node, save in collected tuple source
- *
- * 2ND PHASE - SORT INITIAL SUBLISTS
- * Repeat until all batches from collection TS have been read
- * Get and pin batches from collection TS until MemoryNotAvailableException
- * Sort batches
- * Write batches into new sorted TS
- * Unpin all batches
- * Remove collection TS
- *
- * 3RD PHASE - MERGE SORTED SUBLISTS
- * Repeat until there is one sublist
- * Repeat until all sorted sublists have been merged
- * For each sorted sublist S
- * Load and pin a batch until memory not available
- * Merge from pinned batches
- * As batch is done, unpin and load next
- * Output merge into new sublist T
- * Remove merged sublists
- * Let sublists = set of T's
- *
- * 4TH PHASE - OUTPUT
- * Return batches from single sublist from T
- */
public TupleBatch nextBatchDirect()
throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException {
try {
if(this.phase == COLLECTION) {
- collectionPhase(null);
+ collectionPhase();
}
if(this.phase == SORT) {
sortPhase();
}
- if(this.phase == OUTPUT) {
- return outputPhase();
- }
+ return outputPhase();
} catch(TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e, e.getMessage());
}
-
- TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
- terminationBatch.setTerminationFlag(true);
- return terminationBatch;
}
- protected void collectionPhase(TupleBatch batch) throws BlockedException,
TupleSourceNotFoundException, MetaMatrixComponentException, MetaMatrixProcessingException
{
- RelationalNode sourceNode = this.getChildren()[0];
- TupleSourceID collectionID = collector.collectTuples(batch);
- this.rowCount = collector.getRowCount();
- if(this.rowCount == 0) {
- this.phase = OUTPUT;
- } else {
- List sourceElements = sourceNode.getElements();
- this.sortUtility = new SortUtility(collectionID, sourceElements,
- sortElements, sortTypes,
this.removeDuplicates,
- getBufferManager(), getConnectionID());
- this.phase = SORT;
- }
+ private void collectionPhase() throws BlockedException, TupleSourceNotFoundException,
MetaMatrixComponentException, MetaMatrixProcessingException {
+ try {
+ collector.collectTuples();
+ } catch (BlockedException e) {
+ if (mode != Mode.DUP_REMOVE || !collector.collectedAny()) {
+ throw e;
+ }
+ }
+ if (this.sortUtility == null) {
+ this.sortUtility = new SortUtility(collector.getTupleSourceID(), sortElements,
+ sortTypes, this.mode, getBufferManager(),
+ getConnectionID(), true);
+ }
+ this.phase = SORT;
}
- private void sortPhase() throws BlockedException, TupleSourceNotFoundException,
MetaMatrixComponentException {
- this.outputID = this.sortUtility.sort();
- this.rowCount = getBufferManager().getRowCount(outputID);
+ private void sortPhase() throws BlockedException, MetaMatrixComponentException,
MetaMatrixProcessingException {
+ this.outputID = this.sortUtility.sort();
this.phase = OUTPUT;
-
}
private TupleBatch outputPhase() throws BlockedException,
TupleSourceNotFoundException, MetaMatrixComponentException {
- if(this.rowCount == 0 || this.outputBeginRow > this.rowCount) {
+ if (this.rowCount == -1) {
+ this.rowCount = getBufferManager().getFinalRowCount(outputID);
+ if (this.rowCount == -1) {
+ this.phase = this.collector.isDone()?SORT:COLLECTION;
+ }
+ }
+ if(this.rowCount == 0 || (this.rowCount != -1 && this.outputBeginRow >
this.rowCount)) {
TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
terminationBatch.setTerminationFlag(true);
return terminationBatch;
@@ -171,13 +149,13 @@
int endPinned = this.outputBeginRow+getBatchSize()-1;
try {
TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID,
beginPinned, endPinned);
-
+
this.outputBeginRow += outputBatch.getRowCount();
- if(outputBeginRow > rowCount) {
+ if(rowCount != -1 && outputBeginRow > rowCount) {
outputBatch.setTerminationFlag(true);
}
-
+
return outputBatch;
} catch(MemoryNotAvailableException e) {
throw BlockedOnMemoryException.INSTANCE;
@@ -206,18 +184,17 @@
protected void getNodeString(StringBuffer str) {
super.getNodeString(str);
- str.append(sortElements);
+ str.append("[").append(mode).append("] "); //$NON-NLS-1$
//$NON-NLS-2$
+ if (this.mode != Mode.DUP_REMOVE) {
+ str.append(sortElements);
+ }
}
protected void copy(SortNode source, SortNode target){
super.copy(source, target);
- if(source.sortElements != null){
- target.sortElements = new ArrayList(source.sortElements);
- }
- if(sortTypes != null){
- target.sortTypes = new ArrayList(source.sortTypes);
- }
- target.removeDuplicates = source.removeDuplicates;
+ target.sortElements = source.sortElements;
+ target.sortTypes = source.sortTypes;
+ target.mode = source.mode;
}
public Object clone(){
@@ -230,9 +207,19 @@
public Map getDescriptionProperties() {
// Default implementation - should be overridden
Map props = super.getDescriptionProperties();
- props.put(PROP_TYPE, "Sort"); //$NON-NLS-1$
+ switch (mode) {
+ case SORT:
+ props.put(PROP_TYPE, "Sort"); //$NON-NLS-1$
+ break;
+ case DUP_REMOVE:
+ props.put(PROP_TYPE, "Duplicate Removal"); //$NON-NLS-1$
+ break;
+ case DUP_REMOVE_SORT:
+ props.put(PROP_TYPE, "Duplicate Removal And Sort"); //$NON-NLS-1$
+ break;
+ }
- if(this.sortElements != null) {
+ if(this.mode != Mode.DUP_REMOVE && this.sortElements != null) {
Boolean ASC_B = Boolean.valueOf(OrderBy.ASC);
List cols = new ArrayList(this.sortElements.size());
for(int i=0; i<this.sortElements.size(); i++) {
@@ -246,7 +233,7 @@
props.put(PROP_SORT_COLS, cols);
}
- props.put(PROP_REMOVE_DUPS, "" + this.removeDuplicates); //$NON-NLS-1$
+ props.put(PROP_REMOVE_DUPS, this.mode);
return props;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-04-27 19:02:50 UTC (rev 846)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-04-28 17:36:46 UTC (rev 847)
@@ -30,9 +30,10 @@
import java.util.List;
import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.BlockedException;
+import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BlockedOnMemoryException;
import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.IndexedTupleSource;
import com.metamatrix.common.buffer.MemoryNotAvailableException;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
@@ -40,58 +41,74 @@
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.core.util.Assertion;
+import com.metamatrix.query.sql.lang.OrderBy;
import com.metamatrix.query.sql.symbol.SingleElementSymbol;
import com.metamatrix.query.util.TypeRetrievalUtil;
/**
*/
public class SortUtility {
+
+ public enum Mode {
+ SORT,
+ DUP_REMOVE,
+ DUP_REMOVE_SORT
+ }
+ //constructor state
private TupleSourceID sourceID;
- private List sourceElements;
- private List sortElements;
- private List sortTypes;
- private boolean removeDuplicates;
-
- private BufferManager bufferManager;
+ protected List sortElements;
+ protected List<Boolean> sortTypes;
+ private Mode mode;
+ protected BufferManager bufferManager;
private String groupName;
+ private boolean useAllColumns;
+
+ //init state
private int batchSize;
- private List schema;
+ protected List schema;
private String[] schemaTypes;
+ protected int[] sortCols;
+ private Comparator comparator;
- // For collecting tuples to be sorted
+ private TupleSourceID outputID;
+ private boolean doneReading;
private int sortPhaseRow = 1;
- private int phase = SORT;
- private int rowCount;
- private int[] sortCols;
- private List activeTupleIDs = new ArrayList();
- private List workingBatches;
+ private int phase = INITIAL_SORT;
+ protected List<TupleSourceID> activeTupleIDs = new
ArrayList<TupleSourceID>();
+ private List<TupleBatch> workingBatches;
private int[] workingPointers;
+ private int masterSortIndex;
+ private TupleSourceID mergedID;
+ private TupleSourceID tempOutId;
// Phase constants for readability
- private static final int SORT = 1;
+ private static final int INITIAL_SORT = 1;
private static final int MERGE = 2;
private static final int DONE = 3;
-
- /**
- * Constructor for SortUtility.
- */
- public SortUtility(TupleSourceID sourceID, List sourceElements, List sortElements,
List sortTypes, boolean removeDuplicates,
- BufferManager bufferMgr, String groupName) {
-
+
+ public SortUtility(TupleSourceID sourceID, List sortElements, List<Boolean>
sortTypes, boolean removeDups, BufferManager bufferMgr,
+ String groupName) {
+ this(sourceID, sortElements, sortTypes, removeDups?Mode.DUP_REMOVE_SORT:Mode.SORT,
bufferMgr, groupName, false);
+ }
+
+ public SortUtility(TupleSourceID sourceID, List sortElements, List<Boolean>
sortTypes, Mode mode, BufferManager bufferMgr,
+ String groupName, boolean useAllColumns) {
this.sourceID = sourceID;
- this.sourceElements = sourceElements;
this.sortElements = sortElements;
this.sortTypes = sortTypes;
- this.removeDuplicates = removeDuplicates;
+ this.mode = mode;
this.bufferManager = bufferMgr;
this.groupName = groupName;
+ this.useAllColumns = useAllColumns;
}
-
- /**
- */
+
+ public boolean isDone() {
+ return this.doneReading && this.phase == DONE;
+ }
+
public TupleSourceID sort()
- throws BlockedException, MetaMatrixComponentException {
+ throws BlockedOnMemoryException, MetaMatrixComponentException {
try {
// One time setup
@@ -99,23 +116,25 @@
initialize();
}
- if (rowCount == 0) {
- TupleSourceID mergedID = bufferManager.createTupleSource(this.schema,
this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
- activeTupleIDs.add(mergedID);
- phase = DONE;
+ if(this.phase == INITIAL_SORT) {
+ initialSort();
}
- if(this.phase == SORT) {
- sortPhase();
- }
-
if(this.phase == MERGE) {
try {
mergePhase();
} finally {
+ if (this.mergedID != null) {
+ this.bufferManager.removeTupleSource(mergedID);
+ this.mergedID = null;
+ }
+ if (this.tempOutId != null) {
+ this.bufferManager.removeTupleSource(tempOutId);
+ this.tempOutId = null;
+ }
if (workingBatches != null) {
for (int i = 0; i < workingBatches.size(); i++) {
- TupleBatch tupleBatch = (TupleBatch)workingBatches.get(i);
+ TupleBatch tupleBatch = workingBatches.get(i);
if (tupleBatch != null) {
unpinWorkingBatch(i, tupleBatch);
}
@@ -124,191 +143,216 @@
workingBatches = null;
}
}
-
- TupleSourceID result = (TupleSourceID) this.activeTupleIDs.get(0);
- this.bufferManager.setStatus(result, TupleSourceStatus.FULL);
- return result;
+ if (this.outputID != null) {
+ return this.outputID;
+ }
+ return this.activeTupleIDs.get(0);
} catch(TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e, e.getMessage());
}
}
- protected void sortPhase() throws BlockedException, TupleSourceNotFoundException,
MetaMatrixComponentException {
- ArrayList pinned = new ArrayList(); // of int[2] representing begin, end
+ private TupleSourceID createTupleSource() throws MetaMatrixComponentException {
+ return bufferManager.createTupleSource(this.schema, this.schemaTypes, this.groupName,
TupleSourceType.PROCESSOR);
+ }
- // Loop until all data has been read and sorted
- while(sortPhaseRow <= this.rowCount) {
- List workingTuples = new ArrayList();
+ protected void initialSort() throws BlockedOnMemoryException,
TupleSourceNotFoundException, MetaMatrixComponentException {
+ while(!doneReading) {
+ ArrayList<int[]> pinned = new ArrayList<int[]>(); // of int[2]
representing begin, end
+ List<List<Object>> workingTuples = new
ArrayList<List<Object>>();
+ // Load data until out of memory
+ while(!doneReading) {
+ try {
+ // Load and pin batch
+ TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow,
sortPhaseRow + batchSize - 1);
+
+ if (batch.getRowCount() == 0) {
+ if (bufferManager.getStatus(sourceID) == TupleSourceStatus.FULL) {
+ doneReading = true;
+ }
+ break;
+ }
+ // Remember pinned rows
+ pinned.add(new int[] { sortPhaseRow, batch.getEndRow() });
- // Load data until out of memory
- while(sortPhaseRow <= this.rowCount) {
- try {
- // Load and pin batch
- TupleBatch batch = bufferManager.pinTupleBatch(sourceID,
sortPhaseRow, sortPhaseRow + batchSize - 1);
+ addTuples(workingTuples, batch);
+
+ sortPhaseRow += batch.getRowCount();
+ } catch(MemoryNotAvailableException e) {
+ break;
+ }
+ }
+
+ if(workingTuples.isEmpty()) {
+ break;
+ }
+
+ TupleSourceID activeID = createTupleSource();
+ activeTupleIDs.add(activeID);
+ int sortedThisPass = workingTuples.size();
+ if (this.mode == Mode.SORT) {
+ //perform a stable sort
+ Collections.sort(workingTuples, comparator);
+ }
+ int writeBegin = 1;
+ while(writeBegin <= sortedThisPass) {
+ int writeEnd = Math.min(sortedThisPass, writeBegin + batchSize - 1);
+
+ TupleBatch writeBatch = new TupleBatch(writeBegin,
workingTuples.subList(writeBegin-1, writeEnd));
+ bufferManager.addTupleBatch(activeID, writeBatch);
+ writeBegin += writeBatch.getRowCount();
+ }
+
+ // Clean up - unpin rows
+ for (int[] bounds : pinned) {
+ bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
+ }
+ }
- if(batch.getRowCount() > 0) {
- // Remember pinned rows
- pinned.add(new int[] { sortPhaseRow, batch.getEndRow() });
+ if (!doneReading && (mode != Mode.DUP_REMOVE ||
this.activeTupleIDs.isEmpty())) {
+ throw BlockedOnMemoryException.INSTANCE;
+ }
+
+ if (this.activeTupleIDs.isEmpty()) {
+ activeTupleIDs.add(createTupleSource());
+ }
- // Add to previous batches for sorting
- workingTuples.addAll(Arrays.asList(batch.getAllTuples()));
+ // Clean up
+ this.phase = MERGE;
+ }
- // Adjust beginning of next batch
- sortPhaseRow = batch.getEndRow() + 1;
- }
-
+ protected void addTuples(List workingTuples, TupleBatch batch) {
+ if (this.mode == Mode.SORT) {
+ workingTuples.addAll(Arrays.asList(batch.getAllTuples()));
+ return;
+ }
+ for (List<Object> list : batch.getAllTuples()) {
+ int index = Collections.binarySearch(workingTuples, list, comparator);
+ if (index >= 0) {
+ continue;
+ }
+ workingTuples.add(-index - 1, list);
+ }
+ }
+
+ protected void mergePhase() throws BlockedOnMemoryException,
MetaMatrixComponentException, TupleSourceNotFoundException {
+ TupleCollector outCollector = null;
+ while(this.activeTupleIDs.size() > 1) {
+ // Load and pin batch from sorted sublists while memory available
+ this.workingBatches = new
ArrayList<TupleBatch>(activeTupleIDs.size());
+ int sortedIndex = 0;
+ for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
+ TupleSourceID activeID = activeTupleIDs.get(sortedIndex);
+ try {
+ TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1,
this.batchSize);
+ workingBatches.add(sortedBatch);
} catch(MemoryNotAvailableException e) {
break;
}
}
-
- // Check for no memory available and block
- if(workingTuples.isEmpty()) {
- /* Defect 19087: We couldn't load any batches in memory, and we need
to re-enqueue the work,
- * so this should be a BlockedOnMemoryException instead of a
BlockedException
- */
+
+ //if we cannot make progress, just block for now
+ if (workingBatches.size() < 2) {
throw BlockedOnMemoryException.INSTANCE;
}
+
+ // Initialize pointers into working batches
+ this.workingPointers = new int[workingBatches.size()];
+ Arrays.fill(this.workingPointers, 1);
- // Sort whatever is in memory
- Comparator comp = new ListNestedSortComparator(sortCols, sortTypes);
- Collections.sort( workingTuples, comp );
+ mergedID = createTupleSource();
- // Write to temporary tuple source
- TupleSourceID sortedID = bufferManager.createTupleSource(this.schema,
this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
- activeTupleIDs.add(sortedID);
- int sortedThisPass = workingTuples.size();
- int writeBegin = 1;
- while(writeBegin <= sortedThisPass) {
- int writeEnd = Math.min(sortedThisPass, writeBegin + batchSize - 1);
-
- TupleBatch writeBatch = new TupleBatch(writeBegin,
workingTuples.subList(writeBegin-1, writeEnd));
- bufferManager.addTupleBatch(sortedID, writeBatch);
- writeBegin += writeBatch.getRowCount();
- }
-
- // Clean up - unpin rows
- Iterator iter = pinned.iterator();
- while(iter.hasNext()) {
- int[] bounds = (int[]) iter.next();
- bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
- }
- pinned.clear();
- }
-
- // Clean up
- this.phase = MERGE;
- }
-
- protected void mergePhase() throws BlockedException, TupleSourceNotFoundException,
MetaMatrixComponentException {
- // In the case where there is a single activeTupleID but removeDuplicates ==
true,
- // we need to execute this loop exactly once. We also need to execute any time
- // the number of active tuple IDs is > 1
-
- if(this.activeTupleIDs.size() > 1 || this.removeDuplicates) {
- do {
- // Load and pin batch from sorted sublists while memory available
- this.workingBatches = new ArrayList(activeTupleIDs.size());
- int sortedIndex = 0;
- for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
- TupleSourceID activeID = (TupleSourceID)
activeTupleIDs.get(sortedIndex);
- try {
- TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1,
this.batchSize);
- workingBatches.add(sortedBatch);
- } catch(MemoryNotAvailableException e) {
- break;
- }
- }
-
- //if we cannot make progress, just block for now
- if (workingBatches.size() < 2 && !(workingBatches.size() == 1
&& this.removeDuplicates && activeTupleIDs.size() == 1)) {
- throw BlockedOnMemoryException.INSTANCE;
- }
-
- // Initialize pointers into working batches
- this.workingPointers = new int[workingBatches.size()];
+ // Merge from working sorted batches
+ TupleCollector collector = new TupleCollector(mergedID, this.bufferManager);
+ while(true) {
+ // Find least valued row among working batches
+ List<?> currentRow = null;
+ int chosenBatchIndex = -1;
+ TupleBatch chosenBatch = null;
for(int i=0; i<workingBatches.size(); i++) {
- this.workingPointers[i] = 1;
- }
-
- // Initialize merge output
- TupleSourceID mergedID = bufferManager.createTupleSource(this.schema,
this.schemaTypes, this.groupName, TupleSourceType.PROCESSOR);
- int mergedRowBegin = 1;
-
- // Merge from working sorted batches
- List currentRows = new ArrayList(this.batchSize);
- ListNestedSortComparator comparator = new
ListNestedSortComparator(sortCols, sortTypes);
- while(true) {
- // Find least valued row among working batches
- List currentRow = null;
- int chosenBatchIndex = -1;
- TupleBatch chosenBatch = null;
- for(int i=0; i<workingBatches.size(); i++) {
- TupleBatch batch = (TupleBatch) workingBatches.get(i);
- if(batch != null) {
- List testRow = batch.getTuple(workingPointers[i]);
- if(currentRow == null || comparator.compare(testRow,
currentRow) < 0) {
- // Found lower row
- currentRow = testRow;
- chosenBatchIndex = i;
- chosenBatch = batch;
- }
- }
+ TupleBatch batch = workingBatches.get(i);
+ if(batch == null) {
+ continue;
}
-
- // Check for termination condition - all batches must have been null
- if(currentRow == null) {
- break;
+ List<?> testRow = batch.getTuple(workingPointers[i]);
+ int compare = -1;
+ if (currentRow != null) {
+ compare = comparator.compare(testRow, currentRow);
}
- // Output the row and update pointers
- currentRows.add(currentRow);
- incrementWorkingBatch(chosenBatchIndex, chosenBatch);
-
- // Move past this same row on all batches if dup removal is on
- if(this.removeDuplicates) {
- for(int i=0; i<workingBatches.size(); i++) {
- TupleBatch batch = (TupleBatch) workingBatches.get(i);
- while(batch != null) {
- List testRow = batch.getTuple(workingPointers[i]);
- if(comparator.compare(testRow, currentRow) == 0) {
- if(incrementWorkingBatch(i, batch)) {
- batch = (TupleBatch) workingBatches.get(i);
- }
- } else {
- break;
- }
- }
- }
+ if(compare < 0) {
+ // Found lower row
+ currentRow = testRow;
+ chosenBatchIndex = i;
+ chosenBatch = batch;
+ } else if (compare == 0 && this.mode != Mode.SORT) {
+ incrementWorkingBatch(i, batch);
}
-
-
- // Check for full batch and store
- if(currentRows.size() == this.batchSize) {
- bufferManager.addTupleBatch(mergedID, new
TupleBatch(mergedRowBegin, currentRows));
- mergedRowBegin = mergedRowBegin + this.batchSize;
- currentRows = new ArrayList(this.batchSize);
- }
}
- // Save any remaining partial batch
- if(currentRows.size() > 0) {
- bufferManager.addTupleBatch(mergedID, new TupleBatch(mergedRowBegin,
currentRows));
+ // Check for termination condition - all batches must have been null
+ if(currentRow == null) {
+ break;
}
-
- // Remove merged sublists
- for(int i=0; i<sortedIndex; i++) {
-
bufferManager.removeTupleSource((TupleSourceID)activeTupleIDs.remove(0));
+ // Output the row and update pointers
+ collector.addTuple(currentRow);
+ if (this.outputID != null && chosenBatchIndex != masterSortIndex
&& sortedIndex > masterSortIndex) {
+ if (outCollector == null) {
+ tempOutId = createTupleSource();
+ outCollector = new TupleCollector(tempOutId, this.bufferManager);
+ }
+ outCollector.addTuple(currentRow);
}
+ incrementWorkingBatch(chosenBatchIndex, chosenBatch);
+ }
- this.activeTupleIDs.add(mergedID);
-
- } while(this.activeTupleIDs.size() > 1);
+ // Save without closing
+ collector.saveBatch(false);
+
+ // Remove merged sublists
+ for(int i=0; i<sortedIndex; i++) {
+ TupleSourceID id = activeTupleIDs.remove(0);
+ if (!id.equals(this.outputID)) {
+ bufferManager.removeTupleSource(id);
+ }
+ }
+
+ this.activeTupleIDs.add(mergedID);
+ this.mergedID = null;
+ masterSortIndex = masterSortIndex - sortedIndex + 1;
+ if (masterSortIndex < 0) {
+ masterSortIndex = this.activeTupleIDs.size() - 1;
+ }
+ }
+
+ if (outCollector != null) {
+ outCollector.close();
+ //transfer the new dup removed tuples to the output id
+ TupleCollector tc = new TupleCollector(outputID, this.bufferManager);
+ IndexedTupleSource ts =
this.bufferManager.getTupleSource(outCollector.getTupleSourceID());
+ try {
+ while (ts.hasNext()) {
+ tc.addTuple(ts.nextTuple());
+ }
+ } catch (MetaMatrixProcessingException e) {
+ throw new MetaMatrixComponentException(e);
+ }
}
-
+
// Close sorted source (all others have been removed)
- bufferManager.setStatus((TupleSourceID) activeTupleIDs.get(0),
TupleSourceStatus.FULL);
- this.phase = DONE;
+ if (doneReading) {
+ bufferManager.setStatus(activeTupleIDs.get(0), TupleSourceStatus.FULL);
+ if (this.outputID != null) {
+ bufferManager.setStatus(outputID, TupleSourceStatus.FULL);
+ }
+ this.phase = DONE;
+ return;
+ }
+ Assertion.assertTrue(mode == Mode.DUP_REMOVE);
+ if (this.outputID == null) {
+ this.outputID = activeTupleIDs.get(0);
+ }
+ this.phase = INITIAL_SORT;
}
/**
@@ -316,7 +360,7 @@
* for that batchIndex, which we already happen to have. Return whether the batch
* was changed or not. True = changed.
*/
- private boolean incrementWorkingBatch(int batchIndex, TupleBatch currentBatch) throws
BlockedOnMemoryException, TupleSourceNotFoundException, MetaMatrixComponentException {
+ private void incrementWorkingBatch(int batchIndex, TupleBatch currentBatch) throws
BlockedOnMemoryException, TupleSourceNotFoundException, MetaMatrixComponentException {
workingPointers[batchIndex] += 1;
if(workingPointers[batchIndex] > currentBatch.getEndRow()) {
TupleSourceID tsID = unpinWorkingBatch(batchIndex, currentBatch);
@@ -332,22 +376,16 @@
} else {
workingBatches.set(batchIndex, newBatch);
}
-
- // Return true
- return true;
-
} catch(MemoryNotAvailableException e) {
throw BlockedOnMemoryException.INSTANCE;
}
-
}
- return false;
}
private TupleSourceID unpinWorkingBatch(int batchIndex,
TupleBatch currentBatch) throws
TupleSourceNotFoundException,
MetaMatrixComponentException {
- TupleSourceID tsID = (TupleSourceID)activeTupleIDs.get(batchIndex);
+ TupleSourceID tsID = activeTupleIDs.get(batchIndex);
int lastBeginRow = currentBatch.getBeginRow();
int lastEndRow = currentBatch.getEndRow();
bufferManager.unpinTupleBatch(tsID, lastBeginRow, lastEndRow);
@@ -358,19 +396,33 @@
this.schema = this.bufferManager.getTupleSchema(this.sourceID);
this.schemaTypes = TypeRetrievalUtil.getTypeNames(schema);
this.batchSize = bufferManager.getProcessorBatchSize();
- this.rowCount = bufferManager.getRowCount(this.sourceID);
+ if (useAllColumns && mode != Mode.SORT) {
+ if (this.sortElements != null) {
+ this.sortElements = new ArrayList(this.sortElements);
+ List toAdd = new ArrayList(schema);
+ toAdd.removeAll(this.sortElements);
+ this.sortElements.addAll(toAdd);
+ this.sortTypes = new ArrayList<Boolean>(this.sortTypes);
+ this.sortTypes.addAll(Collections.nCopies(this.sortElements.size() -
this.sortTypes.size(), OrderBy.ASC));
+ } else {
+ this.sortElements = this.schema;
+ this.sortTypes = Collections.nCopies(this.sortElements.size(), OrderBy.ASC);
+ }
+ }
+
int[] cols = new int[sortElements.size()];
Iterator iter = sortElements.iterator();
for (int i = 0; i < cols.length; i++) {
- SingleElementSymbol elem = (SingleElementSymbol) iter.next();
+ SingleElementSymbol elem = (SingleElementSymbol)iter.next();
- cols[i] = sourceElements.indexOf(elem);
+ cols[i] = schema.indexOf(elem);
Assertion.assertTrue(cols[i] != -1);
}
-
this.sortCols = cols;
+ this.comparator = new ListNestedSortComparator(sortCols, sortTypes);
}
+
}
Added: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java 2009-04-28 17:36:46 UTC (rev 847)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.processor.relational;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
+
+public class TupleCollector {
+
+ private TupleSourceID tsid;
+
+ private int batchSize;
+ private ArrayList<List<?>> batch;
+ private int index;
+ private BufferManager bm;
+
+ public TupleCollector(TupleSourceID tsid, BufferManager bm) throws
TupleSourceNotFoundException, MetaMatrixComponentException {
+ this.tsid = tsid;
+ this.batchSize = bm.getProcessorBatchSize();
+ this.bm = bm;
+ this.index = bm.getRowCount(tsid) + 1;
+ }
+
+ public TupleSourceID getTupleSourceID() {
+ return tsid;
+ }
+
+ public void addTuple(List<?> tuple) throws TupleSourceNotFoundException,
MetaMatrixComponentException {
+ if (batch == null) {
+ batch = new ArrayList<List<?>>(batchSize/4);
+ }
+ batch.add(tuple);
+ if (batch.size() == batchSize) {
+ saveBatch(false);
+ }
+ }
+
+ public void saveBatch(boolean isLast) throws TupleSourceNotFoundException,
+ MetaMatrixComponentException {
+ ArrayList<List<?>> toSave = batch;
+ if (toSave == null || toSave.isEmpty()) {
+ if (!isLast) {
+ return;
+ }
+ toSave = new ArrayList<List<?>>(0);
+ }
+ TupleBatch tb = new TupleBatch(index, toSave);
+ tb.setTerminationFlag(isLast);
+ this.bm.addTupleBatch(tsid, tb);
+ this.index += toSave.size();
+ batch = null;
+ }
+
+ public void close() throws TupleSourceNotFoundException, MetaMatrixComponentException {
+ saveBatch(true);
+ this.bm.setStatus(this.tsid, TupleSourceStatus.FULL);
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/TupleCollector.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java 2009-04-27
19:02:50 UTC (rev 846)
+++ trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestLimit.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -31,6 +31,8 @@
import com.metamatrix.query.mapping.relational.QueryNode;
import com.metamatrix.query.optimizer.TestOptimizer.DependentProjectNode;
import com.metamatrix.query.optimizer.TestOptimizer.DependentSelectNode;
+import com.metamatrix.query.optimizer.TestOptimizer.DupRemoveNode;
+import com.metamatrix.query.optimizer.TestOptimizer.DupRemoveSortNode;
import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
import com.metamatrix.query.optimizer.capabilities.SourceCapabilities.Capability;
@@ -39,7 +41,6 @@
import com.metamatrix.query.processor.TestProcessor;
import com.metamatrix.query.processor.relational.AccessNode;
import com.metamatrix.query.processor.relational.DependentAccessNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
import com.metamatrix.query.processor.relational.GroupingNode;
import com.metamatrix.query.processor.relational.LimitNode;
import com.metamatrix.query.processor.relational.MergeJoinStrategy;
@@ -575,7 +576,7 @@
0, // DependentAccess
0, // DependentSelect
0, // DependentProject
- 1, // DupRemove
+ 0, // DupRemove
0, // Grouping
1, // Limit
0, // NestedLoopJoinStrategy
@@ -584,9 +585,10 @@
0, // PlanExecution
0, // Project
0, // Select
- 1, // Sort
+ 0, // Sort
1 // UnionAll
}, NODE_TYPES);
+ TestOptimizer.checkNodeTypes(plan, new int[] {1}, new
Class[]{DupRemoveSortNode.class});
}
public void testCombinedLimits() throws Exception {
Modified: trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -58,7 +58,6 @@
import com.metamatrix.query.processor.ProcessorPlan;
import com.metamatrix.query.processor.relational.AccessNode;
import com.metamatrix.query.processor.relational.DependentAccessNode;
-import com.metamatrix.query.processor.relational.DupRemoveNode;
import com.metamatrix.query.processor.relational.GroupingNode;
import com.metamatrix.query.processor.relational.JoinNode;
import com.metamatrix.query.processor.relational.MergeJoinStrategy;
@@ -72,6 +71,7 @@
import com.metamatrix.query.processor.relational.SelectNode;
import com.metamatrix.query.processor.relational.SortNode;
import com.metamatrix.query.processor.relational.UnionAllNode;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
import com.metamatrix.query.resolver.QueryResolver;
import com.metamatrix.query.resolver.util.BindVariableVisitor;
import com.metamatrix.query.rewriter.QueryRewriter;
@@ -92,6 +92,8 @@
public interface DependentJoin {}
public interface DependentSelectNode {}
public interface DependentProjectNode {}
+ public interface DupRemoveNode {}
+ public interface DupRemoveSortNode {}
public static final int[] FULL_PUSHDOWN = new int[] {
1, // Access
@@ -441,6 +443,19 @@
} else {
updateCounts(DependentSelectNode.class, counts, types);
}
+ } else if (nodeType.equals(SortNode.class)) {
+ Mode mode = ((SortNode)relationalNode).getMode();
+ switch(mode) {
+ case DUP_REMOVE:
+ updateCounts(DupRemoveNode.class, counts, types);
+ break;
+ case DUP_REMOVE_SORT:
+ updateCounts(DupRemoveSortNode.class, counts, types);
+ break;
+ case SORT:
+ updateCounts(SortNode.class, counts, types);
+ break;
+ }
} else {
updateCounts(nodeType, counts, types);
}
@@ -7145,7 +7160,7 @@
// Create query
String sql = "select IntKey from bqt1.smalla where stringkey not like
'2%'"; //$NON-NLS-1$
- ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.exampleBQT(), null,
capFinder,
+ ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.exampleBQTCached(), null,
capFinder,
new String[] {"SELECT stringkey, IntKey FROM
bqt1.smalla"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$
checkNodeTypes(plan, new int[] {
Added:
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
(rev 0)
+++
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.query.optimizer;
+
+import static com.metamatrix.query.optimizer.TestOptimizer.*;
+
+import org.junit.Test;
+
+import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
+import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
+import com.metamatrix.query.processor.ProcessorPlan;
+import com.metamatrix.query.unittest.FakeMetadataFactory;
+
+public class TestSortOptimization {
+
+ @Test public void testSortDupCombination() {
+ FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+ BasicSourceCapabilities caps = new BasicSourceCapabilities();
+ capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+ // Create query
+ String sql = "select distinct e1, e2 from pm1.g1 order by e2";
//$NON-NLS-1$
+
+ ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null,
capFinder,
+ new String[] {"SELECT e1, e2 FROM
pm1.g1"}, TestOptimizer.SHOULD_SUCCEED); //$NON-NLS-1$
+
+ checkNodeTypes(plan, FULL_PUSHDOWN);
+ checkNodeTypes(plan, new int[] {1}, new Class[] {DupRemoveSortNode.class});
+ }
+
+ @Test public void testSortDupCombination1() {
+ FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+ BasicSourceCapabilities caps = new BasicSourceCapabilities();
+ capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+ // Create query
+ String sql = "select e1, e2 from pm1.g1 union select e1, e2 from pm1.g2
order by e2"; //$NON-NLS-1$
+
+ ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null,
capFinder,
+ new String[] {"SELECT e1, e2 FROM
pm1.g1", "SELECT e1, e2 FROM pm1.g2"}, TestOptimizer.SHOULD_SUCCEED);
//$NON-NLS-1$ //$NON-NLS-2$
+
+ checkNodeTypes(plan, new int[] {
+ 2, // Access
+ 0, // DependentAccess
+ 0, // DependentSelect
+ 0, // DependentProject
+ 0, // DupRemove
+ 0, // Grouping
+ 0, // NestedLoopJoinStrategy
+ 0, // MergeJoinStrategy
+ 0, // Null
+ 0, // PlanExecution
+ 0, // Project
+ 0, // Select
+ 0, // Sort
+ 1 // UnionAll
+ });
+ checkNodeTypes(plan, new int[] {1}, new Class[] {DupRemoveSortNode.class});
+ }
+
+ @Test public void testSortDupCombination2() {
+ FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+ BasicSourceCapabilities caps = new BasicSourceCapabilities();
+ capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+
+ // Create query
+ String sql = "select x.*, y.* from (select distinct e1, e2 from pm1.g1) x,
(select distinct e1, e2 from pm1.g2) y where x.e1 = y.e1"; //$NON-NLS-1$
+
+ ProcessorPlan plan = helpPlan(sql, FakeMetadataFactory.example1Cached(), null,
capFinder,
+ new String[] {"SELECT e1, e2 FROM
pm1.g1", "SELECT e1, e2 FROM pm1.g2"}, TestOptimizer.SHOULD_SUCCEED);
//$NON-NLS-1$ //$NON-NLS-2$
+
+ checkNodeTypes(plan, new int[] {
+ 2, // Access
+ 0, // DependentAccess
+ 0, // DependentSelect
+ 0, // DependentProject
+ 0, // DupRemove
+ 0, // Grouping
+ 0, // NestedLoopJoinStrategy
+ 1, // MergeJoinStrategy
+ 0, // Null
+ 0, // PlanExecution
+ 1, // Project
+ 0, // Select
+ 0, // Sort
+ 0 // UnionAll
+ });
+ checkNodeTypes(plan, new int[] {0}, new Class[] {DupRemoveSortNode.class});
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/com/metamatrix/query/optimizer/TestSortOptimization.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/test/java/com/metamatrix/query/optimizer/xml/TestXMLPlanner.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -923,8 +923,7 @@
XMLPlan plan = helpPlan(
"SELECT * FROM vm1.doc1 ORDER BY vm1.doc1.a0.a1.c1", //$NON-NLS-1$
example1());
-
System.out.println((plan.getDescriptionProperties().get(Describable.PROP_CHILDREN)).toString());
- assertEquals("[{formatted=false, encoding=UTF-8, type=START DOCUMENT},
{tag=a0, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT},
{isStaging=true, resultSet=tm1.g1, type=Staging Table}, {sql=VM1.G1_1, isStaging=false,
resultSet=VM1.G1_1, type=EXECUTE SQL}, {resultSet=VM1.G1_1, type=BLOCK},
{resultSet=VM1.G1_1, type=NEXT ROW}, {program={children=[{tag=a1, type=ADD ELEMENT,
namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {tag=a1,
dataCol=VM1.G1_1.e1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=b1,
dataCol=VM1.G1_1.e2, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=c1,
dataCol=VM1.G1_1.e3, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=UP IN
DOCUMENT}, {resultSet=VM1.G1_1, type=NEXT ROW}], type=XML Program}, resultSet=VM1.G1_1,
type=LOOP}, {resultSet=VM1.G1_1, type=CLOSE RESULTSET}, {type=UP IN DOCUMENT}, {type=END
DOCUMENT}, {isStaging=true, resultSet=unload_tm1.g1, type=Staging
Table}, {children=[{removeDups=false, sortCols=[#TM1_G1.E3 ASC],
nodeCostEstimates=[Estimated Node Cardinality: -1.0],
children=[{nodeCostEstimates=[Estimated Node Cardinality: -1.0],
children=[{modelName=__TEMP__, sql=SELECT #TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4
FROM #TM1_G1, nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[],
type=Access, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}],
selectCols=[#TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4], type=Project, outputCols=[E1
(string), E2 (integer), E3 (boolean), E4 (double)]}], type=Sort, outputCols=[E1 (string),
E2 (integer), E3 (boolean), E4 (double)]}], type=Relational Plan, outputCols=[E1 (string),
E2 (integer), E3 (boolean), E4 (double)]}]",
plan.getDescriptionProperties().get(Describable.PROP_CHILDREN).toString());//$NON-NLS-1$
+ assertEquals("[{formatted=false, encoding=UTF-8, type=START DOCUMENT},
{tag=a0, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT},
{isStaging=true, resultSet=tm1.g1, type=Staging Table}, {sql=VM1.G1_1, isStaging=false,
resultSet=VM1.G1_1, type=EXECUTE SQL}, {resultSet=VM1.G1_1, type=BLOCK},
{resultSet=VM1.G1_1, type=NEXT ROW}, {program={children=[{tag=a1, type=ADD ELEMENT,
namespaceDeclarations=[], namespace=}, {type=NEXT IN DOCUMENT}, {tag=a1,
dataCol=VM1.G1_1.e1, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=b1,
dataCol=VM1.G1_1.e2, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {tag=c1,
dataCol=VM1.G1_1.e3, type=ADD ELEMENT, namespaceDeclarations=[], namespace=}, {type=UP IN
DOCUMENT}, {resultSet=VM1.G1_1, type=NEXT ROW}], type=XML Program}, resultSet=VM1.G1_1,
type=LOOP}, {resultSet=VM1.G1_1, type=CLOSE RESULTSET}, {type=UP IN DOCUMENT}, {type=END
DOCUMENT}, {isStaging=true, resultSet=unload_tm1.g1, type=Staging
Table}, {children=[{removeDups=SORT, sortCols=[#TM1_G1.E3 ASC],
nodeCostEstimates=[Estimated Node Cardinality: -1.0],
children=[{nodeCostEstimates=[Estimated Node Cardinality: -1.0],
children=[{modelName=__TEMP__, sql=SELECT #TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4
FROM #TM1_G1, nodeCostEstimates=[Estimated Node Cardinality: -1.0], children=[],
type=Access, outputCols=[E1 (string), E2 (integer), E3 (boolean), E4 (double)]}],
selectCols=[#TM1_G1.E1, #TM1_G1.E2, #TM1_G1.E3, #TM1_G1.E4], type=Project, outputCols=[E1
(string), E2 (integer), E3 (boolean), E4 (double)]}], type=Sort, outputCols=[E1 (string),
E2 (integer), E3 (boolean), E4 (double)]}], type=Relational Plan, outputCols=[E1 (string),
E2 (integer), E3 (boolean), E4 (double)]}]",
plan.getDescriptionProperties().get(Describable.PROP_CHILDREN).toString());//$NON-NLS-1$
}
// ################################## TEST SUITE ################################
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -7660,7 +7660,7 @@
0, // DependentAccess
0, // DependentSelect
0, // DependentProject
- 1, // DupRemove
+ 0, // DupRemove
0, // Grouping
0, // NestedLoopJoinStrategy
2, // MergeJoinStrategy
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -26,7 +26,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-04-27
19:02:50 UTC (rev 846)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2009-04-28
17:36:46 UTC (rev 847)
@@ -22,6 +22,8 @@
package com.metamatrix.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -29,7 +31,7 @@
import java.util.List;
import java.util.Set;
-import junit.framework.TestCase;
+import org.junit.Test;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -39,24 +41,17 @@
import com.metamatrix.common.buffer.impl.SizeUtility;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.query.processor.relational.NodeTestUtil.TestableBufferManagerImpl;
+import com.metamatrix.query.processor.relational.SortUtility.Mode;
import com.metamatrix.query.sql.lang.OrderBy;
import com.metamatrix.query.sql.symbol.ElementSymbol;
import com.metamatrix.query.util.CommandContext;
-public class TestSortNode extends TestCase {
+public class TestSortNode {
public static final int BATCH_SIZE = 100;
public static final int INT_BATCH_SIZE = TestSortNode.getIntBatchSize(); //the size
of 100 integers
- /**
- * Constructor for TestSortNode.
- * @param arg0
- */
- public TestSortNode(String arg0) {
- super(arg0);
- }
-
- private void helpTestSort(long bytesInMemory, List elements, List[] data, List
sortElements, List sortTypes, List[] expected, Set blockOn, boolean removeDups) throws
MetaMatrixComponentException, MetaMatrixProcessingException {
+ private void helpTestSort(long bytesInMemory, List elements, List[] data, List
sortElements, List sortTypes, List[] expected, Set blockOn, Mode mode) throws
MetaMatrixComponentException, MetaMatrixProcessingException {
BufferManager mgr = NodeTestUtil.getTestBufferManager(bytesInMemory);
TestableBufferManagerImpl impl = (TestableBufferManagerImpl) mgr;
impl.setBlockOn(blockOn);
@@ -69,14 +64,15 @@
dataNode.setElements(elements);
dataNode.initialize(context, mgr, null);
- SortNode sortNode = null;
- if (removeDups) {
- sortNode = new DupRemoveNode(1);
+ SortNode sortNode = new SortNode(1);
+ if (mode == Mode.DUP_REMOVE) {
+ sortTypes = Arrays.asList(new Boolean[elements.size()]);
+ Collections.fill(sortTypes, OrderBy.ASC);
+ sortNode.setSortElements(elements, sortTypes);
} else {
- sortNode = new SortNode(1);
+ sortNode.setSortElements(sortElements, sortTypes);
}
-
- sortNode.setSortElements(sortElements, sortTypes);
+ sortNode.setMode(mode);
sortNode.setElements(elements);
sortNode.addChild(dataNode);
sortNode.initialize(context, mgr, null);
@@ -87,26 +83,27 @@
while(true) {
try {
TupleBatch batch = sortNode.nextBatch();
-
- for(int row = currentRow; row <= batch.getEndRow(); row++) {
- assertEquals("Rows don't match at " + row,
expected[row-1], batch.getTuple(row)); //$NON-NLS-1$
+ if (mode != Mode.DUP_REMOVE) {
+ for(int row = currentRow; row <= batch.getEndRow(); row++) {
+ assertEquals("Rows don't match at " + row,
expected[row-1], batch.getTuple(row)); //$NON-NLS-1$
+ }
}
-
+ currentRow += batch.getRowCount();
if(batch.getTerminationFlag()) {
break;
}
- currentRow += batch.getRowCount();
} catch (BlockedOnMemoryException e) {
if (!impl.wasBlocked()) {
throw new BlockedOnMemoryException();
}
}
}
+ assertEquals(expected.length, currentRow - 1);
}
public static int getIntBatchSize() {
List[] expected = new List[] {
- Arrays.asList(new Object[] { new Integer(0) }), //$NON-NLS-1$
//$NON-NLS-2$
+ Arrays.asList(new Object[] { new Integer(0) }),
};
String[] types = { "integer" }; //$NON-NLS-1$
@@ -118,7 +115,7 @@
/*
* 1 batch all in memory
*/
- private void helpTestBasicSort(List[] expected, boolean removeDups) throws Exception
{
+ private void helpTestBasicSort(List[] expected, Mode mode) throws Exception {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -157,11 +154,11 @@
if (i > 0) {
blockedOn.add(new Integer(i));
}
- helpTestSort(INT_BATCH_SIZE*2, elements, data, sortElements, sortTypes,
expected, blockedOn, removeDups);
+ helpTestSort(INT_BATCH_SIZE*2, elements, data, sortElements, sortTypes,
expected, blockedOn, mode);
}
}
- private void helpTestBiggerSort(int batches, int inMemoryBatches, boolean removeDups)
throws Exception {
+ private void helpTestBiggerSort(int batches, int inMemoryBatches) throws Exception {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -193,30 +190,31 @@
expected[i].add(unsortedNumbers.get(i));
}
- /*
- * the following code will do four tests, blocking in a variety of places
- */
- for (int i = 0; i < 3; i++) {
- Set blockedOn = new HashSet();
- if (i > 0) {
- //block on a variety of positions
- blockedOn.add(new Integer(i));
- blockedOn.add(new Integer(inMemoryBatches*i));
- blockedOn.add(new Integer(batches*i));
- blockedOn.add(new Integer(batches*(i+1)));
- }
- //5 batches in memory out of 10 total
- helpTestSort(INT_BATCH_SIZE * inMemoryBatches, elements, data, sortElements,
sortTypes, expected, null, removeDups);
+
+ for (Mode mode : Mode.values()) {
+ /*
+ * the following code will do four tests, blocking in a variety of places
+ */
+ for (int i = 0; i < 3; i++) {
+ Set blockedOn = new HashSet();
+ if (i > 0) {
+ //block on a variety of positions
+ blockedOn.add(new Integer(i));
+ blockedOn.add(new Integer(inMemoryBatches*i));
+ blockedOn.add(new Integer(batches*i));
+ blockedOn.add(new Integer(batches*(i+1)));
+ }
+ //5 batches in memory out of 10 total
+ helpTestSort(INT_BATCH_SIZE * inMemoryBatches, elements, data, sortElements,
sortTypes, expected, blockedOn, mode);
+ }
}
}
- public void testNoSort() throws Exception {
- helpTestBiggerSort(0, 2, false);
-
- helpTestBiggerSort(0, 2, true);
+ @Test public void testNoSort() throws Exception {
+ helpTestBiggerSort(0, 2);
}
- public void testBasicSort() throws Exception {
+ @Test public void testBasicSort() throws Exception {
List[] expected = new List[] {
Arrays.asList(new Object[] { new Integer(0), "0" }),
//$NON-NLS-1$
Arrays.asList(new Object[] { new Integer(0), "3" }),
//$NON-NLS-1$
@@ -240,10 +238,14 @@
Arrays.asList(new Object[] { new Integer(10), "4" })
//$NON-NLS-1$
};
- helpTestBasicSort(expected, false);
+ helpTestBasicSort(expected, Mode.SORT);
}
- public void testBasicSortRemoveDup() throws Exception {
+ /**
+ * Note the ordering here is not stable
+ * @throws Exception
+ */
+ @Test public void testBasicSortRemoveDup() throws Exception {
List[] expected = new List[] {
Arrays.asList(new Object[] { new Integer(0), "0" }),
//$NON-NLS-1$
Arrays.asList(new Object[] { new Integer(0), "3" }),
//$NON-NLS-1$
@@ -266,30 +268,41 @@
Arrays.asList(new Object[] { new Integer(10), "9" })
//$NON-NLS-1$
};
-
- helpTestBasicSort(expected, true);
- }
+ helpTestBasicSort(expected, Mode.DUP_REMOVE);
+ }
- public void testBiggerSort() throws Exception {
- helpTestBiggerSort(10, 5, false);
-
- helpTestBiggerSort(10, 5, true);
+ @Test public void testBasicSortRemoveDupSort() throws Exception {
+ List[] expected = new List[] {
+ Arrays.asList(new Object[] { new Integer(0), "0" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(0), "3" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(1), "2" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(1), "5" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(2), "1" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(2), "4" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(3), "3" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(3), "6" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(4), "3" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(5), "2" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(5), "5" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(6), "1" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(6), "4" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(7), "3" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(8), "2" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(9), "1" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(9), "5" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(10), "4" }),
//$NON-NLS-1$
+ Arrays.asList(new Object[] { new Integer(10), "9" })
//$NON-NLS-1$
+ };
+
+ helpTestBasicSort(expected, Mode.DUP_REMOVE_SORT);
+ }
+
+ @Test public void testBiggerSort() throws Exception {
+ helpTestBiggerSort(10, 5);
}
- public void testBiggerSortLowMemory() throws Exception {
- try {
- helpTestBiggerSort(5, 1, false);
- fail("Expected exception"); //$NON-NLS-1$
- } catch (BlockedOnMemoryException e) {
- //expected
- }
-
- try {
- helpTestBiggerSort(5, 1, true);
- fail("Expected exception"); //$NON-NLS-1$
- } catch (BlockedOnMemoryException e) {
- //expected
- }
+ @Test(expected=BlockedOnMemoryException.class) public void testBiggerSortLowMemory()
throws Exception {
+ helpTestBiggerSort(5, 1);
}
/**
@@ -297,10 +310,8 @@
*
* This is also a test of the multi-pass merge
*/
- public void testBiggerSortLowMemory2() throws Exception {
- helpTestBiggerSort(5, 2, false);
-
- helpTestBiggerSort(5, 2, true);
- }
-
+ @Test public void testBiggerSortLowMemory2() throws Exception {
+ helpTestBiggerSort(5, 2);
+ }
+
}