[teiid-commits] teiid SVN: r4613 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor/relational and 1 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Nov 12 09:17:07 EST 2013
Author: jolee
Date: 2013-11-12 09:17:03 -0500 (Tue, 12 Nov 2013)
New Revision: 4613
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
Log:
TEIID-2707: refinements
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-12 14:17:03 UTC (rev 4613)
@@ -69,7 +69,7 @@
}
batch = source.nextBatch();
done = batch.getTerminationFlag();
- if (buffer != null && (!saveOnMark || mark)) {
+ if (buffer != null && (!saveOnMark || mark) && !buffer.isForwardOnly()) {
buffer.addTupleBatch(batch, true);
}
if (done && buffer != null) {
@@ -168,10 +168,10 @@
if (buffer == null || done) {
return;
}
- if (this.buffer.getManagedRowCount() > limit) {
+ if (this.buffer.getManagedRowCount() >= limit) {
return;
}
- if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow()) {
+ if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow() && !this.buffer.isForwardOnly()) {
//haven't saved already
this.buffer.addTupleBatch(this.batch, true);
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-12 14:17:03 UTC (rev 4613)
@@ -383,6 +383,7 @@
}
//else this is a single scan against the index
if (currentSource == null) {
+ this.notSortedSource.setImplicitBuffer(ImplicitBuffer.NONE);
currentSource = this.notSortedSource.getIterator();
}
while (true) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-12 14:17:03 UTC (rev 4613)
@@ -160,6 +160,9 @@
BatchIterator bi = new BatchIterator(this.source);
if (this.collector != null) {
bi.setBuffer(this.collector.getTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+ if (implicitBuffer == ImplicitBuffer.NONE) {
+ bi.getBuffer().setForwardOnly(true);
+ }
this.collector = null;
} else if (implicitBuffer != ImplicitBuffer.NONE) {
bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-12 14:17:03 UTC (rev 4613)
@@ -28,10 +28,14 @@
import java.util.List;
import org.junit.Test;
+import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.query.processor.relational.FakeRelationalNode;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -169,6 +173,7 @@
Arrays.asList(1),
Arrays.asList(1),
Arrays.asList(1),
+ Arrays.asList(1),
}, 2));
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
@@ -179,15 +184,59 @@
bi.readAhead(100);
assertEquals(4, bi.getBuffer().getRowCount());
//shouldn't keep reading
- bi.readAhead(3);
+ bi.readAhead(2);
assertEquals(4, bi.getBuffer().getRowCount());
+
bi.readAhead(5);
assertEquals(6, bi.getBuffer().getRowCount());
bi.readAhead(8); //does nothing
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 6; i++) {
assertNotNull(bi.nextTuple());
}
assertNull(bi.nextTuple());
}
+ @Test public void testNoSaveForwardOnly() throws Exception { // checks the managed row count of a forward-only buffer while tuples are read through a BatchIterator
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2) { // four source rows; the trailing 2 is presumably the batch size (available() == 2 below is consistent) - confirm against FakeRelationalNode
+ @Override
+ public TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ TupleBatch tb = super.nextBatchDirect();
+ tb.setRowOffset(tb.getBeginRow() + 3); // shift each batch by 3 rows; presumably to follow the three tuples added directly to the buffer below - confirm
+ return tb;
+ }
+
+ });
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+ tb.setForwardOnly(true); // buffer is marked forward-only before it is attached to the iterator
+ bi.setBuffer(tb, false); //$NON-NLS-1$
+
+ tb.addTuple(Arrays.asList(2)); // three tuples are added directly to the buffer, bypassing the source node
+ tb.addTuple(Arrays.asList(2));
+ tb.addTuple(Arrays.asList(2));
+ assertEquals(3, bi.getBuffer().getManagedRowCount()); // all three directly-added rows are counted as managed before any read
+ bi.nextTuple(); // first read through the iterator; the return value is intentionally not checked
+ //pull the first batch
+ assertEquals(2, bi.available());
+ assertEquals(0, bi.getBuffer().getManagedRowCount()); // expected to drop to zero once reading has started
+ for (int i = 0; i < 2; i++) {
+ assertNotNull(bi.nextTuple());
+ assertEquals(0, bi.getBuffer().getManagedRowCount()); // nothing is expected to be retained while iterating
+ }
+ bi.readAhead(3); // read ahead with a limit of 3
+ assertEquals(2, bi.getBuffer().getManagedRowCount()); // read-ahead is expected to leave two managed rows in the buffer
+ for (int i = 0; i < 4; i++) {
+ assertNotNull(bi.nextTuple());
+ assertEquals(0, bi.getBuffer().getManagedRowCount());
+ }
+ assertNull(bi.nextTuple()); // iterator is exhausted after seven tuples in total (1 + 2 + 4 reads above)
+ assertEquals(0, bi.getBuffer().getManagedRowCount());
+ }
+
}
More information about the teiid-commits
mailing list