[teiid-commits] teiid SVN: r4612 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Nov 7 12:07:53 EST 2013


Author: jolee
Date: 2013-11-07 12:07:53 -0500 (Thu, 07 Nov 2013)
New Revision: 4612

Added:
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2707: proactive buffering on an enhanced sort merge left outer join results in an assertionerror

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -628,7 +628,7 @@
     	lock.lock();
     	try {
 			//don't wait for more than is available
-			int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
+			int waitCount = Math.min(additional, (this.getMaxReserveKB()<<10) - reservedByThread.get()); //'-' binds tighter than '<<', so the shift must be parenthesized
 			int committed = 0;
 	    	while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
 	    		long reserveBatchSample = this.reserveBatchBytes.get();
@@ -645,6 +645,8 @@
 		    	int result = noWaitReserve(additional - committed, false);
 		    	committed += result;
 	    	}	
+	    	int result = noWaitReserve(additional - committed, false);
+	    	committed += result;
 	    	return committed;
     	} finally {
     		lock.unlock();

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -130,6 +130,10 @@
     }
 
     public TupleBuffer collectTuples() throws TeiidComponentException, TeiidProcessingException {
+    	return collectTuples(false);
+    }
+    
+    public TupleBuffer collectTuples(boolean singleBatch) throws TeiidComponentException, TeiidProcessingException {
         TupleBatch batch = null;
     	while(!done) {
     		if (this.sourceNode.hasFinalBuffer()) {
@@ -156,6 +160,10 @@
             	}
                 break;
             }
+            
+            if (singleBatch) {
+            	return null;
+            }
         }
         return buffer;
     }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -146,22 +146,44 @@
      * non-destructive method to set the mark
      * @return true if the mark was set
      */
-    public boolean ensureSave() {
-    	if (!saveOnMark || mark) {
-    		return false;
-    	}
-    	mark = true;
-    	return true;
-    }
-    
-    public void disableSave() {
-    	if (buffer != null) {
-    		this.saveOnMark = true;
-    		this.mark = false;
-    		if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
-    			this.batch = null;
-    		}
-    	}
-    }
-    
+	public boolean ensureSave() {
+		if (!saveOnMark || mark) {
+			return false;
+		}
+		mark = true;
+		return true;
+	}
+	
+	public void disableSave() {
+		if (buffer != null) {
+			this.saveOnMark = true;
+			this.mark = false;
+			if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+				this.batch = null;
+			}
+		}
+	}
+	
+	public void readAhead(long limit) throws TeiidComponentException, TeiidProcessingException {
+		if (buffer == null || done) {
+			return;
+		}
+		if (this.buffer.getManagedRowCount() > limit) {
+			return;
+		}
+		if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow()) {
+			//haven't saved already
+			this.buffer.addTupleBatch(this.batch, true);
+		}
+		TupleBatch tb = source.nextBatch();
+		done = tb.getTerminationFlag();
+		this.buffer.addTupleBatch(tb, true);
+		if (done) {
+			this.buffer.close();
+		}
+	}
+	
+	public TupleBuffer getBuffer() {
+		return buffer;
+	}    
 }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -40,7 +40,6 @@
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
 import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
 import org.teiid.query.sql.lang.JoinType;
 import org.teiid.query.sql.lang.OrderBy;
@@ -236,9 +235,7 @@
     }
     
     private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
-    	Number cardinality = source.getSource().getEstimateNodeCardinality();
-    	return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4)) 
-    	&& (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+    	return source.rowCountLE(source.getSource().getBatchSize() / 2);
     }
     
     @Override
@@ -307,7 +304,26 @@
     }
     
     private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws TeiidComponentException, TeiidProcessingException {
-    	if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
+    	long size = joinNode.getBatchSize();
+    	int indexSize = possibleIndex.hasBuffer()?possibleIndex.getRowCount():-1;
+    	int otherSize = other.hasBuffer()?other.getRowCount():-1;
+    	//determine sizes in an incremental fashion as to avoid a full buffer of the unsorted side
+    	while (size < Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) {
+    		if (indexSize == -1 && (possibleIndex.rowCountLE((int)size) || possibleIndex.hasBuffer())) {
+    			indexSize = possibleIndex.getRowCount();
+    		}
+    		if (otherSize == -1 && (other.rowCountLE((int)size) || other.hasBuffer())) {
+    			otherSize = other.getRowCount();
+    		}
+    		if (indexSize == -1 && otherSize != -1 && size * 4 > otherSize) {
+    			return false;
+    		}
+    		if (indexSize != -1 && otherSize == -1 && indexSize * 4 <= size) {
+    			break;
+    		}
+    		size *=2;
+    	}
+		if ((size > Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) || (indexSize != -1 && otherSize != -1 && indexSize * 4 > otherSize)) {
     		return false; //index is too large
     	}
     	int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
@@ -320,7 +336,7 @@
     	boolean useIndex = false;
     	int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
     	//approximate that 1/2 of the index will be memory resident 
-    	toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize() * .5));  
+    	toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize())); 
     	if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
     		useIndex = true;
     	} else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -197,6 +197,7 @@
     		if (!isDependent()) {
     			this.joinStrategy.openRight();
                 this.joinStrategy.loadRight();
+                prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
     		}
     		throw e;
     	}
@@ -210,28 +211,43 @@
         	this.terminateBatches();
         } catch (BatchAvailableException e) {
         	//pull the batch
+        } catch (BlockedException e) {
+        	//TODO: this leads to duplicate exceptions, we 
+        	//could track which side is blocking
+        	try {
+        		prefetch(this.joinStrategy.leftSource, this.joinStrategy.rightSource);
+        	} catch (BlockedException e1) {
+        		//ignored: even if the left prefetch blocks, the right prefetch is still attempted and the original exception is rethrown below
+        	}
+        	prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
+        	throw e;
         }
         return pullBatch();
     }
 
-	/**
-	 * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
-	 * @since 4.2
-	 */
-	public PlanNode getDescriptionProperties() {
-		// Default implementation - should be overridden
-		PlanNode props = super.getDescriptionProperties();
-
-		if (isDependent()) {
-			props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
-		}
-		props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
-		props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
-		List<String> critList = getCriteriaList();
-		props.addProperty(PROP_JOIN_CRITERIA, critList);
-		return props;
+	private void prefetch(SourceState toFetch, SourceState other) throws TeiidComponentException,
+			TeiidProcessingException {
+		toFetch.prefetch(Math.max(1l, other.getIncrementalRowCount(false)/other.getSource().getBatchSize())*toFetch.getSource().getBatchSize());
 	}
 
+    /** 
+     * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+     * @since 4.2
+     */
+    public PlanNode getDescriptionProperties() {
+        // Default implementation - should be overridden     
+    	PlanNode props = super.getDescriptionProperties();
+        
+        if(isDependent()) {
+        	props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+        }
+        props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+        props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+        List<String> critList = getCriteriaList();
+        props.addProperty(PROP_JOIN_CRITERIA, critList);
+        return props;
+    }
+
 	private List<String> getCriteriaList() {
 		List<String> critList = new ArrayList<String>();
 		if (leftExpressions != null) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -632,5 +632,20 @@
 		}
 		throw e;
 	}
+
+	public boolean hasBuffer(boolean b) {
+		return false;
+	}
+
+	/**
+     * return the final tuple buffer or null if not available
+     * @return
+	 * @throws TeiidProcessingException 
+	 * @throws TeiidComponentException 
+	 * @throws BlockedException 
+     */
+	public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+		return null;
+	}
 	
 }
\ No newline at end of file

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,11 +31,11 @@
 
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.IndexedTupleSource;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.util.Assertion;
@@ -301,7 +301,7 @@
             
             TupleBuffer merged = createTupleBuffer();
 
-            int desiredSpace = activeTupleBuffers.size() * schemaSize;
+            int desiredSpace = (int)(activeTupleBuffers.size() * (long)schemaSize);
             int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
             bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
             if (desiredSpace > reserved) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,6 +31,7 @@
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.util.Assertion;
 import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
@@ -76,6 +77,10 @@
 		this.implicitBuffer = implicitBuffer;
 	}
     
+    public ImplicitBuffer getImplicitBuffer() {
+		return implicitBuffer;
+	}
+    
     static int[] getExpressionIndecies(List expressions,
                                         List elements) {
         if (expressions == null) {
@@ -126,15 +131,37 @@
     public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
     	return this.getTupleBuffer().getRowCount();
     }
+    
+    /**
+     * Uses the prefetch logic to determine an incremental row count
+     */
+    public boolean rowCountLE(long count) throws TeiidComponentException, TeiidProcessingException {
+    	if (buffer != null) {
+    		return buffer.getRowCount() <= count;
+    	}
+    	if (iterator != null || this.sortUtility != null) {
+    		throw new IllegalStateException();
+    	}
+    	while (buffer == null) {
+    		if (getIncrementalRowCount(true) > count) {
+    			return false;
+    		}
+    		prefetch(Long.MAX_VALUE);
+    	}
+		return buffer.getRowCount() <= count;
+    }
 
     IndexedTupleSource getIterator() throws TeiidComponentException {
         if (this.iterator == null) {
-            if (this.buffer != null) {
+        	if (this.buffer != null) {
                 iterator = buffer.createIndexedTupleSource();
             } else {
                 // return a TupleBatch tuplesource iterator
                 BatchIterator bi = new BatchIterator(this.source);
-                if (implicitBuffer != ImplicitBuffer.NONE) {
+                if (this.collector != null) {
+                	bi.setBuffer(this.collector.getTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+                	this.collector = null;
+                } else if (implicitBuffer != ImplicitBuffer.NONE) {
                 	bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
                 }
                 this.iterator = bi;
@@ -142,6 +169,36 @@
         }
         return this.iterator;
     }
+    
+    /**
+     * Pro-actively pull batches for later use.
+     * There are unfortunately quite a few cases to cover here.
+     */
+    protected void prefetch(long limit) throws TeiidComponentException, TeiidProcessingException {
+    	if (!open) {
+    		return;
+    	}
+    	if (this.buffer == null) {
+    		if (this.sortUtility != null) {
+    			return;
+    		}
+    		if (this.iterator != null) {
+    			((BatchIterator)this.iterator).readAhead(limit);
+    			return;
+    		}
+    		if (source.hasBuffer(true)) {
+    			this.buffer = source.getBuffer(-1);
+    			return;
+    		}
+	    	if (collector == null) {
+	            collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
+	    	}
+	    	if (collector.getTupleBuffer().getManagedRowCount() >= limit) {
+	    		return;
+	    	}
+	        this.buffer = collector.collectTuples(true);
+    	}
+    }
 
     public List<Object> getOuterVals() {
         return this.outerVals;
@@ -168,11 +225,16 @@
         	if (this.iterator instanceof BatchIterator) {
         		throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
         	}
+    		if (source.hasBuffer(true)) {
+    			this.buffer = source.getBuffer(-1);
+    			Assertion.assertTrue(this.buffer.isFinal());
+    			return this.buffer;
+    		}
         	if (collector == null) {
                 collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
             }
             this.buffer = collector.collectTuples();
-        }
+        } 
         return this.buffer;
     }
 
@@ -190,6 +252,11 @@
     	}
     	if (this.sortUtility == null) {
     		TupleSource ts = null;
+    		if (source.hasBuffer(true)) {
+    			this.buffer = source.getBuffer(-1);
+    		} else if (this.buffer == null && this.collector != null) {
+    			this.buffer = this.collector.collectTuples();
+    		}
     		if (this.buffer != null) {
     			this.buffer.setForwardOnly(true);
     			ts = this.buffer.createIndexedTupleSource();
@@ -213,12 +280,12 @@
  	if (this.buffer != null && this.buffer != sorted) {
     		this.buffer.remove();
     	}
-    	this.buffer = sorted;
+		this.buffer = sorted;
         this.markDistinct(sortUtility.isDistinct());
     }
     
     public boolean hasBuffer() {
-    	return this.buffer != null;
+    	return this.buffer != null || this.source.hasBuffer(true);
     }
     
     public boolean nextBuffer() {
@@ -243,5 +310,30 @@
 		this.currentTuple = null;
 		this.maxProbeMatch = 1;
 	}
+	
+	public void setMaxProbePosition() throws TeiidComponentException, TeiidProcessingException {
+		this.getIterator().setPosition(this.getMaxProbeMatch());
+		this.currentTuple = null;
+	}
+
+	public int getIncrementalRowCount(boolean low) {
+		if (this.buffer != null) {
+			return this.buffer.getRowCount();
+		}
+		if (this.collector != null) {
+			return this.collector.getTupleBuffer().getRowCount();
+		}
+		if (sortUtility == null) {
+			if (this.iterator instanceof BatchIterator) {
+				TupleBuffer tb = ((BatchIterator)this.iterator).getBuffer();
+				if (tb != null) {
+					return tb.getRowCount();
+				}
+				//TODO: should estimate the rows
+			}
+			//TODO: should estimate the rows based upon what is being fed into the sort
+		}
+		return low?0:Integer.MAX_VALUE;
+	}
     
 }

Added: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java	                        (rev 0)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.processor;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.relational.FakeRelationalNode;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.util.CommandContext;
+
+@SuppressWarnings("nls")
+public class TestBatchCollector {
+
+	@Test public void testCollect() throws Exception {
+		FakeRelationalNode sourceNode = new FakeRelationalNode(1, new List[] {
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1)
+			}, 1);
+		sourceNode.setElements(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)));
+		BatchCollector bc = new BatchCollector(sourceNode, BufferManagerFactory.getStandaloneBufferManager(), new CommandContext(), false);
+		bc.collectTuples(true);
+		assertEquals(1, bc.getTupleBuffer().getManagedRowCount());
+		assertEquals(3, bc.collectTuples().getRowCount());
+	}
+	
+}

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -132,4 +132,62 @@
 		assertEquals(0, tb.getManagedRowCount());
 	}
 	
+	@Test public void testReadAhead() throws Exception {
+		BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+		}, 2));
+		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+		bi.setBuffer(tb, false);  //$NON-NLS-1$
+		bi.nextTuple();
+		assertEquals(1, bi.available());
+		assertEquals(2, bi.getBuffer().getRowCount());
+		bi.readAhead(100);
+		assertEquals(4, bi.getBuffer().getRowCount());
+		//shouldn't keep reading
+		bi.readAhead(3);
+		assertEquals(4, bi.getBuffer().getRowCount());
+		bi.readAhead(5);
+		assertEquals(6, bi.getBuffer().getRowCount());
+		bi.readAhead(8); //does nothing
+		for (int i = 0; i < 5; i++) {
+			assertNotNull(bi.nextTuple());
+		}
+		assertNull(bi.nextTuple());
+	}
+	
+	@Test public void testReadAheadMark() throws Exception {
+		BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+		}, 2));
+		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+		bi.setBuffer(tb, true);  //$NON-NLS-1$
+		bi.nextTuple();
+		assertEquals(1, bi.available());
+		assertEquals(0, bi.getBuffer().getRowCount());
+		bi.readAhead(100);
+		assertEquals(4, bi.getBuffer().getRowCount());
+		//shouldn't keep reading
+		bi.readAhead(3);
+		assertEquals(4, bi.getBuffer().getRowCount());
+		bi.readAhead(5);
+		assertEquals(6, bi.getBuffer().getRowCount());
+		bi.readAhead(8); //does nothing
+		for (int i = 0; i < 5; i++) {
+			assertNotNull(bi.nextTuple());
+		}
+		assertNull(bi.nextTuple());
+	}
+	
 }

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-11-07 17:07:53 UTC (rev 4612)
@@ -35,6 +35,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
@@ -179,7 +180,18 @@
         
         List rightElements = new ArrayList();
         rightElements.add(es2);
-        rightNode = new FakeRelationalNode(2, rightTuples);
+        rightNode = new BlockingFakeRelationalNode(2, rightTuples) {
+        	@Override
+        	public boolean hasBuffer(boolean requireFinal) {
+        		return false;
+        	}
+
+        	@Override
+        	public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+        		fail();
+        		throw new AssertionError();
+        	};
+        };
         rightNode.setElements(rightElements);
         
         List joinElements = new ArrayList();
@@ -600,10 +612,10 @@
     }
     
     @Test public void testMergeJoinOptimization() throws Exception {
-        helpTestEnhancedSortMergeJoin(99);
+        helpTestEnhancedSortMergeJoin(99, false);
     }
 
-	private void helpTestEnhancedSortMergeJoin(int batchSize)
+	private void helpTestEnhancedSortMergeJoin(int batchSize, boolean repeated)
 			throws TeiidComponentException, TeiidProcessingException {
 		this.joinType = JoinType.JOIN_INNER;
         int rows = 100;
@@ -615,26 +627,49 @@
         }
         this.leftTuples = data;
         this.rightTuples = createTuples2();
-        expected = new List[] {
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-           Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 1, 1 }),  
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-           Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 1, 1 }),
-           Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 4, 4 }),
-        };
+        if (!repeated) {
+        	expected = new List[] {
+                Arrays.asList(new Object[] { 4, 4 }),
+                Arrays.asList(new Object[] { 4, 4 }),
+                Arrays.asList(new Object[] { 7, 7 }),
+                Arrays.asList(new Object[] { 7, 7 }),
+                Arrays.asList(new Object[] { 2, 2 }),
+                Arrays.asList(new Object[] { 2, 2 }),
+                Arrays.asList(new Object[] { 6, 6 }),
+                Arrays.asList(new Object[] { 1, 1 }),  
+                Arrays.asList(new Object[] { 4, 4 }),
+                Arrays.asList(new Object[] { 4, 4 }),
+                Arrays.asList(new Object[] { 7, 7 }),
+                Arrays.asList(new Object[] { 7, 7 }),
+                Arrays.asList(new Object[] { 2, 2 }),
+                Arrays.asList(new Object[] { 2, 2 }),
+                Arrays.asList(new Object[] { 6, 6 }),
+                Arrays.asList(new Object[] { 1, 1 }),
+                Arrays.asList(new Object[] { 4, 4 }),
+                Arrays.asList(new Object[] { 4, 4 }),
+             };
+        } else {
+	        expected = new List[] {
+	           Arrays.asList(new Object[] { 4, 4 }),
+	           Arrays.asList(new Object[] { 4, 4 }),
+	           Arrays.asList(new Object[] { 1, 1 }),  
+	           Arrays.asList(new Object[] { 6, 6 }),
+	           Arrays.asList(new Object[] { 2, 2 }),
+	           Arrays.asList(new Object[] { 2, 2 }),
+	           Arrays.asList(new Object[] { 7, 7 }),
+	           Arrays.asList(new Object[] { 7, 7 }),
+	           Arrays.asList(new Object[] { 4, 4 }),
+	           Arrays.asList(new Object[] { 4, 4 }),
+	           Arrays.asList(new Object[] { 1, 1 }),
+	           Arrays.asList(new Object[] { 6, 6 }),
+	           Arrays.asList(new Object[] { 2, 2 }),
+	           Arrays.asList(new Object[] { 2, 2 }),
+	           Arrays.asList(new Object[] { 7, 7 }),
+	           Arrays.asList(new Object[] { 7, 7 }),
+	           Arrays.asList(new Object[] { 4, 4 }),
+	           Arrays.asList(new Object[] { 4, 4 }),
+	        };
+        }
         helpCreateJoin();               
         this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
         this.join.setJoinStrategy(joinStrategy);
@@ -713,9 +748,13 @@
 	}
     
     @Test public void testMergeJoinOptimizationMultiBatch() throws Exception {
-    	helpTestEnhancedSortMergeJoin(10);
+    	helpTestEnhancedSortMergeJoin(10, false);
     }
     
+    @Test public void testMergeJoinOptimizationMultiBatch1() throws Exception {
+    	helpTestEnhancedSortMergeJoin(1, true);
+    }
+    
     @Test public void testMergeJoinOptimizationNoRows() throws Exception {
         this.joinType = JoinType.JOIN_INNER;
         this.leftTuples = createTuples1();



More information about the teiid-commits mailing list