[teiid-commits] teiid SVN: r4613 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor/relational and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Nov 12 09:17:07 EST 2013


Author: jolee
Date: 2013-11-12 09:17:03 -0500 (Tue, 12 Nov 2013)
New Revision: 4613

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
Log:
TEIID-2707: refinements

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-11-12 14:17:03 UTC (rev 4613)
@@ -69,7 +69,7 @@
 			}
 			batch = source.nextBatch();
 			done = batch.getTerminationFlag();
-			if (buffer != null && (!saveOnMark || mark)) {
+			if (buffer != null && (!saveOnMark || mark) && !buffer.isForwardOnly()) {
             	buffer.addTupleBatch(batch, true);
             }
 			if (done && buffer != null) {
@@ -168,10 +168,10 @@
 		if (buffer == null || done) {
 			return;
 		}
-		if (this.buffer.getManagedRowCount() > limit) {
+		if (this.buffer.getManagedRowCount() >= limit) {
 			return;
 		}
-		if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow()) {
+		if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow() && !this.buffer.isForwardOnly()) {
 			//haven't saved already
 			this.buffer.addTupleBatch(this.batch, true);
 		}

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-11-12 14:17:03 UTC (rev 4613)
@@ -383,6 +383,7 @@
     	}
     	//else this is a single scan against the index
     	if (currentSource == null) {
+    		this.notSortedSource.setImplicitBuffer(ImplicitBuffer.NONE);
     		currentSource = this.notSortedSource.getIterator();
     	}
     	while (true) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-11-12 14:17:03 UTC (rev 4613)
@@ -160,6 +160,9 @@
                 BatchIterator bi = new BatchIterator(this.source);
                 if (this.collector != null) {
                 	bi.setBuffer(this.collector.getTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+                	if (implicitBuffer == ImplicitBuffer.NONE) {
+                		bi.getBuffer().setForwardOnly(true);
+                	}
                 	this.collector = null;
                 } else if (implicitBuffer != ImplicitBuffer.NONE) {
                 	bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-11-07 17:07:53 UTC (rev 4612)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-11-12 14:17:03 UTC (rev 4613)
@@ -28,10 +28,14 @@
 import java.util.List;
 
 import org.junit.Test;
+import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.query.processor.relational.FakeRelationalNode;
 import org.teiid.query.sql.symbol.ElementSymbol;
@@ -169,6 +173,7 @@
 				Arrays.asList(1),
 				Arrays.asList(1),
 				Arrays.asList(1),
+				Arrays.asList(1),
 		}, 2));
 		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
 		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
@@ -179,15 +184,59 @@
 		bi.readAhead(100);
 		assertEquals(4, bi.getBuffer().getRowCount());
 		//shouldn't keep reading
-		bi.readAhead(3);
+		bi.readAhead(2);
 		assertEquals(4, bi.getBuffer().getRowCount());
+		
 		bi.readAhead(5);
 		assertEquals(6, bi.getBuffer().getRowCount());
 		bi.readAhead(8); //does nothing
-		for (int i = 0; i < 5; i++) {
+		for (int i = 0; i < 6; i++) {
 			assertNotNull(bi.nextTuple());
 		}
 		assertNull(bi.nextTuple());
 	}
 	
+	@Test public void testNoSaveForwardOnly() throws Exception {
+		BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+		}, 2) {
+			@Override
+			public TupleBatch nextBatchDirect() throws BlockedException,
+					TeiidComponentException, TeiidProcessingException {
+				TupleBatch tb = super.nextBatchDirect();
+				tb.setRowOffset(tb.getBeginRow() + 3);
+				return tb;
+			}
+			
+		});
+		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+		tb.setForwardOnly(true);
+		bi.setBuffer(tb, false);  //$NON-NLS-1$
+		
+		tb.addTuple(Arrays.asList(2));
+		tb.addTuple(Arrays.asList(2));
+		tb.addTuple(Arrays.asList(2));
+		assertEquals(3, bi.getBuffer().getManagedRowCount());
+		bi.nextTuple();
+		//pull the first batch
+		assertEquals(2, bi.available()); 
+		assertEquals(0, bi.getBuffer().getManagedRowCount());
+		for (int i = 0; i < 2; i++) {
+			assertNotNull(bi.nextTuple());
+			assertEquals(0, bi.getBuffer().getManagedRowCount());
+		}
+		bi.readAhead(3);
+		assertEquals(2, bi.getBuffer().getManagedRowCount());
+		for (int i = 0; i < 4; i++) {
+			assertNotNull(bi.nextTuple());
+			assertEquals(0, bi.getBuffer().getManagedRowCount());
+		}
+		assertNull(bi.nextTuple());
+		assertEquals(0, bi.getBuffer().getManagedRowCount());
+	}
+	
 }



More information about the teiid-commits mailing list