[teiid-commits] teiid SVN: r4612 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor and 3 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Thu Nov 7 12:07:53 EST 2013
Author: jolee
Date: 2013-11-07 12:07:53 -0500 (Thu, 07 Nov 2013)
New Revision: 4612
Added:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2707: proactive buffering on an enhanced sort merge left outer join results in an AssertionError
Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -628,7 +628,7 @@
lock.lock();
try {
//don't wait for more than is available
- int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
+ int waitCount = (int)Math.min(additional, ((long)this.getMaxReserveKB()<<10) - reservedByThread.get()); //parenthesized: << binds looser than -, and long math keeps the KB-to-bytes shift from overflowing
int committed = 0;
while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
long reserveBatchSample = this.reserveBatchBytes.get();
@@ -645,6 +645,8 @@
int result = noWaitReserve(additional - committed, false);
committed += result;
}
+ int result = noWaitReserve(additional - committed, false);
+ committed += result;
return committed;
} finally {
lock.unlock();
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -130,6 +130,10 @@
}
public TupleBuffer collectTuples() throws TeiidComponentException, TeiidProcessingException {
+ return collectTuples(false);
+ }
+
+ public TupleBuffer collectTuples(boolean singleBatch) throws TeiidComponentException, TeiidProcessingException {
TupleBatch batch = null;
while(!done) {
if (this.sourceNode.hasFinalBuffer()) {
@@ -156,6 +160,10 @@
}
break;
}
+
+ if (singleBatch) {
+ return null;
+ }
}
return buffer;
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -146,22 +146,44 @@
* non-destructive method to set the mark
* @return true if the mark was set
*/
- public boolean ensureSave() {
- if (!saveOnMark || mark) {
- return false;
- }
- mark = true;
- return true;
- }
-
- public void disableSave() {
- if (buffer != null) {
- this.saveOnMark = true;
- this.mark = false;
- if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
- this.batch = null;
- }
- }
- }
-
+ public boolean ensureSave() {
+ if (!saveOnMark || mark) {
+ return false;
+ }
+ mark = true;
+ return true;
+ }
+
+ public void disableSave() {
+ if (buffer != null) {
+ this.saveOnMark = true;
+ this.mark = false;
+ if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+ this.batch = null;
+ }
+ }
+ }
+
+ public void readAhead(long limit) throws TeiidComponentException, TeiidProcessingException {
+ if (buffer == null || done) {
+ return;
+ }
+ if (this.buffer.getManagedRowCount() > limit) {
+ return;
+ }
+ if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow()) {
+ //haven't saved already
+ this.buffer.addTupleBatch(this.batch, true);
+ }
+ TupleBatch tb = source.nextBatch();
+ done = tb.getTerminationFlag();
+ this.buffer.addTupleBatch(tb, true);
+ if (done) {
+ this.buffer.close();
+ }
+ }
+
+ public TupleBuffer getBuffer() {
+ return buffer;
+ }
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -40,7 +40,6 @@
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.sql.lang.OrderBy;
@@ -236,9 +235,7 @@
}
private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
- Number cardinality = source.getSource().getEstimateNodeCardinality();
- return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4))
- && (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+ return source.rowCountLE(source.getSource().getBatchSize() / 2);
}
@Override
@@ -307,7 +304,26 @@
}
private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws TeiidComponentException, TeiidProcessingException {
- if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
+ long size = joinNode.getBatchSize();
+ int indexSize = possibleIndex.hasBuffer()?possibleIndex.getRowCount():-1;
+ int otherSize = other.hasBuffer()?other.getRowCount():-1;
+ //determine sizes in an incremental fashion as to avoid a full buffer of the unsorted side
+ while (size < Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) {
+ if (indexSize == -1 && (possibleIndex.rowCountLE((int)size) || possibleIndex.hasBuffer())) {
+ indexSize = possibleIndex.getRowCount();
+ }
+ if (otherSize == -1 && (other.rowCountLE((int)size) || other.hasBuffer())) {
+ otherSize = other.getRowCount();
+ }
+ if (indexSize == -1 && otherSize != -1 && size * 4 > otherSize) {
+ return false;
+ }
+ if (indexSize != -1 && otherSize == -1 && indexSize * 4 <= size) {
+ break;
+ }
+ size *=2;
+ }
+ if ((size > Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) || (indexSize != -1 && otherSize != -1 && indexSize * 4 > otherSize)) {
return false; //index is too large
}
int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
@@ -320,7 +336,7 @@
boolean useIndex = false;
int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
//approximate that 1/2 of the index will be memory resident
- toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize() * .5));
+ toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize()));
if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
useIndex = true;
} else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -197,6 +197,7 @@
if (!isDependent()) {
this.joinStrategy.openRight();
this.joinStrategy.loadRight();
+ prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
}
throw e;
}
@@ -210,28 +211,43 @@
this.terminateBatches();
} catch (BatchAvailableException e) {
//pull the batch
+ } catch (BlockedException e) {
+ //TODO: this leads to duplicate exceptions, we
+ //could track which side is blocking
+ try {
+ prefetch(this.joinStrategy.leftSource, this.joinStrategy.rightSource);
+ } catch (BlockedException e1) {
+ //deliberately ignored: fall through so the right side is still prefetched before the original exception is rethrown
+ }
+ prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
+ throw e;
}
return pullBatch();
}
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
- * @since 4.2
- */
- public PlanNode getDescriptionProperties() {
- // Default implementation - should be overridden
- PlanNode props = super.getDescriptionProperties();
-
- if (isDependent()) {
- props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
- }
- props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
- props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
- List<String> critList = getCriteriaList();
- props.addProperty(PROP_JOIN_CRITERIA, critList);
- return props;
+ private void prefetch(SourceState toFetch, SourceState other) throws TeiidComponentException,
+ TeiidProcessingException {
+ toFetch.prefetch(Math.max(1L, other.getIncrementalRowCount(false)/other.getSource().getBatchSize())*toFetch.getSource().getBatchSize()); //limit = at least one batch, otherwise as many batches as the other side currently reports
}
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+ * @since 4.2
+ */
+ public PlanNode getDescriptionProperties() {
+ // Default implementation - should be overridden
+ PlanNode props = super.getDescriptionProperties();
+
+ if(isDependent()) {
+ props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+ }
+ props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+ props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+ List<String> critList = getCriteriaList();
+ props.addProperty(PROP_JOIN_CRITERIA, critList);
+ return props;
+ }
+
private List<String> getCriteriaList() {
List<String> critList = new ArrayList<String>();
if (leftExpressions != null) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -632,5 +632,20 @@
}
throw e;
}
+
+ public boolean hasBuffer(boolean b) {
+ return false;
+ }
+
+ /**
+ * return the final tuple buffer or null if not available
+ * @return
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
+ * @throws BlockedException
+ */
+ public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+ return null;
+ }
}
\ No newline at end of file
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,11 +31,11 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
@@ -301,7 +301,7 @@
TupleBuffer merged = createTupleBuffer();
- int desiredSpace = activeTupleBuffers.size() * schemaSize;
+ int desiredSpace = (int)(activeTupleBuffers.size() * (long)schemaSize);
int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,6 +31,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.util.Assertion;
import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
@@ -76,6 +77,10 @@
this.implicitBuffer = implicitBuffer;
}
+ public ImplicitBuffer getImplicitBuffer() {
+ return implicitBuffer;
+ }
+
static int[] getExpressionIndecies(List expressions,
List elements) {
if (expressions == null) {
@@ -126,15 +131,37 @@
public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
return this.getTupleBuffer().getRowCount();
}
+
+ /**
+ * Uses the prefetch logic to determine an incremental row count
+ */
+ public boolean rowCountLE(long count) throws TeiidComponentException, TeiidProcessingException {
+ if (buffer != null) {
+ return buffer.getRowCount() <= count;
+ }
+ if (iterator != null || this.sortUtility != null) {
+ throw new IllegalStateException();
+ }
+ while (buffer == null) {
+ if (getIncrementalRowCount(true) > count) {
+ return false;
+ }
+ prefetch(Long.MAX_VALUE);
+ }
+ return buffer.getRowCount() <= count;
+ }
IndexedTupleSource getIterator() throws TeiidComponentException {
if (this.iterator == null) {
- if (this.buffer != null) {
+ if (this.buffer != null) {
iterator = buffer.createIndexedTupleSource();
} else {
// return a TupleBatch tuplesource iterator
BatchIterator bi = new BatchIterator(this.source);
- if (implicitBuffer != ImplicitBuffer.NONE) {
+ if (this.collector != null) {
+ bi.setBuffer(this.collector.getTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+ this.collector = null;
+ } else if (implicitBuffer != ImplicitBuffer.NONE) {
bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
}
this.iterator = bi;
@@ -142,6 +169,36 @@
}
return this.iterator;
}
+
+ /**
+ * Pro-actively pull batches for later use.
+ * There are unfortunately quite a few cases to cover here.
+ */
+ protected void prefetch(long limit) throws TeiidComponentException, TeiidProcessingException {
+ if (!open) {
+ return;
+ }
+ if (this.buffer == null) {
+ if (this.sortUtility != null) {
+ return;
+ }
+ if (this.iterator != null) {
+ ((BatchIterator)this.iterator).readAhead(limit);
+ return;
+ }
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ return;
+ }
+ if (collector == null) {
+ collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
+ }
+ if (collector.getTupleBuffer().getManagedRowCount() >= limit) {
+ return;
+ }
+ this.buffer = collector.collectTuples(true);
+ }
+ }
public List<Object> getOuterVals() {
return this.outerVals;
@@ -168,11 +225,16 @@
if (this.iterator instanceof BatchIterator) {
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
}
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ Assertion.assertTrue(this.buffer.isFinal());
+ return this.buffer;
+ }
if (collector == null) {
collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
}
this.buffer = collector.collectTuples();
- }
+ }
return this.buffer;
}
@@ -190,6 +252,11 @@
}
if (this.sortUtility == null) {
TupleSource ts = null;
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ } else if (this.buffer == null && this.collector != null) {
+ this.buffer = this.collector.collectTuples();
+ }
if (this.buffer != null) {
this.buffer.setForwardOnly(true);
ts = this.buffer.createIndexedTupleSource();
@@ -213,12 +280,12 @@
if (this.buffer != null && this.buffer != sorted) {
this.buffer.remove();
}
- this.buffer = sorted;
+ this.buffer = sorted;
this.markDistinct(sortUtility.isDistinct());
}
public boolean hasBuffer() {
- return this.buffer != null;
+ return this.buffer != null || this.source.hasBuffer(true);
}
public boolean nextBuffer() {
@@ -243,5 +310,30 @@
this.currentTuple = null;
this.maxProbeMatch = 1;
}
+
+ public void setMaxProbePosition() throws TeiidComponentException, TeiidProcessingException {
+ this.getIterator().setPosition(this.getMaxProbeMatch());
+ this.currentTuple = null;
+ }
+
+ public int getIncrementalRowCount(boolean low) {
+ if (this.buffer != null) {
+ return this.buffer.getRowCount();
+ }
+ if (this.collector != null) {
+ return this.collector.getTupleBuffer().getRowCount();
+ }
+ if (sortUtility == null) {
+ if (this.iterator instanceof BatchIterator) {
+ TupleBuffer tb = ((BatchIterator)this.iterator).getBuffer();
+ if (tb != null) {
+ return tb.getRowCount();
+ }
+ //TODO: should estimate the rows
+ }
+ //TODO: should estimate the rows based upon what is being fed into the sort
+ }
+ return low?0:Integer.MAX_VALUE;
+ }
}
Added: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java (rev 0)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.processor;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.relational.FakeRelationalNode;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.util.CommandContext;
+
+@SuppressWarnings("nls")
+public class TestBatchCollector {
+
+ @Test public void testCollect() throws Exception {
+ FakeRelationalNode sourceNode = new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1)
+ }, 1);
+ sourceNode.setElements(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)));
+ BatchCollector bc = new BatchCollector(sourceNode, BufferManagerFactory.getStandaloneBufferManager(), new CommandContext(), false);
+ bc.collectTuples(true);
+ assertEquals(1, bc.getTupleBuffer().getManagedRowCount());
+ assertEquals(3, bc.collectTuples().getRowCount());
+ }
+
+}
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -132,4 +132,62 @@
assertEquals(0, tb.getManagedRowCount());
}
+ @Test public void testReadAhead() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, false); //$NON-NLS-1$
+ bi.nextTuple();
+ assertEquals(1, bi.available());
+ assertEquals(2, bi.getBuffer().getRowCount());
+ bi.readAhead(100);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ //shouldn't keep reading
+ bi.readAhead(3);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ bi.readAhead(5);
+ assertEquals(6, bi.getBuffer().getRowCount());
+ bi.readAhead(8); //does nothing
+ for (int i = 0; i < 5; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ }
+
+ @Test public void testReadAheadMark() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, true); //$NON-NLS-1$
+ bi.nextTuple();
+ assertEquals(1, bi.available());
+ assertEquals(0, bi.getBuffer().getRowCount());
+ bi.readAhead(100);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ //shouldn't keep reading
+ bi.readAhead(3);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ bi.readAhead(5);
+ assertEquals(6, bi.getBuffer().getRowCount());
+ bi.readAhead(8); //does nothing
+ for (int i = 0; i < 5; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ }
+
}
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -35,6 +35,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
@@ -179,7 +180,18 @@
List rightElements = new ArrayList();
rightElements.add(es2);
- rightNode = new FakeRelationalNode(2, rightTuples);
+ rightNode = new BlockingFakeRelationalNode(2, rightTuples) {
+ @Override
+ public boolean hasBuffer(boolean requireFinal) {
+ return false;
+ }
+
+ @Override
+ public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+ fail();
+ throw new AssertionError();
+ };
+ };
rightNode.setElements(rightElements);
List joinElements = new ArrayList();
@@ -600,10 +612,10 @@
}
@Test public void testMergeJoinOptimization() throws Exception {
- helpTestEnhancedSortMergeJoin(99);
+ helpTestEnhancedSortMergeJoin(99, false);
}
- private void helpTestEnhancedSortMergeJoin(int batchSize)
+ private void helpTestEnhancedSortMergeJoin(int batchSize, boolean repeated)
throws TeiidComponentException, TeiidProcessingException {
this.joinType = JoinType.JOIN_INNER;
int rows = 100;
@@ -615,26 +627,49 @@
}
this.leftTuples = data;
this.rightTuples = createTuples2();
- expected = new List[] {
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 6, 6 }),
- Arrays.asList(new Object[] { 1, 1 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 6, 6 }),
- Arrays.asList(new Object[] { 1, 1 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- };
+ if (!repeated) {
+ expected = new List[] {
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ };
+ } else {
+ expected = new List[] {
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ };
+ }
helpCreateJoin();
this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
this.join.setJoinStrategy(joinStrategy);
@@ -713,9 +748,13 @@
}
@Test public void testMergeJoinOptimizationMultiBatch() throws Exception {
- helpTestEnhancedSortMergeJoin(10);
+ helpTestEnhancedSortMergeJoin(10, false);
}
+ @Test public void testMergeJoinOptimizationMultiBatch1() throws Exception {
+ helpTestEnhancedSortMergeJoin(1, true);
+ }
+
@Test public void testMergeJoinOptimizationNoRows() throws Exception {
this.joinType = JoinType.JOIN_INNER;
this.leftTuples = createTuples1();
More information about the teiid-commits
mailing list