Author: jolee
Date: 2013-01-30 12:30:23 -0500 (Wed, 30 Jan 2013)
New Revision: 4546
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
Log:
TEIID-2363: proactive buffering not occurring for the inner side of an outer join on
"MERGE JOIN (SORT/ALREADY_SORTED)"
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -72,6 +72,9 @@
if (buffer != null && (!saveOnMark || mark)) {
buffer.addTupleBatch(batch, true);
}
+ if (done && buffer != null) {
+ this.buffer.close();
+ }
}
return getCurrentTuple();
}
@@ -139,4 +142,26 @@
super.setPosition(position);
}
+ /**
+ * non-destructive method to set the mark
+ * @return true if the mark was set
+ */
+ public boolean ensureSave() {
+ if (!saveOnMark || mark) {
+ return false;
+ }
+ mark = true;
+ return true;
+ }
+
+ public void disableSave() {
+ if (buffer != null) {
+ this.saveOnMark = true;
+ this.mark = false;
+ if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+ this.batch = null;
+ }
+ }
+ }
+
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -28,19 +28,19 @@
import java.util.LinkedHashSet;
import java.util.List;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.STree;
+import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.common.buffer.TupleBrowser;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
+import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
import org.teiid.query.sql.lang.OrderBy;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -182,7 +182,7 @@
int rowId = 0;
List<?> lastTuple = null;
boolean sortedDistinct = sorted && !state.isDistinct();
- int sizeHint = index.getExpectedHeight(state.getTupleBuffer().getRowCount());
+ int sizeHint = index.getExpectedHeight(state.getRowCount());
index.setBatchInsert(sorted);
outer: while (its.hasNext()) {
//detect if sorted and distinct
@@ -233,9 +233,7 @@
}
private boolean shouldIndexIfSmall(SourceState source) throws
TeiidComponentException, TeiidProcessingException {
- Number cardinality = source.getSource().getEstimateNodeCardinality();
- return (source.hasBuffer() || (cardinality != null &&
cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE &&
cardinality.floatValue() <= source.getSource().getBatchSize() / 4))
- && (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+ return source.rowCountLE(source.getSource().getBatchSize() / 2);
}
@Override
@@ -247,7 +245,6 @@
} else if (!this.leftSource.hasBuffer() && processingSortLeft ==
SortOption.SORT && shouldIndexIfSmall(this.rightSource)) {
this.processingSortLeft = SortOption.NOT_SORTED;
} else {
- this.leftSource.getTupleBuffer();
if (!this.rightSource.hasBuffer() && processingSortRight == SortOption.SORT
&& shouldIndexIfSmall(this.leftSource)) {
this.processingSortRight = SortOption.NOT_SORTED;
} else if (processingSortRight == SortOption.SORT &&
shouldIndex(this.leftSource, this.rightSource)) {
@@ -318,10 +315,10 @@
boolean useIndex = false;
int indexSchemaSize =
this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
//approximate that 1/2 of the index will be memory resident
- toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() /
(possibleIndex.getTupleBuffer().getBatchSize() * .5));
+ toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() /
(possibleIndex.getSource().getBatchSize() * .5));
if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
useIndex = true;
- } else if (possibleIndex.getTupleBuffer().getRowCount() /
this.joinNode.getBatchSize() < preferMemCutoff) {
+ } else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() <
preferMemCutoff) {
useIndex = true;
}
if (useIndex) {
@@ -329,6 +326,7 @@
return true;
}
this.repeatedMerge = true;
+ possibleIndex.setImplicitBuffer(ImplicitBuffer.FULL);
return true;
}
@@ -344,7 +342,7 @@
super.process();
return;
}
- if (this.sortedSource.getTupleBuffer().getRowCount() == 0) {
+ if (this.sortedSource.getRowCount() == 0) {
return;
}
if (repeatedMerge) {
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -42,291 +42,321 @@
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.util.CommandContext;
-
-/**
+/**
* @since 4.2
*/
public class JoinNode extends SubqueryAwareRelationalNode {
-
- static class BatchAvailableException extends RuntimeException {}
-
- static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
-
- public enum JoinStrategyType {
- MERGE,
- ENHANCED_SORT,
- NESTED_LOOP,
- NESTED_TABLE
+
+ static class BatchAvailableException extends RuntimeException {
}
-
- private enum State { LOAD_LEFT, LOAD_RIGHT, EXECUTE }
- private State state = State.LOAD_LEFT;
-
- private JoinStrategy joinStrategy;
- private JoinType joinType;
- private String dependentValueSource;
-
- private List leftExpressions;
- private List rightExpressions;
- private boolean leftDistinct;
- private boolean rightDistinct;
- private Criteria joinCriteria;
-
- private Map combinedElementMap;
- private int[] projectionIndexes;
-
- private DependentValueSource dvs;
-
- public JoinNode(int nodeID) {
- super(nodeID);
- }
-
- public void setJoinType(JoinType type) {
- this.joinType = type;
- }
-
- public JoinStrategy getJoinStrategy() {
- return this.joinStrategy;
- }
- public void setJoinStrategy(JoinStrategy joinStrategy) {
- this.joinStrategy = joinStrategy;
- }
+ static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
- public void setJoinExpressions(List leftExpressions, List rightExpressions) {
- this.leftExpressions = leftExpressions;
- this.rightExpressions = rightExpressions;
- }
-
- public boolean isLeftDistinct() {
+ public enum JoinStrategyType {
+ MERGE, ENHANCED_SORT, NESTED_LOOP, NESTED_TABLE
+ }
+
+ private enum State {
+ LOAD_LEFT, LOAD_RIGHT, EXECUTE
+ }
+
+ private State state = State.LOAD_LEFT;
+
+ private JoinStrategy joinStrategy;
+ private JoinType joinType;
+ private String dependentValueSource;
+
+ private List leftExpressions;
+ private List rightExpressions;
+ private boolean leftDistinct;
+ private boolean rightDistinct;
+ private Criteria joinCriteria;
+
+ private Map combinedElementMap;
+ private int[] projectionIndexes;
+
+ private DependentValueSource dvs;
+
+ public JoinNode(int nodeID) {
+ super(nodeID);
+ }
+
+ public void setJoinType(JoinType type) {
+ this.joinType = type;
+ }
+
+ public JoinStrategy getJoinStrategy() {
+ return this.joinStrategy;
+ }
+
+ public void setJoinStrategy(JoinStrategy joinStrategy) {
+ this.joinStrategy = joinStrategy;
+ }
+
+ public void setJoinExpressions(List leftExpressions, List rightExpressions) {
+ this.leftExpressions = leftExpressions;
+ this.rightExpressions = rightExpressions;
+ }
+
+ public boolean isLeftDistinct() {
return leftDistinct;
}
-
- public void setLeftDistinct(boolean leftDistinct) {
+
+ public void setLeftDistinct(boolean leftDistinct) {
this.leftDistinct = leftDistinct;
}
-
- public boolean isRightDistinct() {
+
+ public boolean isRightDistinct() {
return rightDistinct;
}
-
- public void setRightDistinct(boolean rightDistinct) {
+
+ public void setRightDistinct(boolean rightDistinct) {
this.rightDistinct = rightDistinct;
}
-
- public void setJoinCriteria(Criteria joinCriteria) {
- this.joinCriteria = joinCriteria;
- }
-
- @Override
- public void initialize(CommandContext context, BufferManager bufferManager,
- ProcessorDataManager dataMgr) {
- super.initialize(context, bufferManager, dataMgr);
-
- if (this.combinedElementMap == null) {
- // Create element lookup map for evaluating project expressions
- List combinedElements = new ArrayList(getChildren()[0].getElements());
- combinedElements.addAll(getChildren()[1].getElements());
- this.combinedElementMap = createLookupMap(combinedElements);
- this.projectionIndexes = getProjectionIndexes(combinedElementMap,
getElements());
- }
- }
-
- public void open()
- throws TeiidComponentException, TeiidProcessingException {
- // Set Up Join Strategy
- this.joinStrategy.initialize(this);
-
- joinStrategy.openLeft();
-
- if(!isDependent()) {
- joinStrategy.openRight();
- }
-
- this.state = State.LOAD_LEFT;
- }
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#clone()
- * @since 4.2
- */
- public Object clone() {
- JoinNode clonedNode = new JoinNode(super.getID());
- super.copy(this, clonedNode);
-
- clonedNode.joinType = this.joinType;
- clonedNode.joinStrategy = this.joinStrategy.clone();
-
- clonedNode.joinCriteria = this.joinCriteria;
-
- clonedNode.leftExpressions = leftExpressions;
-
- clonedNode.rightExpressions = rightExpressions;
- clonedNode.dependentValueSource = this.dependentValueSource;
- clonedNode.rightDistinct = rightDistinct;
- clonedNode.leftDistinct = leftDistinct;
-
- return clonedNode;
- }
+ public void setJoinCriteria(Criteria joinCriteria) {
+ this.joinCriteria = joinCriteria;
+ }
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
- * @since 4.2
- */
- protected TupleBatch nextBatchDirect() throws BlockedException,
- TeiidComponentException,
- TeiidProcessingException {
- if (state == State.LOAD_LEFT) {
- if (this.joinType != JoinType.JOIN_FULL_OUTER) {
- this.joinStrategy.leftSource.setImplicitBuffer(ImplicitBuffer.NONE);
- }
- //left child was already opened by the join node
- this.joinStrategy.loadLeft();
- if (isDependent()) {
- TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
- dvs = new DependentValueSource(buffer);
- dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
-
this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
- }
- state = State.LOAD_RIGHT;
- }
- if (state == State.LOAD_RIGHT) {
- this.joinStrategy.openRight();
- this.joinStrategy.loadRight();
- state = State.EXECUTE;
- }
- try {
- this.joinStrategy.process();
- this.terminateBatches();
- } catch (BatchAvailableException e) {
- //pull the batch
- }
- return pullBatch();
- }
+ @Override
+ public void initialize(CommandContext context, BufferManager bufferManager,
+ ProcessorDataManager dataMgr) {
+ super.initialize(context, bufferManager, dataMgr);
- /**
- * @see
org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
- * @since 4.2
- */
- public PlanNode getDescriptionProperties() {
- // Default implementation - should be overridden
- PlanNode props = super.getDescriptionProperties();
-
- if(isDependent()) {
- props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
- }
- props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
- props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
- List<String> critList = getCriteriaList();
- props.addProperty(PROP_JOIN_CRITERIA, critList);
- return props;
- }
+ if (this.combinedElementMap == null) {
+ // Create element lookup map for evaluating project expressions
+ List combinedElements = new ArrayList(
+ getChildren()[0].getElements());
+ combinedElements.addAll(getChildren()[1].getElements());
+ this.combinedElementMap = createLookupMap(combinedElements);
+ this.projectionIndexes = getProjectionIndexes(combinedElementMap,
+ getElements());
+ }
+ }
+ public void open() throws TeiidComponentException, TeiidProcessingException {
+ // Set Up Join Strategy
+ this.joinStrategy.initialize(this);
+
+ joinStrategy.openLeft();
+
+ if (!isDependent()) {
+ joinStrategy.openRight();
+ }
+
+ this.state = State.LOAD_LEFT;
+ }
+
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#clone()
+ * @since 4.2
+ */
+ public Object clone() {
+ JoinNode clonedNode = new JoinNode(super.getID());
+ super.copy(this, clonedNode);
+
+ clonedNode.joinType = this.joinType;
+ clonedNode.joinStrategy = this.joinStrategy.clone();
+
+ clonedNode.joinCriteria = this.joinCriteria;
+
+ clonedNode.leftExpressions = leftExpressions;
+
+ clonedNode.rightExpressions = rightExpressions;
+ clonedNode.dependentValueSource = this.dependentValueSource;
+ clonedNode.rightDistinct = rightDistinct;
+ clonedNode.leftDistinct = leftDistinct;
+
+ return clonedNode;
+ }
+
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
+ * @since 4.2
+ */
+ protected TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ try {
+ if (state == State.LOAD_LEFT) {
+ if (this.joinType != JoinType.JOIN_FULL_OUTER) {
+ this.joinStrategy.leftSource
+ .setImplicitBuffer(ImplicitBuffer.NONE);
+ }
+ // left child was already opened by the join node
+ this.joinStrategy.loadLeft();
+ if (isDependent()) {
+ TupleBuffer buffer = this.joinStrategy.leftSource
+ .getTupleBuffer();
+ // the tuplebuffer may be from a lower node, so pass in the
+ // schema
+ dvs = new DependentValueSource(buffer);
+ dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
+ this.getContext().getVariableContext()
+ .setGlobalValue(this.dependentValueSource, dvs);
+ }
+ state = State.LOAD_RIGHT;
+ }
+ } catch (BlockedException e) {
+ if (!isDependent()) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ }
+ throw e;
+ }
+ try {
+ if (state == State.LOAD_RIGHT) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ state = State.EXECUTE;
+ }
+ this.joinStrategy.process();
+ this.terminateBatches();
+ } catch (BatchAvailableException e) {
+ // pull the batch
+ } catch (BlockedException e) {
+ // TODO: this leads to duplicate exceptions, we
+ // could track which side is blocking
+ try {
+ this.joinStrategy.leftSource.prefetch(true);
+ } catch (BlockedException e1) {
+
+ }
+ this.joinStrategy.rightSource.prefetch(true);
+ throw e;
+ }
+ return pullBatch();
+ }
+
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+ * @since 4.2
+ */
+ public PlanNode getDescriptionProperties() {
+ // Default implementation - should be overridden
+ PlanNode props = super.getDescriptionProperties();
+
+ if (isDependent()) {
+ props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+ }
+ props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+ props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+ List<String> critList = getCriteriaList();
+ props.addProperty(PROP_JOIN_CRITERIA, critList);
+ return props;
+ }
+
private List<String> getCriteriaList() {
List<String> critList = new ArrayList<String>();
- if (leftExpressions != null) {
- for(int i=0; i < this.leftExpressions.size(); i++) {
- critList.add(this.leftExpressions.get(i).toString() + "=" +
this.rightExpressions.get(i).toString()); //$NON-NLS-1$
- }
- }
- if (this.joinCriteria != null) {
- for (Criteria crit :
(List<Criteria>)Criteria.separateCriteriaByAnd(joinCriteria)) {
- critList.add(crit.toString());
- }
- }
+ if (leftExpressions != null) {
+ for (int i = 0; i < this.leftExpressions.size(); i++) {
+ critList.add(this.leftExpressions.get(i).toString()
+ + "=" + this.rightExpressions.get(i).toString()); //$NON-NLS-1$
+ }
+ }
+ if (this.joinCriteria != null) {
+ for (Criteria crit : (List<Criteria>) Criteria
+ .separateCriteriaByAnd(joinCriteria)) {
+ critList.add(crit.toString());
+ }
+ }
return critList;
}
- /**
- * @see
org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
- * @since 4.2
- */
- protected void getNodeString(StringBuffer str) {
- str.append(getClassName());
- str.append("("); //$NON-NLS-1$
- str.append(getID());
- str.append(") [");//$NON-NLS-1$
- if(isDependent()) {
- str.append("Dependent] [");//$NON-NLS-1$
- }
- str.append(this.joinStrategy.toString());
- str.append("] [");//$NON-NLS-1$
- str.append(this.joinType.toString());
- str.append("]"); //$NON-NLS-1$
- if (getJoinType() != JoinType.JOIN_CROSS) {
- str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
- }
- str.append(" output="); //$NON-NLS-1$
- str.append(getElements());
- }
+ /**
+ * @see
org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
+ * @since 4.2
+ */
+ protected void getNodeString(StringBuffer str) {
+ str.append(getClassName());
+ str.append("("); //$NON-NLS-1$
+ str.append(getID());
+ str.append(") [");//$NON-NLS-1$
+ if (isDependent()) {
+ str.append("Dependent] [");//$NON-NLS-1$
+ }
+ str.append(this.joinStrategy.toString());
+ str.append("] [");//$NON-NLS-1$
+ str.append(this.joinType.toString());
+ str.append("]"); //$NON-NLS-1$
+ if (getJoinType() != JoinType.JOIN_CROSS) {
+ str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
+ }
+ str.append(" output="); //$NON-NLS-1$
+ str.append(getElements());
+ }
- /**
- * @return Returns the isDependent.
- */
- public boolean isDependent() {
- return this.dependentValueSource != null;
- }
-
- /**
- * @param isDependent The isDependent to set.
- */
- public void setDependentValueSource(String dependentValueSource) {
- this.dependentValueSource = dependentValueSource;
- }
-
- public String getDependentValueSourceName() {
+ /**
+ * @return Returns the isDependent.
+ */
+ public boolean isDependent() {
+ return this.dependentValueSource != null;
+ }
+
+ /**
+ * @param isDependent
+ * The isDependent to set.
+ */
+ public void setDependentValueSource(String dependentValueSource) {
+ this.dependentValueSource = dependentValueSource;
+ }
+
+ public String getDependentValueSourceName() {
return dependentValueSource;
}
-
- public void closeDirect() {
- super.closeDirect();
- joinStrategy.close();
- if (this.getContext() != null && this.dependentValueSource != null) {
- this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource,
null);
- }
- this.dvs = null;
- }
-
- @Override
- public void reset() {
- super.reset();
- this.dvs = null;
- }
- public JoinType getJoinType() {
- return this.joinType;
- }
+ public void closeDirect() {
+ super.closeDirect();
+ joinStrategy.close();
+ if (this.getContext() != null && this.dependentValueSource != null) {
+ this.getContext().getVariableContext()
+ .setGlobalValue(this.dependentValueSource, null);
+ }
+ this.dvs = null;
+ }
- Map getCombinedElementMap() {
- return this.combinedElementMap;
- }
+ @Override
+ public void reset() {
+ super.reset();
+ this.dvs = null;
+ }
- public Criteria getJoinCriteria() {
- return this.joinCriteria;
- }
-
- boolean matchesCriteria(List outputTuple) throws BlockedException,
TeiidComponentException, ExpressionEvaluationException {
- return (this.joinCriteria == null ||
getEvaluator(this.combinedElementMap).evaluate(this.joinCriteria, outputTuple));
- }
+ public JoinType getJoinType() {
+ return this.joinType;
+ }
- public List getLeftExpressions() {
- return this.leftExpressions;
- }
+ Map getCombinedElementMap() {
+ return this.combinedElementMap;
+ }
- public List getRightExpressions() {
- return this.rightExpressions;
- }
-
- @Override
- protected void addBatchRow(List row) {
- List projectTuple = projectTuple(this.projectionIndexes, row);
- super.addBatchRow(projectTuple);
- if (isBatchFull()) {
- throw BATCH_AVILABLE;
- }
- }
-
- public DependentValueSource getDependentValueSource() {
+ public Criteria getJoinCriteria() {
+ return this.joinCriteria;
+ }
+
+ boolean matchesCriteria(List outputTuple) throws BlockedException,
+ TeiidComponentException, ExpressionEvaluationException {
+ return (this.joinCriteria == null || getEvaluator(
+ this.combinedElementMap).evaluate(this.joinCriteria,
+ outputTuple));
+ }
+
+ public List getLeftExpressions() {
+ return this.leftExpressions;
+ }
+
+ public List getRightExpressions() {
+ return this.rightExpressions;
+ }
+
+ @Override
+ protected void addBatchRow(List row) {
+ List projectTuple = projectTuple(this.projectionIndexes, row);
+ super.addBatchRow(projectTuple);
+ if (isBatchFull()) {
+ throw BATCH_AVILABLE;
+ }
+ }
+
+ public DependentValueSource getDependentValueSource() {
return dvs;
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -342,10 +342,10 @@
@Override
protected void loadRight() throws TeiidComponentException,
TeiidProcessingException {
- this.rightSource.sort(this.processingSortRight);
- if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
+ if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
this.rightSource.setImplicitBuffer(ImplicitBuffer.ON_MARK);
}
+ this.rightSource.sort(this.processingSortRight);
}
public void setProcessingSortRight(boolean processingSortRight) {
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -25,13 +25,12 @@
import java.util.Collections;
import java.util.List;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
import org.teiid.query.processor.relational.SortUtility.Mode;
@@ -46,7 +45,6 @@
private RelationalNode source;
private List expressions;
- private BatchCollector collector;
private TupleBuffer buffer;
private List<TupleBuffer> buffers;
private List<Object> outerVals;
@@ -57,6 +55,7 @@
private boolean distinct;
private ImplicitBuffer implicitBuffer = ImplicitBuffer.FULL;
boolean open;
+ private BatchIterator prefetch;
private SortUtility sortUtility;
@@ -98,7 +97,7 @@
return currentTuple;
}
- public void reset() throws TeiidComponentException {
+ public void reset() throws TeiidComponentException, TeiidProcessingException {
this.getIterator().reset();
this.getIterator().mark();
this.currentTuple = null;
@@ -120,28 +119,104 @@
this.iterator.closeSource();
this.iterator = null;
}
+ this.prefetch = null;
this.currentTuple = null;
}
public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
return this.getTupleBuffer().getRowCount();
}
+
+ /**
+ * Uses the prefetch logic to determine an incremental row count
+ */
+ public boolean rowCountLE(int count) throws TeiidComponentException,
TeiidProcessingException {
+ if (buffer == null) {
+ prefetch(false);
+ }
+ while (buffer.getRowCount() <= count) {
+ if (prefetch == null) {
+ return true;
+ }
+ prefetch(false);
+ }
+ return false;
+ }
- IndexedTupleSource getIterator() throws TeiidComponentException {
+ IndexedTupleSource getIterator() throws TeiidComponentException,
TeiidProcessingException {
if (this.iterator == null) {
- if (this.buffer != null) {
- iterator = buffer.createIndexedTupleSource();
+ if (this.buffer == null) {
+ getTupleBuffer(false);
+ }
+ if (this.prefetch != null) {
+ this.iterator = this.prefetch;
} else {
- // return a TupleBatch tuplesource iterator
- BatchIterator bi = new BatchIterator(this.source);
- if (implicitBuffer != ImplicitBuffer.NONE) {
- bi.setBuffer(createSourceTupleBuffer(), implicitBuffer ==
ImplicitBuffer.ON_MARK);
- }
- this.iterator = bi;
+ iterator = buffer.createIndexedTupleSource(implicitBuffer ==
ImplicitBuffer.NONE);
}
}
return this.iterator;
}
+
+ /**
+ * Create a batch iterator to perform basic prefetching
+ * @throws TeiidComponentException
+ */
+ private void createPrefetch() throws TeiidComponentException {
+ this.prefetch = new BatchIterator(this.source);
+ boolean useMark = implicitBuffer != ImplicitBuffer.FULL;
+ this.buffer = createSourceTupleBuffer();
+ this.prefetch.setBuffer(this.buffer, useMark);
+ if (useMark) {
+ this.prefetch.mark();
+ }
+ }
+
+ /**
+ * Pro-actively pull batches for later use.
+ * There are unfortunately quite a few cases to cover here.
+ */
+ protected void prefetch(boolean limit) throws TeiidComponentException,
TeiidProcessingException {
+ if (this.prefetch == null) {
+ if (this.buffer != null) {
+ return;
+ }
+ if (this.sortUtility != null) {
+ sortUtility.sort();
+ return;
+ }
+ if (this.source.hasFinalBuffer()) {
+ this.buffer = this.source.getFinalBuffer();
+ return;
+ }
+ createPrefetch();
+ }
+ if (limit && this.buffer.getManagedRowCount() >=
this.source.getBatchSize() *
this.source.getContext().getOptions().getJoinPrefetchBatches()) {
+ return;
+ }
+ int curIndex = this.prefetch.getCurrentIndex();
+ boolean marked = false;
+ if (this.prefetch.ensureSave()) {
+ marked = true;
+ }
+ this.prefetch.setPosition(this.buffer.getRowCount() + 1);
+ BatchIterator bi = this.prefetch; //even if we clear the prefetch, we may already be
using it as the iterator
+ try {
+ if (!this.prefetch.hasNext()) {
+ this.prefetch = null;
+ if (this.iterator != bi) {
+ bi = null;
+ }
+ }
+ } finally {
+ if (bi != null) {
+ if (marked) {
+ bi.reset();
+ } else {
+ bi.setPosition(curIndex);
+ }
+ }
+ }
+ }
public List<Object> getOuterVals() {
return this.outerVals;
@@ -164,14 +239,26 @@
}
public TupleBuffer getTupleBuffer() throws TeiidComponentException,
TeiidProcessingException {
+ return getTupleBuffer(true);
+ }
+
+ private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException,
TeiidProcessingException {
if (this.buffer == null) {
if (this.iterator instanceof BatchIterator) {
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
}
- if (collector == null) {
- collector = new BatchCollector(source, source.getBufferManager(),
source.getContext(), false);
- }
- this.buffer = collector.collectTuples();
+ if (source.hasFinalBuffer()) {
+ this.buffer = source.getFinalBuffer();
+ return this.buffer;
+ }
+ this.implicitBuffer = ImplicitBuffer.FULL;
+ createPrefetch();
+ }
+ if (full && this.prefetch != null) {
+ while (this.prefetch.hasNext()) {
+ this.prefetch.setPosition(this.prefetch.getCurrentIndex() +
this.source.getBatchSize());
+ }
+ this.prefetch = null; //fully buffered
}
return this.buffer;
}
@@ -192,7 +279,13 @@
TupleSource ts = null;
if (this.buffer != null) {
this.buffer.setForwardOnly(true);
- ts = this.buffer.createIndexedTupleSource();
+ if (this.prefetch != null) {
+ this.prefetch.setPosition(1);
+ this.prefetch.disableSave();
+ ts = this.prefetch;
+ } else {
+ ts = this.buffer.createIndexedTupleSource();
+ }
} else {
ts = new BatchIterator(this.source);
}
@@ -208,12 +301,17 @@
nextBuffer();
return;
}
- this.buffer = sortUtility.sort();
+ TupleBuffer sorted = sortUtility.sort();
+ if (this.buffer != null) {
+ this.buffer.remove();
+ }
+ this.prefetch = null;
+ this.buffer = sorted;
this.markDistinct(sortUtility.isDistinct());
}
public boolean hasBuffer() {
- return this.buffer != null;
+ return this.buffer != null && this.prefetch == null;
}
public boolean nextBuffer() {
@@ -223,6 +321,7 @@
}
this.buffer = this.buffers.remove(this.buffers.size() - 1);
this.buffer.setForwardOnly(false);
+ this.prefetch = null;
this.resetState();
return true;
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java 2013-01-17
18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -31,10 +31,12 @@
public static final String UNNEST_DEFAULT = "org.teiid.subqueryUnnestDefault";
//$NON-NLS-1$
public static final String PUSHDOWN_DEFAULT_NULL_ORDER =
"org.teiid.pushdownDefaultNullOrder"; //$NON-NLS-1$
+ public static final String JOIN_PREFETCH_BATCHES =
"org.teiid.joinPrefetchBatches"; //$NON-NLS-1$
private Properties properties;
private boolean subqueryUnnestDefault;
private boolean pushdownDefaultNullOrder = true;
+ private int joinPrefetchBatches = 10;
public Properties getProperties() {
return properties;
@@ -69,5 +71,17 @@
this.pushdownDefaultNullOrder = p;
return this;
}
-
+
+ public void setJoinPrefetchBatches(int joinPrefetchBatches) {
+ this.joinPrefetchBatches = joinPrefetchBatches;
+ }
+
+ public int getJoinPrefetchBatches() {
+ return joinPrefetchBatches;
+ }
+
+ public Options joinPrefetchBatches(int i) {
+ this.joinPrefetchBatches = i;
+ return this;
+ }
}
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -332,13 +332,18 @@
assertEquals(ThreadState.IDLE, item.getThreadState());
assertTrue(item.resultsBuffer.getManagedRowCount() <= rowsPerBatch*23);
//pull the rest of the results
- for (int j = 0; j < 48; j++) {
- item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
-
- message = core.processCursorRequest(reqMsg.getExecutionId(), (j + 2) *
rowsPerBatch + 1, rowsPerBatch);
- rm = message.get(5000, TimeUnit.MILLISECONDS);
- assertNull(rm.getException());
- assertEquals(rowsPerBatch, rm.getResultsList().size());
+ int start = 17;
+ while (true) {
+ item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+ message = core.processCursorRequest(reqMsg.getExecutionId(), start,
rowsPerBatch);
+ rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertTrue(rowsPerBatch >= rm.getResultsList().size());
+ start += rm.getResultsList().size();
+ if (rm.getFinalRow() == rm.getLastRow()) {
+ break;
+ }
}
}
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-01-17
18:36:23 UTC (rev 4545)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-01-30
17:30:23 UTC (rev 4546)
@@ -29,9 +29,9 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.query.processor.relational.FakeRelationalNode;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -107,4 +107,28 @@
assertNull(bi.nextTuple());
}
+ @Test public void testDisableSave() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x")),
"test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, false); //$NON-NLS-1$
+ bi.setPosition(2);
+ assertTrue(bi.hasNext());
+ tb.setForwardOnly(true);
+ bi.setPosition(1);
+ bi.disableSave();
+ for (int i = 0; i < 6; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ assertEquals(0, tb.getManagedRowCount());
+ }
+
}