Author: jolee
Date: 2013-10-22 10:52:22 -0400 (Tue, 22 Oct 2013)
New Revision: 4606
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2363: (modified) back out the prefetch logic that introduced an AssertionError
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-10-22
14:52:13 UTC (rev 4605)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-10-22
14:52:22 UTC (rev 4606)
@@ -40,6 +40,7 @@
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
+import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.sql.lang.OrderBy;
@@ -235,7 +236,9 @@
}
private boolean shouldIndexIfSmall(SourceState source) throws
TeiidComponentException, TeiidProcessingException {
- return source.rowCountLE(source.getSource().getBatchSize() / 2);
+ Number cardinality = source.getSource().getEstimateNodeCardinality();
+ return (source.hasBuffer() || (cardinality != null &&
cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE &&
cardinality.floatValue() <= source.getSource().getBatchSize() / 4))
+ && (source.getRowCount() <= source.getSource().getBatchSize() / 2);
}
@Override
@@ -304,26 +307,7 @@
}
private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws
TeiidComponentException, TeiidProcessingException {
- long size = joinNode.getBatchSize();
- int indexSize = possibleIndex.hasBuffer()?possibleIndex.getRowCount():-1;
- int otherSize = other.hasBuffer()?other.getRowCount():-1;
- //determine sizes in an incremental fashion as to avoid a full buffer of the
unsorted side
- while (size < Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1))
{
- if (indexSize == -1 && (possibleIndex.rowCountLE((int)size) ||
possibleIndex.hasBuffer())) {
- indexSize = possibleIndex.getRowCount();
- }
- if (otherSize == -1 && (other.rowCountLE((int)size) || other.hasBuffer()))
{
- otherSize = other.getRowCount();
- }
- if (indexSize == -1 && otherSize != -1 && size * 4 > otherSize)
{
- return false;
- }
- if (indexSize != -1 && otherSize == -1 && indexSize * 4 <= size)
{
- break;
- }
- size *=2;
- }
- if ((size > Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) ||
(indexSize != -1 && otherSize != -1 && indexSize * 4 > otherSize)) {
+ if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
return false; //index is too large
}
int schemaSize =
this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-10-22
14:52:13 UTC (rev 4605)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-10-22
14:52:22 UTC (rev 4606)
@@ -197,7 +197,6 @@
if (!isDependent()) {
this.joinStrategy.openRight();
this.joinStrategy.loadRight();
- this.joinStrategy.rightSource.prefetch(true);
}
throw e;
}
@@ -211,16 +210,6 @@
this.terminateBatches();
} catch (BatchAvailableException e) {
//pull the batch
- } catch (BlockedException e) {
- //TODO: this leads to duplicate exceptions, we
- //could track which side is blocking
- try {
- this.joinStrategy.leftSource.prefetch(true);
- } catch (BlockedException e1) {
-
- }
- this.joinStrategy.rightSource.prefetch(true);
- throw e;
}
return pullBatch();
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-10-22
14:52:13 UTC (rev 4605)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-10-22
14:52:22 UTC (rev 4606)
@@ -31,6 +31,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
import org.teiid.query.processor.relational.SortUtility.Mode;
@@ -45,6 +46,7 @@
private RelationalNode source;
private List expressions;
+ private BatchCollector collector;
private TupleBuffer buffer;
private List<TupleBuffer> buffers;
private List<Object> outerVals;
@@ -55,7 +57,6 @@
private boolean distinct;
private ImplicitBuffer implicitBuffer = ImplicitBuffer.FULL;
boolean open;
- private BatchIterator prefetch;
private SortUtility sortUtility;
@@ -119,107 +120,28 @@
this.iterator.closeSource();
this.iterator = null;
}
- this.prefetch = null;
this.currentTuple = null;
}
public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
return this.getTupleBuffer().getRowCount();
}
-
- /**
- * Uses the prefetch logic to determine an incremental row count
- */
- public boolean rowCountLE(int count) throws TeiidComponentException,
TeiidProcessingException {
- if (buffer == null) {
- prefetch(false);
- }
- while (buffer.getRowCount() <= count) {
- if (prefetch == null) {
- return true;
- }
- prefetch(false);
- }
- return false;
- }
- IndexedTupleSource getIterator() throws TeiidComponentException,
TeiidProcessingException {
+ IndexedTupleSource getIterator() throws TeiidComponentException {
if (this.iterator == null) {
- if (this.buffer == null) {
- getTupleBuffer(false);
- }
- if (this.prefetch != null) {
- this.iterator = this.prefetch;
+ if (this.buffer != null) {
+ iterator = buffer.createIndexedTupleSource();
} else {
- iterator = buffer.createIndexedTupleSource(implicitBuffer ==
ImplicitBuffer.NONE);
+ // return a TupleBatch tuplesource iterator
+ BatchIterator bi = new BatchIterator(this.source);
+ if (implicitBuffer != ImplicitBuffer.NONE) {
+ bi.setBuffer(createSourceTupleBuffer(), implicitBuffer ==
ImplicitBuffer.ON_MARK);
+ }
+ this.iterator = bi;
}
}
return this.iterator;
}
-
- /**
- * Create a batch iterator to perform basic prefetching
- * @throws TeiidComponentException
- */
- private void createPrefetch() throws TeiidComponentException {
- this.prefetch = new BatchIterator(this.source);
- boolean useMark = implicitBuffer != ImplicitBuffer.FULL;
- this.buffer = createSourceTupleBuffer();
- this.prefetch.setBuffer(this.buffer, useMark);
- if (useMark) {
- this.prefetch.mark();
- }
- }
-
- /**
- * Pro-actively pull batches for later use.
- * There are unfortunately quite a few cases to cover here.
- */
- protected void prefetch(boolean limit) throws TeiidComponentException,
TeiidProcessingException {
- if (!open) {
- return;
- }
- if (this.prefetch == null) {
- if (this.buffer != null) {
- return;
- }
- if (this.sortUtility != null) {
- sortUtility.sort();
- return;
- }
- if (this.source.hasFinalBuffer()) {
- this.buffer = this.source.getFinalBuffer();
- return;
- }
- createPrefetch();
- }
- if (limit && this.buffer.getManagedRowCount() >=
this.source.getBatchSize() *
this.source.getContext().getOptions().getJoinPrefetchBatches()) {
- return;
- }
- int curIndex = this.prefetch.getCurrentIndex();
- boolean marked = false;
- if (this.prefetch.ensureSave()) {
- marked = true;
- }
- this.prefetch.setPosition(this.buffer.getRowCount() + 1);
- BatchIterator bi = this.prefetch; //even if we clear the prefetch, we may already be
using it as the iterator
- try {
- if (!this.prefetch.hasNext()) {
- this.prefetch = null;
- if (this.iterator != bi) {
- bi = null;
- }
- }
- } finally {
- if (bi != null) {
- if (marked) {
- bi.reset();
- } else {
- bi.setPosition(curIndex);
- }
- }
- }
- }
public List<Object> getOuterVals() {
return this.outerVals;
@@ -242,26 +164,14 @@
}
public TupleBuffer getTupleBuffer() throws TeiidComponentException,
TeiidProcessingException {
- return getTupleBuffer(true);
- }
-
- private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException,
TeiidProcessingException {
if (this.buffer == null) {
if (this.iterator instanceof BatchIterator) {
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
}
- if (source.hasFinalBuffer()) {
- this.buffer = source.getFinalBuffer();
- return this.buffer;
- }
- this.implicitBuffer = ImplicitBuffer.FULL;
- createPrefetch();
- }
- if (full && this.prefetch != null) {
- while (this.prefetch.hasNext()) {
- this.prefetch.setPosition(this.prefetch.getCurrentIndex() +
this.source.getBatchSize());
- }
- this.prefetch = null; //fully buffered
+ if (collector == null) {
+ collector = new BatchCollector(source, source.getBufferManager(),
source.getContext(), false);
+ }
+ this.buffer = collector.collectTuples();
}
return this.buffer;
}
@@ -282,13 +192,7 @@
TupleSource ts = null;
if (this.buffer != null) {
this.buffer.setForwardOnly(true);
- if (this.prefetch != null) {
- this.prefetch.setPosition(1);
- this.prefetch.disableSave();
- ts = this.prefetch;
- } else {
- ts = this.buffer.createIndexedTupleSource();
- }
+ ts = this.buffer.createIndexedTupleSource();
} else {
ts = new BatchIterator(this.source);
}
@@ -309,13 +213,12 @@
if (this.buffer != null && this.buffer != sorted) {
this.buffer.remove();
}
- this.prefetch = null;
this.buffer = sorted;
this.markDistinct(sortUtility.isDistinct());
}
public boolean hasBuffer() {
- return this.buffer != null && this.prefetch == null;
+ return this.buffer != null;
}
public boolean nextBuffer() {
@@ -325,7 +228,6 @@
}
this.buffer = this.buffers.remove(this.buffers.size() - 1);
this.buffer.setForwardOnly(false);
- this.prefetch = null;
this.resetState();
return true;
}
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java 2013-10-22
14:52:13 UTC (rev 4605)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java 2013-10-22
14:52:22 UTC (rev 4606)
@@ -285,18 +285,18 @@
@Test public void testTextTableJoin() throws Exception {
String sql = "select z.* from (select x.* from (select * from pm1.g1 where e1 =
'c') y, texttable(e1 || '\n' || e2 || '\n' || e3 COLUMNS x string)
x) as z, " +
- "(select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1
|| '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z1 where z.x =
z1.x";
+ "(select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1
|| '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z1 where z.x = z1.x
order by z.x";
List[] expected = new List[] {
+ Arrays.asList("1"),
Arrays.asList("c"),
- Arrays.asList("1"),
Arrays.asList("true"),
};
FakeDataManager dataManager = new FakeDataManager();
sampleData1(dataManager);
RelationalPlan plan = (RelationalPlan)helpGetPlan(helpParse(sql),
RealMetadataFactory.example1Cached());
- JoinNode join = (JoinNode) plan.getRootNode().getChildren()[0];
+ JoinNode join = (JoinNode) plan.getRootNode().getChildren()[0].getChildren()[0];
assertTrue(!(join.getJoinStrategy() instanceof NestedTableJoinStrategy));
helpProcess(plan, createCommandContext(), dataManager, expected);
}
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-10-22
14:52:13 UTC (rev 4605)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-10-22
14:52:22 UTC (rev 4606)
@@ -792,52 +792,6 @@
helpTestJoinDirect(expected, 40, 1);
}
- @Test public void testMergeJoinPrefetchAlreadySorted() throws Exception {
- this.joinType = JoinType.JOIN_INNER;
- int rows = 50;
- List[] data = new List[rows];
- for(int i=0; i<rows; i++) {
- data[i] = new ArrayList();
- Integer value = new Integer((i*17) % 47);
- data[i].add(value);
- }
- this.leftTuples = data;
- this.rightTuples = new List[] {
- Arrays.asList(1),
- Arrays.asList(2),
- Arrays.asList(4),
- Arrays.asList(6),
- Arrays.asList(7),
- Arrays.asList(8),
- };
- expected = new List[] {
- Arrays.asList(new Object[] { 1, 1 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 6, 6 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 8, 8 }),
- };
- helpCreateJoin();
- this.joinStrategy = new MergeJoinStrategy(SortOption.SORT,
SortOption.ALREADY_SORTED, false);
- FakeRelationalNode newNode = new FakeRelationalNode(2, rightTuples) {
- @Override
- public TupleBatch nextBatchDirect() throws BlockedException,
- TeiidComponentException, TeiidProcessingException {
- TupleBatch tb = super.nextBatchDirect();
- if (tb.getTerminationFlag()) {
- assertFalse(leftNode.isClosed());
- }
- return tb;
- }
- };
- newNode.setElements(rightNode.getElements());
- rightNode = newNode;
-
- this.join.setJoinStrategy(joinStrategy);
- helpTestJoinDirect(expected, 5, 1);
- }
-
@Test public void testRepeatedMerge() throws Exception {
helpTestRepeatedMerge(false);
}