teiid SVN: r4547 - in branches/7.7.x/engine/src: test/java/org/teiid/query/processor and 1 other directory.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-01-31 14:54:35 -0500 (Thu, 31 Jan 2013)
New Revision: 4547
TEIID-2363: correcting an issue with nested table joins
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-30 17:30:23 UTC (rev 4546)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-31 19:54:35 UTC (rev 4547)
@@ -176,6 +176,9 @@
* There are unfortunately quite a few cases to cover here.
protected void prefetch(boolean limit) throws TeiidComponentException, TeiidProcessingException {
+ if (!open) {
+ return;
+ }
if (this.prefetch == null) {
if (this.buffer != null) {
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java 2013-01-30 17:30:23 UTC (rev 4546)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java 2013-01-31 19:54:35 UTC (rev 4547)
@@ -300,6 +300,22 @@
assertTrue(!(join.getJoinStrategy() instanceof NestedTableJoinStrategy));
helpProcess(plan, createCommandContext(), dataManager, expected);
+ @Test public void testTextTableJoinPrefetch() throws Exception {
+ String sql = "select z.* from (select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1 || '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z";
+ List[] expected = new List[] {
+ Arrays.asList("c"),
+ Arrays.asList("1"),
+ Arrays.asList("true"),
+ };
+ FakeDataManager dataManager = new FakeDataManager();
+ dataManager.setBlockOnce();
+ sampleData1(dataManager);
+ RelationalPlan plan = (RelationalPlan)helpGetPlan(helpParse(sql), RealMetadataFactory.example1Cached());
+ helpProcess(plan, createCommandContext(), dataManager, expected);
+ }
@Test public void testTextTableJoin1() throws Exception {
String sql = "select e1, e2 from texttable('a' COLUMNS col string) x, pm1.g1 where col = e1";
[View Less]
12 years, 1 month
teiid SVN: r4546 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor/relational and 3 other directories.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-01-30 12:30:23 -0500 (Wed, 30 Jan 2013)
New Revision: 4546
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
TEIID-2363: proactive buffering not occurring for the inner side of an outer join on "MERGE JOIN (SORT/ALREADY_SORTED)"
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -72,6 +72,9 @@
if (buffer != null && (!saveOnMark || mark)) {
buffer.addTupleBatch(batch, true);
+ if (done && buffer != null) {
+ this.buffer.close();
+ }
return getCurrentTuple();
@@ -139,4 +142,26 @@
+ /**
+ * non-destructive method to set the mark
+ * @return true if the mark was set
+ */
+ public boolean ensureSave() {
+ if (!saveOnMark || mark) {
+ return false;
+ }
+ mark = true;
+ return true;
+ }
+ public void disableSave() {
+ if (buffer != null) {
+ this.saveOnMark = true;
+ this.mark = false;
+ if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+ this.batch = null;
+ }
+ }
+ }
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -28,19 +28,19 @@
import java.util.LinkedHashSet;
import java.util.List;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.STree;
+import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.common.buffer.TupleBrowser;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
+import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
import org.teiid.query.sql.lang.OrderBy;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -182,7 +182,7 @@
int rowId = 0;
List<?> lastTuple = null;
boolean sortedDistinct = sorted && !state.isDistinct();
- int sizeHint = index.getExpectedHeight(state.getTupleBuffer().getRowCount());
+ int sizeHint = index.getExpectedHeight(state.getRowCount());
outer: while (its.hasNext()) {
//detect if sorted and distinct
@@ -233,9 +233,7 @@
private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
- Number cardinality = source.getSource().getEstimateNodeCardinality();
- return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4))
- && (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+ return source.rowCountLE(source.getSource().getBatchSize() / 2);
@@ -247,7 +245,6 @@
} else if (!this.leftSource.hasBuffer() && processingSortLeft == SortOption.SORT && shouldIndexIfSmall(this.rightSource)) {
this.processingSortLeft = SortOption.NOT_SORTED;
} else {
- this.leftSource.getTupleBuffer();
if (!this.rightSource.hasBuffer() && processingSortRight == SortOption.SORT && shouldIndexIfSmall(this.leftSource)) {
this.processingSortRight = SortOption.NOT_SORTED;
} else if (processingSortRight == SortOption.SORT && shouldIndex(this.leftSource, this.rightSource)) {
@@ -318,10 +315,10 @@
boolean useIndex = false;
int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
//approximate that 1/2 of the index will be memory resident
- toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() / (possibleIndex.getTupleBuffer().getBatchSize() * .5));
+ toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize() * .5));
if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
useIndex = true;
- } else if (possibleIndex.getTupleBuffer().getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
+ } else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
useIndex = true;
if (useIndex) {
@@ -329,6 +326,7 @@
return true;
this.repeatedMerge = true;
+ possibleIndex.setImplicitBuffer(ImplicitBuffer.FULL);
return true;
@@ -344,7 +342,7 @@
- if (this.sortedSource.getTupleBuffer().getRowCount() == 0) {
+ if (this.sortedSource.getRowCount() == 0) {
if (repeatedMerge) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -42,291 +42,321 @@
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.util.CommandContext;
* @since 4.2
public class JoinNode extends SubqueryAwareRelationalNode {
- static class BatchAvailableException extends RuntimeException {}
- static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
- public enum JoinStrategyType {
+ static class BatchAvailableException extends RuntimeException {
- private enum State { LOAD_LEFT, LOAD_RIGHT, EXECUTE }
- private State state = State.LOAD_LEFT;
- private JoinStrategy joinStrategy;
- private JoinType joinType;
- private String dependentValueSource;
- private List leftExpressions;
- private List rightExpressions;
- private boolean leftDistinct;
- private boolean rightDistinct;
- private Criteria joinCriteria;
- private Map combinedElementMap;
- private int[] projectionIndexes;
- private DependentValueSource dvs;
- public JoinNode(int nodeID) {
- super(nodeID);
- }
- public void setJoinType(JoinType type) {
- this.joinType = type;
- }
- public JoinStrategy getJoinStrategy() {
- return this.joinStrategy;
- }
- public void setJoinStrategy(JoinStrategy joinStrategy) {
- this.joinStrategy = joinStrategy;
- }
+ static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
- public void setJoinExpressions(List leftExpressions, List rightExpressions) {
- this.leftExpressions = leftExpressions;
- this.rightExpressions = rightExpressions;
- }
- public boolean isLeftDistinct() {
+ public enum JoinStrategyType {
+ }
+ private enum State {
+ }
+ private State state = State.LOAD_LEFT;
+ private JoinStrategy joinStrategy;
+ private JoinType joinType;
+ private String dependentValueSource;
+ private List leftExpressions;
+ private List rightExpressions;
+ private boolean leftDistinct;
+ private boolean rightDistinct;
+ private Criteria joinCriteria;
+ private Map combinedElementMap;
+ private int[] projectionIndexes;
+ private DependentValueSource dvs;
+ public JoinNode(int nodeID) {
+ super(nodeID);
+ }
+ public void setJoinType(JoinType type) {
+ this.joinType = type;
+ }
+ public JoinStrategy getJoinStrategy() {
+ return this.joinStrategy;
+ }
+ public void setJoinStrategy(JoinStrategy joinStrategy) {
+ this.joinStrategy = joinStrategy;
+ }
+ public void setJoinExpressions(List leftExpressions, List rightExpressions) {
+ this.leftExpressions = leftExpressions;
+ this.rightExpressions = rightExpressions;
+ }
+ public boolean isLeftDistinct() {
return leftDistinct;
- public void setLeftDistinct(boolean leftDistinct) {
+ public void setLeftDistinct(boolean leftDistinct) {
this.leftDistinct = leftDistinct;
- public boolean isRightDistinct() {
+ public boolean isRightDistinct() {
return rightDistinct;
- public void setRightDistinct(boolean rightDistinct) {
+ public void setRightDistinct(boolean rightDistinct) {
this.rightDistinct = rightDistinct;
- public void setJoinCriteria(Criteria joinCriteria) {
- this.joinCriteria = joinCriteria;
- }
- @Override
- public void initialize(CommandContext context, BufferManager bufferManager,
- ProcessorDataManager dataMgr) {
- super.initialize(context, bufferManager, dataMgr);
- if (this.combinedElementMap == null) {
- // Create element lookup map for evaluating project expressions
- List combinedElements = new ArrayList(getChildren()[0].getElements());
- combinedElements.addAll(getChildren()[1].getElements());
- this.combinedElementMap = createLookupMap(combinedElements);
- this.projectionIndexes = getProjectionIndexes(combinedElementMap, getElements());
- }
- }
- public void open()
- throws TeiidComponentException, TeiidProcessingException {
- // Set Up Join Strategy
- this.joinStrategy.initialize(this);
- joinStrategy.openLeft();
- if(!isDependent()) {
- joinStrategy.openRight();
- }
- this.state = State.LOAD_LEFT;
- }
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#clone()
- * @since 4.2
- */
- public Object clone() {
- JoinNode clonedNode = new JoinNode(super.getID());
- super.copy(this, clonedNode);
- clonedNode.joinType = this.joinType;
- clonedNode.joinStrategy = this.joinStrategy.clone();
- clonedNode.joinCriteria = this.joinCriteria;
- clonedNode.leftExpressions = leftExpressions;
- clonedNode.rightExpressions = rightExpressions;
- clonedNode.dependentValueSource = this.dependentValueSource;
- clonedNode.rightDistinct = rightDistinct;
- clonedNode.leftDistinct = leftDistinct;
- return clonedNode;
- }
+ public void setJoinCriteria(Criteria joinCriteria) {
+ this.joinCriteria = joinCriteria;
+ }
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
- * @since 4.2
- */
- protected TupleBatch nextBatchDirect() throws BlockedException,
- TeiidComponentException,
- TeiidProcessingException {
- if (state == State.LOAD_LEFT) {
- if (this.joinType != JoinType.JOIN_FULL_OUTER) {
- this.joinStrategy.leftSource.setImplicitBuffer(ImplicitBuffer.NONE);
- }
- //left child was already opened by the join node
- this.joinStrategy.loadLeft();
- if (isDependent()) {
- TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
- dvs = new DependentValueSource(buffer);
- dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
- this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
- }
- state = State.LOAD_RIGHT;
- }
- if (state == State.LOAD_RIGHT) {
- this.joinStrategy.openRight();
- this.joinStrategy.loadRight();
- state = State.EXECUTE;
- }
- try {
- this.joinStrategy.process();
- this.terminateBatches();
- } catch (BatchAvailableException e) {
- //pull the batch
- }
- return pullBatch();
- }
+ @Override
+ public void initialize(CommandContext context, BufferManager bufferManager,
+ ProcessorDataManager dataMgr) {
+ super.initialize(context, bufferManager, dataMgr);
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
- * @since 4.2
- */
- public PlanNode getDescriptionProperties() {
- // Default implementation - should be overridden
- PlanNode props = super.getDescriptionProperties();
- if(isDependent()) {
- props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
- }
- props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
- props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
- List<String> critList = getCriteriaList();
- props.addProperty(PROP_JOIN_CRITERIA, critList);
- return props;
- }
+ if (this.combinedElementMap == null) {
+ // Create element lookup map for evaluating project expressions
+ List combinedElements = new ArrayList(
+ getChildren()[0].getElements());
+ combinedElements.addAll(getChildren()[1].getElements());
+ this.combinedElementMap = createLookupMap(combinedElements);
+ this.projectionIndexes = getProjectionIndexes(combinedElementMap,
+ getElements());
+ }
+ }
+ public void open() throws TeiidComponentException, TeiidProcessingException {
+ // Set Up Join Strategy
+ this.joinStrategy.initialize(this);
+ joinStrategy.openLeft();
+ if (!isDependent()) {
+ joinStrategy.openRight();
+ }
+ this.state = State.LOAD_LEFT;
+ }
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#clone()
+ * @since 4.2
+ */
+ public Object clone() {
+ JoinNode clonedNode = new JoinNode(super.getID());
+ super.copy(this, clonedNode);
+ clonedNode.joinType = this.joinType;
+ clonedNode.joinStrategy = this.joinStrategy.clone();
+ clonedNode.joinCriteria = this.joinCriteria;
+ clonedNode.leftExpressions = leftExpressions;
+ clonedNode.rightExpressions = rightExpressions;
+ clonedNode.dependentValueSource = this.dependentValueSource;
+ clonedNode.rightDistinct = rightDistinct;
+ clonedNode.leftDistinct = leftDistinct;
+ return clonedNode;
+ }
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
+ * @since 4.2
+ */
+ protected TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ try {
+ if (state == State.LOAD_LEFT) {
+ if (this.joinType != JoinType.JOIN_FULL_OUTER) {
+ this.joinStrategy.leftSource
+ .setImplicitBuffer(ImplicitBuffer.NONE);
+ }
+ // left child was already opened by the join node
+ this.joinStrategy.loadLeft();
+ if (isDependent()) {
+ TupleBuffer buffer = this.joinStrategy.leftSource
+ .getTupleBuffer();
+ // the tuplebuffer may be from a lower node, so pass in the
+ // schema
+ dvs = new DependentValueSource(buffer);
+ dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
+ this.getContext().getVariableContext()
+ .setGlobalValue(this.dependentValueSource, dvs);
+ }
+ state = State.LOAD_RIGHT;
+ }
+ } catch (BlockedException e) {
+ if (!isDependent()) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ }
+ throw e;
+ }
+ try {
+ if (state == State.LOAD_RIGHT) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ state = State.EXECUTE;
+ }
+ this.joinStrategy.process();
+ this.terminateBatches();
+ } catch (BatchAvailableException e) {
+ // pull the batch
+ } catch (BlockedException e) {
+ // TODO: this leads to duplicate exceptions, we
+ // could track which side is blocking
+ try {
+ this.joinStrategy.leftSource.prefetch(true);
+ } catch (BlockedException e1) {
+ }
+ this.joinStrategy.rightSource.prefetch(true);
+ throw e;
+ }
+ return pullBatch();
+ }
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+ * @since 4.2
+ */
+ public PlanNode getDescriptionProperties() {
+ // Default implementation - should be overridden
+ PlanNode props = super.getDescriptionProperties();
+ if (isDependent()) {
+ props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+ }
+ props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+ props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+ List<String> critList = getCriteriaList();
+ props.addProperty(PROP_JOIN_CRITERIA, critList);
+ return props;
+ }
private List<String> getCriteriaList() {
List<String> critList = new ArrayList<String>();
- if (leftExpressions != null) {
- for(int i=0; i < this.leftExpressions.size(); i++) {
- critList.add(this.leftExpressions.get(i).toString() + "=" + this.rightExpressions.get(i).toString()); //$NON-NLS-1$
- }
- }
- if (this.joinCriteria != null) {
- for (Criteria crit : (List<Criteria>)Criteria.separateCriteriaByAnd(joinCriteria)) {
- critList.add(crit.toString());
- }
- }
+ if (leftExpressions != null) {
+ for (int i = 0; i < this.leftExpressions.size(); i++) {
+ critList.add(this.leftExpressions.get(i).toString()
+ + "=" + this.rightExpressions.get(i).toString()); //$NON-NLS-1$
+ }
+ }
+ if (this.joinCriteria != null) {
+ for (Criteria crit : (List<Criteria>) Criteria
+ .separateCriteriaByAnd(joinCriteria)) {
+ critList.add(crit.toString());
+ }
+ }
return critList;
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
- * @since 4.2
- */
- protected void getNodeString(StringBuffer str) {
- str.append(getClassName());
- str.append("("); //$NON-NLS-1$
- str.append(getID());
- str.append(") [");//$NON-NLS-1$
- if(isDependent()) {
- str.append("Dependent] [");//$NON-NLS-1$
- }
- str.append(this.joinStrategy.toString());
- str.append("] [");//$NON-NLS-1$
- str.append(this.joinType.toString());
- str.append("]"); //$NON-NLS-1$
- if (getJoinType() != JoinType.JOIN_CROSS) {
- str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
- }
- str.append(" output="); //$NON-NLS-1$
- str.append(getElements());
- }
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
+ * @since 4.2
+ */
+ protected void getNodeString(StringBuffer str) {
+ str.append(getClassName());
+ str.append("("); //$NON-NLS-1$
+ str.append(getID());
+ str.append(") [");//$NON-NLS-1$
+ if (isDependent()) {
+ str.append("Dependent] [");//$NON-NLS-1$
+ }
+ str.append(this.joinStrategy.toString());
+ str.append("] [");//$NON-NLS-1$
+ str.append(this.joinType.toString());
+ str.append("]"); //$NON-NLS-1$
+ if (getJoinType() != JoinType.JOIN_CROSS) {
+ str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
+ }
+ str.append(" output="); //$NON-NLS-1$
+ str.append(getElements());
+ }
- /**
- * @return Returns the isDependent.
- */
- public boolean isDependent() {
- return this.dependentValueSource != null;
- }
- /**
- * @param isDependent The isDependent to set.
- */
- public void setDependentValueSource(String dependentValueSource) {
- this.dependentValueSource = dependentValueSource;
- }
- public String getDependentValueSourceName() {
+ /**
+ * @return Returns the isDependent.
+ */
+ public boolean isDependent() {
+ return this.dependentValueSource != null;
+ }
+ /**
+ * @param isDependent
+ * The isDependent to set.
+ */
+ public void setDependentValueSource(String dependentValueSource) {
+ this.dependentValueSource = dependentValueSource;
+ }
+ public String getDependentValueSourceName() {
return dependentValueSource;
- public void closeDirect() {
- super.closeDirect();
- joinStrategy.close();
- if (this.getContext() != null && this.dependentValueSource != null) {
- this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, null);
- }
- this.dvs = null;
- }
- @Override
- public void reset() {
- super.reset();
- this.dvs = null;
- }
- public JoinType getJoinType() {
- return this.joinType;
- }
+ public void closeDirect() {
+ super.closeDirect();
+ joinStrategy.close();
+ if (this.getContext() != null && this.dependentValueSource != null) {
+ this.getContext().getVariableContext()
+ .setGlobalValue(this.dependentValueSource, null);
+ }
+ this.dvs = null;
+ }
- Map getCombinedElementMap() {
- return this.combinedElementMap;
- }
+ @Override
+ public void reset() {
+ super.reset();
+ this.dvs = null;
+ }
- public Criteria getJoinCriteria() {
- return this.joinCriteria;
- }
- boolean matchesCriteria(List outputTuple) throws BlockedException, TeiidComponentException, ExpressionEvaluationException {
- return (this.joinCriteria == null || getEvaluator(this.combinedElementMap).evaluate(this.joinCriteria, outputTuple));
- }
+ public JoinType getJoinType() {
+ return this.joinType;
+ }
- public List getLeftExpressions() {
- return this.leftExpressions;
- }
+ Map getCombinedElementMap() {
+ return this.combinedElementMap;
+ }
- public List getRightExpressions() {
- return this.rightExpressions;
- }
- @Override
- protected void addBatchRow(List row) {
- List projectTuple = projectTuple(this.projectionIndexes, row);
- super.addBatchRow(projectTuple);
- if (isBatchFull()) {
- }
- }
- public DependentValueSource getDependentValueSource() {
+ public Criteria getJoinCriteria() {
+ return this.joinCriteria;
+ }
+ boolean matchesCriteria(List outputTuple) throws BlockedException,
+ TeiidComponentException, ExpressionEvaluationException {
+ return (this.joinCriteria == null || getEvaluator(
+ this.combinedElementMap).evaluate(this.joinCriteria,
+ outputTuple));
+ }
+ public List getLeftExpressions() {
+ return this.leftExpressions;
+ }
+ public List getRightExpressions() {
+ return this.rightExpressions;
+ }
+ @Override
+ protected void addBatchRow(List row) {
+ List projectTuple = projectTuple(this.projectionIndexes, row);
+ super.addBatchRow(projectTuple);
+ if (isBatchFull()) {
+ }
+ }
+ public DependentValueSource getDependentValueSource() {
return dvs;
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -342,10 +342,10 @@
protected void loadRight() throws TeiidComponentException,
TeiidProcessingException {
- this.rightSource.sort(this.processingSortRight);
- if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
+ if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
+ this.rightSource.sort(this.processingSortRight);
public void setProcessingSortRight(boolean processingSortRight) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -25,13 +25,12 @@
import java.util.Collections;
import java.util.List;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
import org.teiid.query.processor.relational.SortUtility.Mode;
@@ -46,7 +45,6 @@
private RelationalNode source;
private List expressions;
- private BatchCollector collector;
private TupleBuffer buffer;
private List<TupleBuffer> buffers;
private List<Object> outerVals;
@@ -57,6 +55,7 @@
private boolean distinct;
private ImplicitBuffer implicitBuffer = ImplicitBuffer.FULL;
boolean open;
+ private BatchIterator prefetch;
private SortUtility sortUtility;
@@ -98,7 +97,7 @@
return currentTuple;
- public void reset() throws TeiidComponentException {
+ public void reset() throws TeiidComponentException, TeiidProcessingException {
this.currentTuple = null;
@@ -120,28 +119,104 @@
this.iterator = null;
+ this.prefetch = null;
this.currentTuple = null;
public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
return this.getTupleBuffer().getRowCount();
+ /**
+ * Uses the prefetch logic to determine an incremental row count
+ */
+ public boolean rowCountLE(int count) throws TeiidComponentException, TeiidProcessingException {
+ if (buffer == null) {
+ prefetch(false);
+ }
+ while (buffer.getRowCount() <= count) {
+ if (prefetch == null) {
+ return true;
+ }
+ prefetch(false);
+ }
+ return false;
+ }
- IndexedTupleSource getIterator() throws TeiidComponentException {
+ IndexedTupleSource getIterator() throws TeiidComponentException, TeiidProcessingException {
if (this.iterator == null) {
- if (this.buffer != null) {
- iterator = buffer.createIndexedTupleSource();
+ if (this.buffer == null) {
+ getTupleBuffer(false);
+ }
+ if (this.prefetch != null) {
+ this.iterator = this.prefetch;
} else {
- // return a TupleBatch tuplesource iterator
- BatchIterator bi = new BatchIterator(this.source);
- if (implicitBuffer != ImplicitBuffer.NONE) {
- bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
- }
- this.iterator = bi;
+ iterator = buffer.createIndexedTupleSource(implicitBuffer == ImplicitBuffer.NONE);
return this.iterator;
+ /**
+ * Create a batch iterator to perform basic prefetching
+ * @throws TeiidComponentException
+ */
+ private void createPrefetch() throws TeiidComponentException {
+ this.prefetch = new BatchIterator(this.source);
+ boolean useMark = implicitBuffer != ImplicitBuffer.FULL;
+ this.buffer = createSourceTupleBuffer();
+ this.prefetch.setBuffer(this.buffer, useMark);
+ if (useMark) {
+ this.prefetch.mark();
+ }
+ }
+ /**
+ * Pro-actively pull batches for later use.
+ * There are unfortunately quite a few cases to cover here.
+ */
+ protected void prefetch(boolean limit) throws TeiidComponentException, TeiidProcessingException {
+ if (this.prefetch == null) {
+ if (this.buffer != null) {
+ return;
+ }
+ if (this.sortUtility != null) {
+ sortUtility.sort();
+ return;
+ }
+ if (this.source.hasFinalBuffer()) {
+ this.buffer = this.source.getFinalBuffer();
+ return;
+ }
+ createPrefetch();
+ }
+ if (limit && this.buffer.getManagedRowCount() >= this.source.getBatchSize() * this.source.getContext().getOptions().getJoinPrefetchBatches()) {
+ return;
+ }
+ int curIndex = this.prefetch.getCurrentIndex();
+ boolean marked = false;
+ if (this.prefetch.ensureSave()) {
+ marked = true;
+ }
+ this.prefetch.setPosition(this.buffer.getRowCount() + 1);
+ BatchIterator bi = this.prefetch; //even if we clear the prefetch, we may already be using it as the iterator
+ try {
+ if (!this.prefetch.hasNext()) {
+ this.prefetch = null;
+ if (this.iterator != bi) {
+ bi = null;
+ }
+ }
+ } finally {
+ if (bi != null) {
+ if (marked) {
+ bi.reset();
+ } else {
+ bi.setPosition(curIndex);
+ }
+ }
+ }
+ }
public List<Object> getOuterVals() {
return this.outerVals;
@@ -164,14 +239,26 @@
public TupleBuffer getTupleBuffer() throws TeiidComponentException, TeiidProcessingException {
+ return getTupleBuffer(true);
+ }
+ private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException, TeiidProcessingException {
if (this.buffer == null) {
if (this.iterator instanceof BatchIterator) {
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
- if (collector == null) {
- collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
- }
- this.buffer = collector.collectTuples();
+ if (source.hasFinalBuffer()) {
+ this.buffer = source.getFinalBuffer();
+ return this.buffer;
+ }
+ this.implicitBuffer = ImplicitBuffer.FULL;
+ createPrefetch();
+ }
+ if (full && this.prefetch != null) {
+ while (this.prefetch.hasNext()) {
+ this.prefetch.setPosition(this.prefetch.getCurrentIndex() + this.source.getBatchSize());
+ }
+ this.prefetch = null; //fully buffered
return this.buffer;
@@ -192,7 +279,13 @@
TupleSource ts = null;
if (this.buffer != null) {
- ts = this.buffer.createIndexedTupleSource();
+ if (this.prefetch != null) {
+ this.prefetch.setPosition(1);
+ this.prefetch.disableSave();
+ ts = this.prefetch;
+ } else {
+ ts = this.buffer.createIndexedTupleSource();
+ }
} else {
ts = new BatchIterator(this.source);
@@ -208,12 +301,17 @@
- this.buffer = sortUtility.sort();
+ TupleBuffer sorted = sortUtility.sort();
+ if (this.buffer != null) {
+ this.buffer.remove();
+ }
+ this.prefetch = null;
+ this.buffer = sorted;
public boolean hasBuffer() {
- return this.buffer != null;
+ return this.buffer != null && this.prefetch == null;
public boolean nextBuffer() {
@@ -223,6 +321,7 @@
this.buffer = this.buffers.remove(this.buffers.size() - 1);
+ this.prefetch = null;
return true;
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java
--- branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -31,10 +31,12 @@
public static final String UNNEST_DEFAULT = "org.teiid.subqueryUnnestDefault"; //$NON-NLS-1$
public static final String PUSHDOWN_DEFAULT_NULL_ORDER = "org.teiid.pushdownDefaultNullOrder"; //$NON-NLS-1$
+ public static final String JOIN_PREFETCH_BATCHES = "org.teiid.joinPrefetchBatches"; //$NON-NLS-1$
private Properties properties;
private boolean subqueryUnnestDefault;
private boolean pushdownDefaultNullOrder = true;
+ private int joinPrefetchBatches = 10;
public Properties getProperties() {
return properties;
@@ -69,5 +71,17 @@
this.pushdownDefaultNullOrder = p;
return this;
+ public void setJoinPrefetchBatches(int joinPrefetchBatches) {
+ this.joinPrefetchBatches = joinPrefetchBatches;
+ }
+ public int getJoinPrefetchBatches() {
+ return joinPrefetchBatches;
+ }
+ public Options joinPrefetchBatches(int i) {
+ this.joinPrefetchBatches = i;
+ return this;
+ }
Modified: branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
--- branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -332,13 +332,18 @@
assertEquals(ThreadState.IDLE, item.getThreadState());
assertTrue(item.resultsBuffer.getManagedRowCount() <= rowsPerBatch*23);
//pull the rest of the results
- for (int j = 0; j < 48; j++) {
- item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
- message = core.processCursorRequest(reqMsg.getExecutionId(), (j + 2) * rowsPerBatch + 1, rowsPerBatch);
- rm = message.get(5000, TimeUnit.MILLISECONDS);
- assertNull(rm.getException());
- assertEquals(rowsPerBatch, rm.getResultsList().size());
+ int start = 17;
+ while (true) {
+ item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+ message = core.processCursorRequest(reqMsg.getExecutionId(), start, rowsPerBatch);
+ rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertTrue(rowsPerBatch >= rm.getResultsList().size());
+ start += rm.getResultsList().size();
+ if (rm.getFinalRow() == rm.getLastRow()) {
+ break;
+ }
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-01-30 17:30:23 UTC (rev 4546)
@@ -29,9 +29,9 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.query.processor.relational.FakeRelationalNode;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -107,4 +107,28 @@
+ @Test public void testDisableSave() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x")), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, false); //$NON-NLS-1$
+ bi.setPosition(2);
+ assertTrue(bi.hasNext());
+ tb.setForwardOnly(true);
+ bi.setPosition(1);
+ bi.disableSave();
+ for (int i = 0; i < 6; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ assertEquals(0, tb.getManagedRowCount());
+ }
[View Less]
12 years, 1 month