Author: jolee
Date: 2013-05-14 11:12:39 -0400 (Tue, 14 May 2013)
New Revision: 4565
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2363: proactive buffering not occurring for the inner side of an outer join on
"MERGE JOIN (SORT/ALREADY_SORTED)"
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-05-14
14:53:25 UTC (rev 4564)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-05-14
15:12:39 UTC (rev 4565)
@@ -170,63 +170,61 @@
return clonedNode;
}
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
- * @since 4.2
- */
- protected TupleBatch nextBatchDirect() throws BlockedException,
- TeiidComponentException, TeiidProcessingException {
- try {
- if (state == State.LOAD_LEFT) {
- if (this.joinType != JoinType.JOIN_FULL_OUTER) {
- this.joinStrategy.leftSource
- .setImplicitBuffer(ImplicitBuffer.NONE);
- }
- // left child was already opened by the join node
- this.joinStrategy.loadLeft();
- if (isDependent()) {
- TupleBuffer buffer = this.joinStrategy.leftSource
- .getTupleBuffer();
- // the tuplebuffer may be from a lower node, so pass in the
- // schema
- dvs = new DependentValueSource(buffer);
- dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
- this.getContext().getVariableContext()
- .setGlobalValue(this.dependentValueSource, dvs);
- }
- state = State.LOAD_RIGHT;
- }
- } catch (BlockedException e) {
- if (!isDependent()) {
- this.joinStrategy.openRight();
- this.joinStrategy.loadRight();
- }
- throw e;
- }
- try {
- if (state == State.LOAD_RIGHT) {
- this.joinStrategy.openRight();
- this.joinStrategy.loadRight();
- state = State.EXECUTE;
- }
- this.joinStrategy.process();
- this.terminateBatches();
- } catch (BatchAvailableException e) {
- // pull the batch
- } catch (BlockedException e) {
- // TODO: this leads to duplicate exceptions, we
- // could track which side is blocking
- try {
- this.joinStrategy.leftSource.prefetch(true);
- } catch (BlockedException e1) {
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
+ * @since 4.2
+ */
+ protected TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException,
+ TeiidProcessingException {
+ try {
+ if (state == State.LOAD_LEFT) {
+ if (this.joinType != JoinType.JOIN_FULL_OUTER) {
+ this.joinStrategy.leftSource.setImplicitBuffer(ImplicitBuffer.NONE);
+ }
+ //left child was already opened by the join node
+ this.joinStrategy.loadLeft();
+ if (isDependent()) {
+ TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
+ //the tuplebuffer may be from a lower node, so pass in the schema
+ dvs = new DependentValueSource(buffer);
+ dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
+
this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
+ }
+ state = State.LOAD_RIGHT;
+ }
+ } catch (BlockedException e) {
+ if (!isDependent()) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ this.joinStrategy.rightSource.prefetch(true);
+ }
+ throw e;
+ }
+ try {
+ if (state == State.LOAD_RIGHT) {
+ this.joinStrategy.openRight();
+ this.joinStrategy.loadRight();
+ state = State.EXECUTE;
+ }
+ this.joinStrategy.process();
+ this.terminateBatches();
+ } catch (BatchAvailableException e) {
+ //pull the batch
+ } catch (BlockedException e) {
+ //TODO: this leads to duplicate exceptions, we
+ //could track which side is blocking
+ try {
+ this.joinStrategy.leftSource.prefetch(true);
+ } catch (BlockedException e1) {
+
+ }
+ this.joinStrategy.rightSource.prefetch(true);
+ throw e;
+ }
+ return pullBatch();
+ }
- }
- this.joinStrategy.rightSource.prefetch(true);
- throw e;
- }
- return pullBatch();
- }
-
/**
* @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
* @since 4.2
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-05-14
14:53:25 UTC (rev 4564)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-05-14
15:12:39 UTC (rev 4565)
@@ -69,7 +69,7 @@
protected JoinNode join;
protected JoinStrategy joinStrategy;
- private RelationalNode leftNode;
+ private BlockingFakeRelationalNode leftNode;
private RelationalNode rightNode;
private FakeDataManager dataMgr;
@@ -791,6 +791,52 @@
this.join.setJoinStrategy(joinStrategy);
helpTestJoinDirect(expected, 40, 1);
}
+
+ @Test public void testMergeJoinPrefetchAlreadySorted() throws Exception {
+ this.joinType = JoinType.JOIN_INNER;
+ int rows = 50;
+ List[] data = new List[rows];
+ for(int i=0; i<rows; i++) {
+ data[i] = new ArrayList();
+ Integer value = new Integer((i*17) % 47);
+ data[i].add(value);
+ }
+ this.leftTuples = data;
+ this.rightTuples = new List[] {
+ Arrays.asList(1),
+ Arrays.asList(2),
+ Arrays.asList(4),
+ Arrays.asList(6),
+ Arrays.asList(7),
+ Arrays.asList(8),
+ };
+ expected = new List[] {
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 8, 8 }),
+ };
+ helpCreateJoin();
+ this.joinStrategy = new MergeJoinStrategy(SortOption.SORT,
SortOption.ALREADY_SORTED, false);
+ FakeRelationalNode newNode = new FakeRelationalNode(2, rightTuples) {
+ @Override
+ public TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ TupleBatch tb = super.nextBatchDirect();
+ if (tb.getTerminationFlag()) {
+ assertFalse(leftNode.isClosed());
+ }
+ return tb;
+ }
+ };
+ newNode.setElements(rightNode.getElements());
+ rightNode = newNode;
+
+ this.join.setJoinStrategy(joinStrategy);
+ helpTestJoinDirect(expected, 5, 1);
+ }
@Test public void testRepeatedMerge() throws Exception {
helpTestRepeatedMerge(false);