[teiid-commits] teiid SVN: r4606 - in branches/7.7.x/engine/src: test/java/org/teiid/query/processor and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Oct 22 10:52:22 EDT 2013


Author: jolee
Date: 2013-10-22 10:52:22 -0400 (Tue, 22 Oct 2013)
New Revision: 4606

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2363: (modified) backout of the prefetch logic that introduced an AssertionError

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-10-22 14:52:13 UTC (rev 4605)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2013-10-22 14:52:22 UTC (rev 4606)
@@ -40,6 +40,7 @@
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
+import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
 import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
 import org.teiid.query.sql.lang.JoinType;
 import org.teiid.query.sql.lang.OrderBy;
@@ -235,7 +236,9 @@
     }
     
     private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
-    	return source.rowCountLE(source.getSource().getBatchSize() / 2);
+    	Number cardinality = source.getSource().getEstimateNodeCardinality();
+    	return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4)) 
+    	&& (source.getRowCount() <= source.getSource().getBatchSize() / 2);
     }
     
     @Override
@@ -304,26 +307,7 @@
     }
     
     private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws TeiidComponentException, TeiidProcessingException {
-    	long size = joinNode.getBatchSize();
-    	int indexSize = possibleIndex.hasBuffer()?possibleIndex.getRowCount():-1;
-    	int otherSize = other.hasBuffer()?other.getRowCount():-1;
-    	//determine sizes in an incremental fashion as to avoid a full buffer of the unsorted side
-    	while (size < Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) {
-    		if (indexSize == -1 && (possibleIndex.rowCountLE((int)size) || possibleIndex.hasBuffer())) {
-    			indexSize = possibleIndex.getRowCount();
-    		}
-    		if (otherSize == -1 && (other.rowCountLE((int)size) || other.hasBuffer())) {
-    			otherSize = other.getRowCount();
-    		}
-    		if (indexSize == -1 && otherSize != -1 && size * 4 > otherSize) {
-    			return false;
-    		}
-    		if (indexSize != -1 && otherSize == -1 && indexSize * 4 <= size) {
-    			break;
-    		}
-    		size *=2;
-    	}
-		if ((size > Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) || (indexSize != -1 && otherSize != -1 && indexSize * 4 > otherSize)) {
+    	if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
     		return false; //index is too large
     	}
     	int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-10-22 14:52:13 UTC (rev 4605)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-10-22 14:52:22 UTC (rev 4606)
@@ -197,7 +197,6 @@
     		if (!isDependent()) {
     			this.joinStrategy.openRight();
                 this.joinStrategy.loadRight();
-                this.joinStrategy.rightSource.prefetch(true);
     		}
     		throw e;
     	}
@@ -211,16 +210,6 @@
         	this.terminateBatches();
         } catch (BatchAvailableException e) {
         	//pull the batch
-        } catch (BlockedException e) {
-        	//TODO: this leads to duplicate exceptions, we 
-        	//could track which side is blocking
-        	try {
-        		this.joinStrategy.leftSource.prefetch(true);
-        	} catch (BlockedException e1) {
-        		
-        	}
-        	this.joinStrategy.rightSource.prefetch(true);
-        	throw e;
         }
         return pullBatch();
     }

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-10-22 14:52:13 UTC (rev 4605)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2013-10-22 14:52:22 UTC (rev 4606)
@@ -31,6 +31,7 @@
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
 import org.teiid.query.processor.relational.SortUtility.Mode;
@@ -45,6 +46,7 @@
 	
     private RelationalNode source;
     private List expressions;
+    private BatchCollector collector;
     private TupleBuffer buffer;
     private List<TupleBuffer> buffers;
     private List<Object> outerVals;
@@ -55,7 +57,6 @@
     private boolean distinct;
     private ImplicitBuffer implicitBuffer = ImplicitBuffer.FULL;
     boolean open;
-    private BatchIterator prefetch;
     
     private SortUtility sortUtility;
     
@@ -119,107 +120,28 @@
 			this.iterator.closeSource();
         	this.iterator = null;
         }
-        this.prefetch = null;
         this.currentTuple = null;
 	}
 
     public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
     	return this.getTupleBuffer().getRowCount();
     }
-    
-    /**
-     * Uses the prefetch logic to determine an incremental row count
-     */
-    public boolean rowCountLE(int count) throws TeiidComponentException, TeiidProcessingException {
-       	if (buffer == null) {
-       		prefetch(false);
-       	}
-       	while (buffer.getRowCount() <= count) {
-       		if (prefetch == null) {
-       			return true;
-       		}
-       		prefetch(false);
-       	}
-       	return false;
-    }
 
-    IndexedTupleSource getIterator() throws TeiidComponentException, TeiidProcessingException {
+    IndexedTupleSource getIterator() throws TeiidComponentException {
         if (this.iterator == null) {
-        	if (this.buffer == null) {
-        		getTupleBuffer(false);
-        	}
-        	if (this.prefetch != null) {
-        		this.iterator = this.prefetch;
+            if (this.buffer != null) {
+                iterator = buffer.createIndexedTupleSource();
             } else {
-            	iterator = buffer.createIndexedTupleSource(implicitBuffer == ImplicitBuffer.NONE);
+                // return a TupleBatch tuplesource iterator
+                BatchIterator bi = new BatchIterator(this.source);
+                if (implicitBuffer != ImplicitBuffer.NONE) {
+                	bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+                }
+                this.iterator = bi;
             }
         }
         return this.iterator;
     }
-    
-    /**
-     * Create a batch iterator to perform basic prefetching
-     * @throws TeiidComponentException
-     */
-    private void createPrefetch() throws TeiidComponentException {
-    	this.prefetch = new BatchIterator(this.source);
-    	boolean useMark = implicitBuffer != ImplicitBuffer.FULL;
-    	this.buffer = createSourceTupleBuffer();
-    	this.prefetch.setBuffer(this.buffer, useMark);
-    	if (useMark) {
-    		this.prefetch.mark();
-    	}
-    }
-    
-    /**
-     * Pro-actively pull batches for later use.
-     * There are unfortunately quite a few cases to cover here.
-     */
-    protected void prefetch(boolean limit) throws TeiidComponentException, TeiidProcessingException {
-        if (!open) {
-        	return;
-        }
-    	if (this.prefetch == null) {
-    		if (this.buffer != null) {
-    			return;
-    		}
-    		if (this.sortUtility != null) {
-    			sortUtility.sort();
-    			return;
-    		}
-    		if (this.source.hasFinalBuffer()) {
-    			this.buffer = this.source.getFinalBuffer();
-    			return;
-    		}
-    		createPrefetch();
-    	}
-    	if (limit && this.buffer.getManagedRowCount() >= this.source.getBatchSize() * this.source.getContext().getOptions().getJoinPrefetchBatches()) {
-    		return;
-    	}
-    	int curIndex = this.prefetch.getCurrentIndex();
-    	boolean marked = false;
-    	if (this.prefetch.ensureSave()) {
-    		marked = true;
-    	}
-    	this.prefetch.setPosition(this.buffer.getRowCount() + 1);
-    	BatchIterator bi = this.prefetch; //even if we clear the prefetch, we may already be using it as the iterator
-     	try {
- 	    	if (!this.prefetch.hasNext()) {
- 	    		this.prefetch = null;
- 	    		if (this.iterator != bi) {
- 	    			bi = null;
- 	    		}
- 	    	}
-     	} finally {
-     		if (bi != null) {
- 	    		if (marked) {
- 	    			bi.reset();
- 	    		} else {
- 	    			bi.setPosition(curIndex);
- 	    		}
-     		}
-     	}
-    }    
 
     public List<Object> getOuterVals() {
         return this.outerVals;
@@ -242,26 +164,14 @@
     }
 
     public TupleBuffer getTupleBuffer() throws TeiidComponentException, TeiidProcessingException {
-    	return getTupleBuffer(true);
-    }
-    
-    private TupleBuffer getTupleBuffer(boolean full) throws TeiidComponentException, TeiidProcessingException {
         if (this.buffer == null) {
         	if (this.iterator instanceof BatchIterator) {
         		throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
         	}
-        	if (source.hasFinalBuffer()) {
-        		this.buffer = source.getFinalBuffer();
-        		return this.buffer;
-        	}
-        	this.implicitBuffer = ImplicitBuffer.FULL;
-        	createPrefetch();
-        } 
-        if (full && this.prefetch != null) {
-        	while (this.prefetch.hasNext()) {
-        		this.prefetch.setPosition(this.prefetch.getCurrentIndex() + this.source.getBatchSize());
-        	}
-        	this.prefetch = null; //fully buffered
+        	if (collector == null) {
+                collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
+            }
+            this.buffer = collector.collectTuples();
         }
         return this.buffer;
     }
@@ -282,13 +192,7 @@
     		TupleSource ts = null;
     		if (this.buffer != null) {
     			this.buffer.setForwardOnly(true);
-    			if (this.prefetch != null) {
-    				this.prefetch.setPosition(1);
-    				this.prefetch.disableSave();
-    				ts = this.prefetch;
-    			} else {
-    				ts = this.buffer.createIndexedTupleSource();
-    			}
+    			ts = this.buffer.createIndexedTupleSource();
     		} else {
     			ts = new BatchIterator(this.source);
     		}
@@ -309,13 +213,12 @@
  	if (this.buffer != null && this.buffer != sorted) {
     		this.buffer.remove();
     	}
-    	this.prefetch = null;
     	this.buffer = sorted;
         this.markDistinct(sortUtility.isDistinct());
     }
     
     public boolean hasBuffer() {
-    	return this.buffer != null && this.prefetch == null;
+    	return this.buffer != null;
     }
     
     public boolean nextBuffer() {
@@ -325,7 +228,6 @@
     	}
     	this.buffer = this.buffers.remove(this.buffers.size() - 1);
     	this.buffer.setForwardOnly(false);
-    	this.prefetch = null;
     	this.resetState();
     	return true;
     }

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java	2013-10-22 14:52:13 UTC (rev 4605)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestTextTable.java	2013-10-22 14:52:22 UTC (rev 4606)
@@ -285,18 +285,18 @@
 	
 	@Test public void testTextTableJoin() throws Exception {
 		String sql = "select z.* from (select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1 || '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z, " +
-				"(select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1 || '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z1 where z.x = z1.x";
+				"(select x.* from (select * from pm1.g1 where e1 = 'c') y, texttable(e1 || '\n' || e2 || '\n' || e3 COLUMNS x string) x) as z1 where z.x = z1.x order by z.x";
     	
         List[] expected = new List[] {
+        		Arrays.asList("1"),
         		Arrays.asList("c"),
-        		Arrays.asList("1"),
         		Arrays.asList("true"),
         };    
 
         FakeDataManager dataManager = new FakeDataManager();
         sampleData1(dataManager);
         RelationalPlan plan = (RelationalPlan)helpGetPlan(helpParse(sql), RealMetadataFactory.example1Cached());
-        JoinNode join = (JoinNode) plan.getRootNode().getChildren()[0];
+        JoinNode join = (JoinNode) plan.getRootNode().getChildren()[0].getChildren()[0];
         assertTrue(!(join.getJoinStrategy() instanceof NestedTableJoinStrategy));
         helpProcess(plan, createCommandContext(), dataManager, expected);
     } 

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-10-22 14:52:13 UTC (rev 4605)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-10-22 14:52:22 UTC (rev 4606)
@@ -792,52 +792,6 @@
         helpTestJoinDirect(expected, 40, 1);
     }
     
-    @Test public void testMergeJoinPrefetchAlreadySorted() throws Exception {
-        this.joinType = JoinType.JOIN_INNER;
-        int rows = 50;
-        List[] data = new List[rows];
-        for(int i=0; i<rows; i++) { 
-            data[i] = new ArrayList();
-            Integer value = new Integer((i*17) % 47);
-            data[i].add(value);
-        }
-        this.leftTuples = data;
-        this.rightTuples = new List[] {
-            Arrays.asList(1),  
-            Arrays.asList(2),
-            Arrays.asList(4),
-            Arrays.asList(6),
-            Arrays.asList(7),
-            Arrays.asList(8),
-        };
-        expected = new List[] {
-           Arrays.asList(new Object[] { 1, 1 }),
-           Arrays.asList(new Object[] { 2, 2 }),
-    	   Arrays.asList(new Object[] { 4, 4 }),
-           Arrays.asList(new Object[] { 6, 6 }),
-           Arrays.asList(new Object[] { 7, 7 }),
-           Arrays.asList(new Object[] { 8, 8 }),
-        };
-        helpCreateJoin();               
-        this.joinStrategy = new MergeJoinStrategy(SortOption.SORT, SortOption.ALREADY_SORTED, false);
-        FakeRelationalNode newNode = new FakeRelationalNode(2, rightTuples) {
-        	@Override
-        	public TupleBatch nextBatchDirect() throws BlockedException,
-        			TeiidComponentException, TeiidProcessingException {
-        		TupleBatch tb = super.nextBatchDirect();
-        		if (tb.getTerminationFlag()) {
-        			assertFalse(leftNode.isClosed());
-        		}
-        		return tb;
-        	}
-        };
-        newNode.setElements(rightNode.getElements());
-        rightNode = newNode;
-        
-        this.join.setJoinStrategy(joinStrategy);
-        helpTestJoinDirect(expected, 5, 1);
-    }
-
     @Test public void testRepeatedMerge() throws Exception {
     	helpTestRepeatedMerge(false);
     }



More information about the teiid-commits mailing list