[teiid-commits] teiid SVN: r4546 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor/relational and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jan 30 12:30:23 EST 2013


Author: jolee
Date: 2013-01-30 12:30:23 -0500 (Wed, 30 Jan 2013)
New Revision: 4546

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java
   branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
Log:
TEIID-2363:  proactive buffering not occurring for the inner side of an outer join on "MERGE JOIN (SORT/ALREADY_SORTED)"

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -72,6 +72,9 @@
 			if (buffer != null && (!saveOnMark || mark)) {
             	buffer.addTupleBatch(batch, true);
             }
+			if (done && buffer != null) {
+				this.buffer.close();
+			}
 		}
 		return getCurrentTuple();
 	}
@@ -139,4 +142,26 @@
     	super.setPosition(position);
     }
     
+    /**
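+     * Sets the mark, if saving on mark is enabled and no mark is set yet, without touching any other iterator state.
+     * @return true if this call set the mark, false if saveOnMark is off or a mark was already set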
+     */
+    public boolean ensureSave() {
+    	if (!saveOnMark || mark) {
+    		return false;
+    	}
+    	mark = true;
+    	return true;
+    }
+    
+    public void disableSave() {
+    	if (buffer != null) {
+    		this.saveOnMark = true;
+    		this.mark = false;
+    		if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+    			this.batch = null;
+    		}
+    	}
+    }
+    
 }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -28,19 +28,19 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
 import org.teiid.common.buffer.IndexedTupleSource;
 import org.teiid.common.buffer.STree;
+import org.teiid.common.buffer.STree.InsertMode;
 import org.teiid.common.buffer.TupleBrowser;
 import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.STree.InsertMode;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
+import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
 import org.teiid.query.sql.lang.OrderBy;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -182,7 +182,7 @@
     	int rowId = 0;
     	List<?> lastTuple = null;
     	boolean sortedDistinct = sorted && !state.isDistinct();
-    	int sizeHint = index.getExpectedHeight(state.getTupleBuffer().getRowCount());
+    	int sizeHint = index.getExpectedHeight(state.getRowCount());
     	index.setBatchInsert(sorted);
     	outer: while (its.hasNext()) {
     		//detect if sorted and distinct
@@ -233,9 +233,7 @@
     }
     
     private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
-    	Number cardinality = source.getSource().getEstimateNodeCardinality();
-    	return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4)) 
-    	&& (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+    	return source.rowCountLE(source.getSource().getBatchSize() / 2);
     }
     
     @Override
@@ -247,7 +245,6 @@
     	} else if (!this.leftSource.hasBuffer() && processingSortLeft == SortOption.SORT && shouldIndexIfSmall(this.rightSource)) {
     		this.processingSortLeft = SortOption.NOT_SORTED;
     	} else { 
-    		this.leftSource.getTupleBuffer();
     		if (!this.rightSource.hasBuffer() && processingSortRight == SortOption.SORT && shouldIndexIfSmall(this.leftSource)) {
         		this.processingSortRight = SortOption.NOT_SORTED; 
         	} else if (processingSortRight == SortOption.SORT && shouldIndex(this.leftSource, this.rightSource)) {
@@ -318,10 +315,10 @@
     	boolean useIndex = false;
     	int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
     	//approximate that 1/2 of the index will be memory resident 
-    	toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() / (possibleIndex.getTupleBuffer().getBatchSize() * .5)); 
+    	toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize() * .5));  
     	if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
     		useIndex = true;
-    	} else if (possibleIndex.getTupleBuffer().getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
+    	} else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
     		useIndex = true;
     	} 
     	if (useIndex) {
@@ -329,6 +326,7 @@
     		return true;
     	} 
     	this.repeatedMerge = true;
+    	possibleIndex.setImplicitBuffer(ImplicitBuffer.FULL);
     	return true;
     }
     
@@ -344,7 +342,7 @@
     		super.process();
     		return;
     	}
-    	if (this.sortedSource.getTupleBuffer().getRowCount() == 0) {
+    	if (this.sortedSource.getRowCount() == 0) {
     		return;
     	}
     	if (repeatedMerge) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -42,291 +42,321 @@
 import org.teiid.query.sql.lang.JoinType;
 import org.teiid.query.util.CommandContext;
 
-
-/** 
+/**
  * @since 4.2
  */
 public class JoinNode extends SubqueryAwareRelationalNode {
-	
-	static class BatchAvailableException extends RuntimeException {}
-	
-	static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException(); 
-	
-	public enum JoinStrategyType {    
-	    MERGE,
-	    ENHANCED_SORT,
-	    NESTED_LOOP,
-	    NESTED_TABLE
+
+	static class BatchAvailableException extends RuntimeException {
 	}
-        
-    private enum State { LOAD_LEFT, LOAD_RIGHT, EXECUTE }    
-    private State state = State.LOAD_LEFT;
-    
-    private JoinStrategy joinStrategy;
-    private JoinType joinType;
-    private String dependentValueSource;
-   
-    private List leftExpressions;
-    private List rightExpressions;
-    private boolean leftDistinct;
-    private boolean rightDistinct;
-    private Criteria joinCriteria;
-    
-    private Map combinedElementMap;
-    private int[] projectionIndexes;
-    
-    private DependentValueSource dvs;
-    
-    public JoinNode(int nodeID) {
-        super(nodeID);
-    }
-    
-    public void setJoinType(JoinType type) {
-        this.joinType = type;
-    }
-    
-    public JoinStrategy getJoinStrategy() {
-        return this.joinStrategy;
-    }
 
-    public void setJoinStrategy(JoinStrategy joinStrategy) {
-        this.joinStrategy = joinStrategy;
-    }
+	static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
 
-    public void setJoinExpressions(List leftExpressions, List rightExpressions) {
-        this.leftExpressions = leftExpressions;
-        this.rightExpressions = rightExpressions;
-    }
-    
-    public boolean isLeftDistinct() {
+	public enum JoinStrategyType {
+		MERGE, ENHANCED_SORT, NESTED_LOOP, NESTED_TABLE
+	}
+
+	private enum State {
+		LOAD_LEFT, LOAD_RIGHT, EXECUTE
+	}
+
+	private State state = State.LOAD_LEFT;
+
+	private JoinStrategy joinStrategy;
+	private JoinType joinType;
+	private String dependentValueSource;
+
+	private List leftExpressions;
+	private List rightExpressions;
+	private boolean leftDistinct;
+	private boolean rightDistinct;
+	private Criteria joinCriteria;
+
+	private Map combinedElementMap;
+	private int[] projectionIndexes;
+
+	private DependentValueSource dvs;
+
+	public JoinNode(int nodeID) {
+		super(nodeID);
+	}
+
+	public void setJoinType(JoinType type) {
+		this.joinType = type;
+	}
+
+	public JoinStrategy getJoinStrategy() {
+		return this.joinStrategy;
+	}
+
+	public void setJoinStrategy(JoinStrategy joinStrategy) {
+		this.joinStrategy = joinStrategy;
+	}
+
+	public void setJoinExpressions(List leftExpressions, List rightExpressions) {
+		this.leftExpressions = leftExpressions;
+		this.rightExpressions = rightExpressions;
+	}
+
+	public boolean isLeftDistinct() {
 		return leftDistinct;
 	}
-    
-    public void setLeftDistinct(boolean leftDistinct) {
+
+	public void setLeftDistinct(boolean leftDistinct) {
 		this.leftDistinct = leftDistinct;
 	}
-    
-    public boolean isRightDistinct() {
+
+	public boolean isRightDistinct() {
 		return rightDistinct;
 	}
-    
-    public void setRightDistinct(boolean rightDistinct) {
+
+	public void setRightDistinct(boolean rightDistinct) {
 		this.rightDistinct = rightDistinct;
 	}
-    
-    public void setJoinCriteria(Criteria joinCriteria) {
-        this.joinCriteria = joinCriteria;
-    }
-    
-    @Override
-    public void initialize(CommandContext context, BufferManager bufferManager,
-    		ProcessorDataManager dataMgr) {
-    	super.initialize(context, bufferManager, dataMgr);
-    	
-    	if (this.combinedElementMap == null) {
-	        // Create element lookup map for evaluating project expressions
-	        List combinedElements = new ArrayList(getChildren()[0].getElements());
-	        combinedElements.addAll(getChildren()[1].getElements());
-	        this.combinedElementMap = createLookupMap(combinedElements);
-	        this.projectionIndexes = getProjectionIndexes(combinedElementMap, getElements());
-    	}
-    }
-    
-    public void open()
-        throws TeiidComponentException, TeiidProcessingException {
-        // Set Up Join Strategy
-        this.joinStrategy.initialize(this);
-        
-        joinStrategy.openLeft();
-        
-        if(!isDependent()) {
-        	joinStrategy.openRight();
-        }
-            
-        this.state = State.LOAD_LEFT;
-    }
 
-    /** 
-     * @see org.teiid.query.processor.relational.RelationalNode#clone()
-     * @since 4.2
-     */
-    public Object clone() {
-        JoinNode clonedNode = new JoinNode(super.getID());
-        super.copy(this, clonedNode);
-        
-        clonedNode.joinType = this.joinType;
-        clonedNode.joinStrategy = this.joinStrategy.clone();
-        
-        clonedNode.joinCriteria = this.joinCriteria;
-        
-        clonedNode.leftExpressions = leftExpressions;
-        
-        clonedNode.rightExpressions = rightExpressions;
-        clonedNode.dependentValueSource = this.dependentValueSource;
-        clonedNode.rightDistinct = rightDistinct;
-        clonedNode.leftDistinct = leftDistinct;
-        
-        return clonedNode;
-    }
+	public void setJoinCriteria(Criteria joinCriteria) {
+		this.joinCriteria = joinCriteria;
+	}
 
-    /** 
-     * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
-     * @since 4.2
-     */
-    protected TupleBatch nextBatchDirect() throws BlockedException,
-                                          TeiidComponentException,
-                                          TeiidProcessingException {
-        if (state == State.LOAD_LEFT) {
-        	if (this.joinType != JoinType.JOIN_FULL_OUTER) {
-            	this.joinStrategy.leftSource.setImplicitBuffer(ImplicitBuffer.NONE);
-            }
-        	//left child was already opened by the join node
-            this.joinStrategy.loadLeft();
-            if (isDependent()) { 
-                TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
-                dvs = new DependentValueSource(buffer);
-                dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
-                this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
-            }
-            state = State.LOAD_RIGHT;
-        }
-        if (state == State.LOAD_RIGHT) {
-        	this.joinStrategy.openRight();
-            this.joinStrategy.loadRight();
-            state = State.EXECUTE;
-        }
-        try {
-        	this.joinStrategy.process();
-        	this.terminateBatches();
-        } catch (BatchAvailableException e) {
-        	//pull the batch
-        }
-        return pullBatch();
-    }
+	@Override
+	public void initialize(CommandContext context, BufferManager bufferManager,
+			ProcessorDataManager dataMgr) {
+		super.initialize(context, bufferManager, dataMgr);
 
-    /** 
-     * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
-     * @since 4.2
-     */
-    public PlanNode getDescriptionProperties() {
-        // Default implementation - should be overridden     
-    	PlanNode props = super.getDescriptionProperties();
-        
-        if(isDependent()) {
-        	props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
-        }
-        props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
-        props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
-        List<String> critList = getCriteriaList();
-        props.addProperty(PROP_JOIN_CRITERIA, critList);
-        return props;
-    }
+		if (this.combinedElementMap == null) {
+			// Create element lookup map for evaluating project expressions
+			List combinedElements = new ArrayList(
+					getChildren()[0].getElements());
+			combinedElements.addAll(getChildren()[1].getElements());
+			this.combinedElementMap = createLookupMap(combinedElements);
+			this.projectionIndexes = getProjectionIndexes(combinedElementMap,
+					getElements());
+		}
+	}
 
+	public void open() throws TeiidComponentException, TeiidProcessingException {
+		// Set Up Join Strategy
+		this.joinStrategy.initialize(this);
+
+		joinStrategy.openLeft();
+
+		if (!isDependent()) {
+			joinStrategy.openRight();
+		}
+
+		this.state = State.LOAD_LEFT;
+	}
+
+	/**
+	 * @see org.teiid.query.processor.relational.RelationalNode#clone()
+	 * @since 4.2
+	 */
+	public Object clone() {
+		JoinNode clonedNode = new JoinNode(super.getID());
+		super.copy(this, clonedNode);
+
+		clonedNode.joinType = this.joinType;
+		clonedNode.joinStrategy = this.joinStrategy.clone();
+
+		clonedNode.joinCriteria = this.joinCriteria;
+
+		clonedNode.leftExpressions = leftExpressions;
+
+		clonedNode.rightExpressions = rightExpressions;
+		clonedNode.dependentValueSource = this.dependentValueSource;
+		clonedNode.rightDistinct = rightDistinct;
+		clonedNode.leftDistinct = leftDistinct;
+
+		return clonedNode;
+	}
+
+	/**
+	 * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
+	 * @since 4.2
+	 */
+	protected TupleBatch nextBatchDirect() throws BlockedException,
+			TeiidComponentException, TeiidProcessingException {
+		try {
+			if (state == State.LOAD_LEFT) {
+				if (this.joinType != JoinType.JOIN_FULL_OUTER) {
+					this.joinStrategy.leftSource
+							.setImplicitBuffer(ImplicitBuffer.NONE);
+				}
+				// left child was already opened by the join node
+				this.joinStrategy.loadLeft();
+				if (isDependent()) {
+					TupleBuffer buffer = this.joinStrategy.leftSource
+							.getTupleBuffer();
+					// NOTE(review): the tuple buffer may come from a lower node; only the
+					// buffer (no schema) is passed to DependentValueSource here - confirm
+					dvs = new DependentValueSource(buffer);
+					dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
+					this.getContext().getVariableContext()
+							.setGlobalValue(this.dependentValueSource, dvs);
+				}
+				state = State.LOAD_RIGHT;
+			}
+		} catch (BlockedException e) {
+			if (!isDependent()) {
+				this.joinStrategy.openRight();
+				this.joinStrategy.loadRight();
+			}
+			throw e;
+		}
+		try {
+			if (state == State.LOAD_RIGHT) {
+				this.joinStrategy.openRight();
+				this.joinStrategy.loadRight();
+				state = State.EXECUTE;
+			}
+			this.joinStrategy.process();
+			this.terminateBatches();
+		} catch (BatchAvailableException e) {
+			// pull the batch
+		} catch (BlockedException e) {
+			// TODO: this leads to duplicate exceptions, we
+			// could track which side is blocking
+			try {
+				this.joinStrategy.leftSource.prefetch(true);
+			} catch (BlockedException e1) {
+				// ignored: the right side is still prefetched and the original exception is rethrown below
+			}
+			this.joinStrategy.rightSource.prefetch(true);
+			throw e;
+		}
+		return pullBatch();
+	}
+
+	/**
+	 * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+	 * @since 4.2
+	 */
+	public PlanNode getDescriptionProperties() {
+		// Default implementation - should be overridden
+		PlanNode props = super.getDescriptionProperties();
+
+		if (isDependent()) {
+			props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+		}
+		props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+		props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+		List<String> critList = getCriteriaList();
+		props.addProperty(PROP_JOIN_CRITERIA, critList);
+		return props;
+	}
+
 	private List<String> getCriteriaList() {
 		List<String> critList = new ArrayList<String>();
-        if (leftExpressions != null) {
-            for(int i=0; i < this.leftExpressions.size(); i++) {
-                critList.add(this.leftExpressions.get(i).toString() + "=" + this.rightExpressions.get(i).toString());  //$NON-NLS-1$
-            }
-        }
-        if (this.joinCriteria != null) {
-            for (Criteria crit : (List<Criteria>)Criteria.separateCriteriaByAnd(joinCriteria)) {
-                critList.add(crit.toString());
-            }
-        }
+		if (leftExpressions != null) {
+			for (int i = 0; i < this.leftExpressions.size(); i++) {
+				critList.add(this.leftExpressions.get(i).toString()
+						+ "=" + this.rightExpressions.get(i).toString()); //$NON-NLS-1$
+			}
+		}
+		if (this.joinCriteria != null) {
+			for (Criteria crit : (List<Criteria>) Criteria
+					.separateCriteriaByAnd(joinCriteria)) {
+				critList.add(crit.toString());
+			}
+		}
 		return critList;
 	}
 
-    /** 
-     * @see org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
-     * @since 4.2
-     */
-    protected void getNodeString(StringBuffer str) {
-        str.append(getClassName());
-        str.append("("); //$NON-NLS-1$
-        str.append(getID());
-        str.append(") [");//$NON-NLS-1$
-        if(isDependent()) {
-            str.append("Dependent] [");//$NON-NLS-1$
-        }
-        str.append(this.joinStrategy.toString());
-        str.append("] [");//$NON-NLS-1$
-        str.append(this.joinType.toString());
-        str.append("]"); //$NON-NLS-1$
-        if (getJoinType() != JoinType.JOIN_CROSS) {
-        	str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
-        }
-        str.append(" output="); //$NON-NLS-1$
-        str.append(getElements());
-    }
+	/**
+	 * @see org.teiid.query.processor.relational.RelationalNode#getNodeString(java.lang.StringBuffer)
+	 * @since 4.2
+	 */
+	protected void getNodeString(StringBuffer str) {
+		str.append(getClassName());
+		str.append("("); //$NON-NLS-1$
+		str.append(getID());
+		str.append(") [");//$NON-NLS-1$
+		if (isDependent()) {
+			str.append("Dependent] [");//$NON-NLS-1$
+		}
+		str.append(this.joinStrategy.toString());
+		str.append("] [");//$NON-NLS-1$
+		str.append(this.joinType.toString());
+		str.append("]"); //$NON-NLS-1$
+		if (getJoinType() != JoinType.JOIN_CROSS) {
+			str.append(" criteria=").append(getCriteriaList()); //$NON-NLS-1$
+		}
+		str.append(" output="); //$NON-NLS-1$
+		str.append(getElements());
+	}
 
-    /** 
-     * @return Returns the isDependent.
-     */
-    public boolean isDependent() {
-        return this.dependentValueSource != null;
-    }
-    
-    /** 
-     * @param isDependent The isDependent to set.
-     */
-    public void setDependentValueSource(String dependentValueSource) {
-        this.dependentValueSource = dependentValueSource;
-    }
-    
-    public String getDependentValueSourceName() {
+	/**
+	 * @return true if a dependent value source name has been set on this node.
+	 */
+	public boolean isDependent() {
+		return this.dependentValueSource != null;
+	}
+
+	/**
+	 * @param dependentValueSource
+	 *            the dependent value source name; a non-null value makes isDependent() return true.
+	 */
+	public void setDependentValueSource(String dependentValueSource) {
+		this.dependentValueSource = dependentValueSource;
+	}
+
+	public String getDependentValueSourceName() {
 		return dependentValueSource;
 	}
-    
-    public void closeDirect() {
-        super.closeDirect();
-        joinStrategy.close();
-        if (this.getContext() != null && this.dependentValueSource != null) {
-        	this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, null);
-        }
-        this.dvs = null;
-    }
-    
-    @Override
-    public void reset() {
-    	super.reset();
-    	this.dvs = null;
-    }
 
-    public JoinType getJoinType() {
-        return this.joinType;
-    }
+	public void closeDirect() {
+		super.closeDirect();
+		joinStrategy.close();
+		if (this.getContext() != null && this.dependentValueSource != null) {
+			this.getContext().getVariableContext()
+					.setGlobalValue(this.dependentValueSource, null);
+		}
+		this.dvs = null;
+	}
 
-    Map getCombinedElementMap() {
-        return this.combinedElementMap;
-    }
+	@Override
+	public void reset() {
+		super.reset();
+		this.dvs = null;
+	}
 
-    public Criteria getJoinCriteria() {
-        return this.joinCriteria;
-    }
-    
-    boolean matchesCriteria(List outputTuple) throws BlockedException, TeiidComponentException, ExpressionEvaluationException {
-		return (this.joinCriteria == null || getEvaluator(this.combinedElementMap).evaluate(this.joinCriteria, outputTuple));
-    }
+	public JoinType getJoinType() {
+		return this.joinType;
+	}
 
-    public List getLeftExpressions() {
-        return this.leftExpressions;
-    }
+	Map getCombinedElementMap() {
+		return this.combinedElementMap;
+	}
 
-    public List getRightExpressions() {
-        return this.rightExpressions;
-    }
-    
-    @Override
-    protected void addBatchRow(List row) {
-        List projectTuple = projectTuple(this.projectionIndexes, row);
-        super.addBatchRow(projectTuple);
-        if (isBatchFull()) {
-        	throw BATCH_AVILABLE;
-        }
-    }
-    
-    public DependentValueSource getDependentValueSource() {
+	public Criteria getJoinCriteria() {
+		return this.joinCriteria;
+	}
+
+	boolean matchesCriteria(List outputTuple) throws BlockedException,
+			TeiidComponentException, ExpressionEvaluationException {
+		return (this.joinCriteria == null || getEvaluator(
+				this.combinedElementMap).evaluate(this.joinCriteria,
+				outputTuple));
+	}
+
+	public List getLeftExpressions() {
+		return this.leftExpressions;
+	}
+
+	public List getRightExpressions() {
+		return this.rightExpressions;
+	}
+
+	@Override
+	protected void addBatchRow(List row) {
+		List projectTuple = projectTuple(this.projectionIndexes, row);
+		super.addBatchRow(projectTuple);
+		if (isBatchFull()) {
+			throw BATCH_AVILABLE;
+		}
+	}
+
+	public DependentValueSource getDependentValueSource() {
 		return dvs;
 	}
 

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -342,10 +342,10 @@
     @Override
     protected void loadRight() throws TeiidComponentException,
     		TeiidProcessingException {
-		this.rightSource.sort(this.processingSortRight);
-		if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
+    	if (this.joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
 			this.rightSource.setImplicitBuffer(ImplicitBuffer.ON_MARK);
 		}
+    	this.rightSource.sort(this.processingSortRight);
 	}
         
     public void setProcessingSortRight(boolean processingSortRight) {

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -25,13 +25,12 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.IndexedTupleSource;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
 import org.teiid.query.processor.relational.SortUtility.Mode;
@@ -46,7 +45,6 @@
 	
     private RelationalNode source;
     private List expressions;
-    private BatchCollector collector;
     private TupleBuffer buffer;
     private List<TupleBuffer> buffers;
     private List<Object> outerVals;
@@ -57,6 +55,7 @@
     private boolean distinct;
     private ImplicitBuffer implicitBuffer = ImplicitBuffer.FULL;
     boolean open;
+    private BatchIterator prefetch;
     
     private SortUtility sortUtility;
     
@@ -98,7 +97,7 @@
         return currentTuple;
     }
     
-    public void reset() throws TeiidComponentException {
+    public void reset() throws TeiidComponentException, TeiidProcessingException {
         this.getIterator().reset();
         this.getIterator().mark();
         this.currentTuple = null;
@@ -120,28 +119,104 @@
 			this.iterator.closeSource();
         	this.iterator = null;
         }
+        this.prefetch = null;
         this.currentTuple = null;
 	}
 
     public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
     	return this.getTupleBuffer().getRowCount();
     }
+    
+    /**
+     * Prefetches incrementally until it is known whether the source holds at most count rows; returns true if so.
+     */
+    public boolean rowCountLE(int count) throws TeiidComponentException, TeiidProcessingException {
+       	if (buffer == null) {
+       		prefetch(false);
+       	}
+       	while (buffer.getRowCount() <= count) {
+       		if (prefetch == null) {
+       			return true;
+       		}
+       		prefetch(false);
+       	}
+       	return false;
+    }
 
-    IndexedTupleSource getIterator() throws TeiidComponentException {
+    IndexedTupleSource getIterator() throws TeiidComponentException, TeiidProcessingException {
         if (this.iterator == null) {
-            if (this.buffer != null) {
-                iterator = buffer.createIndexedTupleSource();
+        	if (this.buffer == null) {
+        		getTupleBuffer(false);
+        	}
+        	if (this.prefetch != null) {
+        		this.iterator = this.prefetch;
             } else {
-                // return a TupleBatch tuplesource iterator
-                BatchIterator bi = new BatchIterator(this.source);
-                if (implicitBuffer != ImplicitBuffer.NONE) {
-                	bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
-                }
-                this.iterator = bi;
+            	iterator = buffer.createIndexedTupleSource(implicitBuffer == ImplicitBuffer.NONE);
             }
         }
         return this.iterator;
     }
+    
+    /**
+     * Create a batch iterator to perform basic prefetching
+     * @throws TeiidComponentException
+     */
+    private void createPrefetch() throws TeiidComponentException {
+    	this.prefetch = new BatchIterator(this.source);
+    	boolean useMark = implicitBuffer != ImplicitBuffer.FULL;
+    	this.buffer = createSourceTupleBuffer();
+    	this.prefetch.setBuffer(this.buffer, useMark);
+    	if (useMark) {
+    		this.prefetch.mark();
+    	}
+    }
+    
+    /**
+     * Pro-actively pull batches for later use.
+     * There are unfortunately quite a few cases to cover here.
+     */
+    protected void prefetch(boolean limit) throws TeiidComponentException, TeiidProcessingException {
+    	if (this.prefetch == null) {
+    		if (this.buffer != null) {
+    			return;
+    		}
+    		if (this.sortUtility != null) {
+    			sortUtility.sort();
+    			return;
+    		}
+    		if (this.source.hasFinalBuffer()) {
+    			this.buffer = this.source.getFinalBuffer();
+    			return;
+    		}
+    		createPrefetch();
+    	}
+    	if (limit && this.buffer.getManagedRowCount() >= this.source.getBatchSize() * this.source.getContext().getOptions().getJoinPrefetchBatches()) {
+    		return;
+    	}
+    	int curIndex = this.prefetch.getCurrentIndex();
+    	boolean marked = false;
+    	if (this.prefetch.ensureSave()) {
+    		marked = true;
+    	}
+    	this.prefetch.setPosition(this.buffer.getRowCount() + 1);
+    	BatchIterator bi = this.prefetch; //even if we clear the prefetch, we may already be using it as the iterator
+     	try {
+ 	    	if (!this.prefetch.hasNext()) {
+ 	    		this.prefetch = null;
+ 	    		if (this.iterator != bi) {
+ 	    			bi = null;
+ 	    		}
+ 	    	}
+     	} finally {
+     		if (bi != null) {
+ 	    		if (marked) {
+ 	    			bi.reset();
+ 	    		} else {
+ 	    			bi.setPosition(curIndex);
+ 	    		}
+     		}
+     	}
+    }    
 
     public List<Object> getOuterVals() {
         return this.outerVals;
@@ -164,14 +239,26 @@
     }
 
     public TupleBuffer getTupleBuffer() throws TeiidComponentException, TeiidProcessingException {
+    	return getTupleBuffer(true);
+    }
+    
+    private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException, TeiidProcessingException {
         if (this.buffer == null) {
         	if (this.iterator instanceof BatchIterator) {
         		throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
         	}
-        	if (collector == null) {
-                collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
-            }
-            this.buffer = collector.collectTuples();
+        	if (source.hasFinalBuffer()) {
+        		this.buffer = source.getFinalBuffer();
+        		return this.buffer;
+        	}
+        	this.implicitBuffer = ImplicitBuffer.FULL;
+        	createPrefetch();
+        } 
+        if (full && this.prefetch != null) {
+        	while (this.prefetch.hasNext()) {
+        		this.prefetch.setPosition(this.prefetch.getCurrentIndex() + this.source.getBatchSize());
+        	}
+        	this.prefetch = null; //fully buffered
         }
         return this.buffer;
     }
@@ -192,7 +279,13 @@
     		TupleSource ts = null;
     		if (this.buffer != null) {
     			this.buffer.setForwardOnly(true);
-    			ts = this.buffer.createIndexedTupleSource();
+    			if (this.prefetch != null) {
+    				this.prefetch.setPosition(1);
+    				this.prefetch.disableSave();
+    				ts = this.prefetch;
+    			} else {
+    				ts = this.buffer.createIndexedTupleSource();
+    			}
     		} else {
     			ts = new BatchIterator(this.source);
     		}
@@ -208,12 +301,17 @@
     		nextBuffer();
     		return;
     	} 
-		this.buffer = sortUtility.sort();
+    	TupleBuffer sorted = sortUtility.sort();
+    	if (this.buffer != null) {
+    		this.buffer.remove();
+    	}
+    	this.prefetch = null;
+    	this.buffer = sorted;
         this.markDistinct(sortUtility.isDistinct());
     }
     
     public boolean hasBuffer() {
-    	return this.buffer != null;
+    	return this.buffer != null && this.prefetch == null;
     }
     
     public boolean nextBuffer() {
@@ -223,6 +321,7 @@
     	}
     	this.buffer = this.buffers.remove(this.buffers.size() - 1);
     	this.buffer.setForwardOnly(false);
+    	this.prefetch = null;
     	this.resetState();
     	return true;
     }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/util/Options.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -31,10 +31,12 @@
 
 	public static final String UNNEST_DEFAULT = "org.teiid.subqueryUnnestDefault"; //$NON-NLS-1$
 	public static final String PUSHDOWN_DEFAULT_NULL_ORDER = "org.teiid.pushdownDefaultNullOrder"; //$NON-NLS-1$ 
+	public static final String JOIN_PREFETCH_BATCHES = "org.teiid.joinPrefetchBatches"; //$NON-NLS-1$
 
 	private Properties properties;
 	private boolean subqueryUnnestDefault;
 	private boolean pushdownDefaultNullOrder = true;
+	private int joinPrefetchBatches = 10;
 	
 	public Properties getProperties() {
 		return properties;
@@ -69,5 +71,17 @@
 		this.pushdownDefaultNullOrder = p;
 		return this;
 	}
-
+	
+	public void setJoinPrefetchBatches(int joinPrefetchBatches) {
+		this.joinPrefetchBatches = joinPrefetchBatches;
+	}
+	
+	public int getJoinPrefetchBatches() {
+		return joinPrefetchBatches;
+	}
+	
+	public Options joinPrefetchBatches(int i) {
+		this.joinPrefetchBatches = i;
+		return this;
+	}
 }

Modified: branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -332,13 +332,18 @@
         assertEquals(ThreadState.IDLE, item.getThreadState());
         assertTrue(item.resultsBuffer.getManagedRowCount() <= rowsPerBatch*23);
         //pull the rest of the results
-        for (int j = 0; j < 48; j++) {
-            item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
-
-	        message = core.processCursorRequest(reqMsg.getExecutionId(), (j + 2) * rowsPerBatch + 1, rowsPerBatch);
-	        rm = message.get(5000, TimeUnit.MILLISECONDS);
-	        assertNull(rm.getException());
-	        assertEquals(rowsPerBatch, rm.getResultsList().size());
+        int start = 17;
+        while (true) {
+        	item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+        	 
+        	message = core.processCursorRequest(reqMsg.getExecutionId(), start, rowsPerBatch);
+        	rm = message.get(5000, TimeUnit.MILLISECONDS);
+        	assertNull(rm.getException());
+        	assertTrue(rowsPerBatch >= rm.getResultsList().size());
+        	start += rm.getResultsList().size();
+        	if (rm.getFinalRow() == rm.getLastRow()) {
+        		break;
+        	}
         }
     }
     

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-01-17 18:36:23 UTC (rev 4545)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2013-01-30 17:30:23 UTC (rev 4546)
@@ -29,9 +29,9 @@
 
 import org.junit.Test;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.query.processor.relational.FakeRelationalNode;
 import org.teiid.query.sql.symbol.ElementSymbol;
 
@@ -107,4 +107,28 @@
 		assertNull(bi.nextTuple());
 	}
 	
+	@Test public void testDisableSave() throws Exception {
+		BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+				Arrays.asList(1),
+		}, 2));
+		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x")), "test", TupleSourceType.PROCESSOR);
+		bi.setBuffer(tb, false);  //$NON-NLS-1$
+		bi.setPosition(2);
+		assertTrue(bi.hasNext());
+		tb.setForwardOnly(true);
+		bi.setPosition(1);
+		bi.disableSave();
+		for (int i = 0; i < 6; i++) {
+			assertNotNull(bi.nextTuple());
+		}
+		assertNull(bi.nextTuple());
+		assertEquals(0, tb.getManagedRowCount());
+	}
+	
 }



More information about the teiid-commits mailing list