[teiid-commits] teiid SVN: r2317 - in branches/7.0.x/engine/src: main/java/org/teiid/query/processor and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Jun 29 08:20:44 EDT 2010


Author: shawkins
Date: 2010-06-29 08:20:43 -0400 (Tue, 29 Jun 2010)
New Revision: 2317

Added:
   branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
Modified:
   branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
   branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
   branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1136 fix for full outer join error.  The batch iterator auto buffering introduced a regression

Added: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java	                        (rev 0)
+++ branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java	2010-06-29 12:20:43 UTC (rev 2317)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.common.buffer;
+
+import java.util.List;
+
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+
+public abstract class AbstractTupleSource implements IndexedTupleSource {
+    private int currentRow = 1;
+    private int mark = 1;
+	private List<?> currentTuple;
+	protected TupleBatch batch;
+
+    @Override
+    public int getCurrentIndex() {
+    	return this.currentRow;
+    }
+
+    @Override
+    public List<?> nextTuple()
+    throws TeiidComponentException, TeiidProcessingException{
+    	List<?> result = null;
+    	if (currentTuple != null){
+			result = currentTuple;
+			currentTuple = null;
+    	} else {
+    		result = getCurrentTuple();
+    	} 
+    	if (result != null) {
+    		currentRow++;
+    	}
+        return result;
+    }
+
+	protected List<?> getCurrentTuple() throws TeiidComponentException,
+			BlockedException, TeiidProcessingException {
+		if (available() > 0) {
+			//if (forwardOnly) {
+				if (batch == null || !batch.containsRow(currentRow)) {
+					batch = getBatch(currentRow);
+				}
+				return batch.getTuple(currentRow);
+			//} 
+			//TODO: determine if we should directly hold a soft reference here
+			//return getRow(currentRow);
+		}
+		batch = null;
+		return finalRow();
+	}
+
+	protected abstract List<?> finalRow() throws BlockedException, TeiidComponentException, TeiidProcessingException;
+	
+	protected abstract TupleBatch getBatch(int row) throws TeiidComponentException, TeiidProcessingException;
+	
+    @Override
+    public void closeSource() {
+    	batch = null;
+        mark = 1;
+        reset();
+    }
+    
+    @Override
+	public boolean hasNext() throws TeiidComponentException, TeiidProcessingException {
+        if (this.currentTuple != null) {
+            return true;
+        }
+        
+        this.currentTuple = getCurrentTuple();
+		return this.currentTuple != null;
+	}
+
+	@Override
+	public void reset() {
+		this.setPosition(mark);
+		this.mark = 1;
+	}
+
+    @Override
+    public void mark() {
+        this.mark = currentRow;
+    }
+
+    @Override
+    public void setPosition(int position) {
+        if (this.currentRow != position) {
+	        this.currentRow = position;
+	        this.currentTuple = null;
+        }
+    }
+    
+}
\ No newline at end of file


Property changes on: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-06-29 12:20:43 UTC (rev 2317)
@@ -44,100 +44,7 @@
 
 public class TupleBuffer {
 	
-	class TupleSourceImpl implements IndexedTupleSource {
-	    private int currentRow = 1;
-	    private int mark = 1;
-		private List<?> currentTuple;
-		private TupleBatch batch;
-
-	    @Override
-	    public int getCurrentIndex() {
-	    	return this.currentRow;
-	    }
-
-	    @Override
-	    public List getSchema(){
-	        return schema;
-	    }
-
-	    @Override
-	    public List<?> nextTuple()
-	    throws TeiidComponentException{
-	    	List<?> result = null;
-	    	if (currentTuple != null){
-				result = currentTuple;
-				currentTuple = null;
-	    	} else {
-	    		result = getCurrentTuple();
-	    	} 
-	    	if (result != null) {
-	    		currentRow++;
-	    	}
-	        return result;
-	    }
-
-		private List<?> getCurrentTuple() throws TeiidComponentException,
-				BlockedException {
-			if (currentRow <= rowCount) {
-				//if (forwardOnly) {
-					if (batch == null || !batch.containsRow(currentRow)) {
-						batch = getBatch(currentRow);
-					}
-					return batch.getTuple(currentRow);
-				//} 
-				//TODO: determine if we should directly hold a soft reference here
-				//return getRow(currentRow);
-			}
-			batch = null;
-			if(isFinal) {
-	            return null;
-	        } 
-	        throw BlockedException.INSTANCE;
-		}
-
-	    @Override
-	    public void closeSource() {
-	    	batch = null;
-	        mark = 1;
-	        reset();
-	    }
-	    
-	    @Override
-		public boolean hasNext() throws TeiidComponentException {
-	        if (this.currentTuple != null) {
-	            return true;
-	        }
-	        
-	        this.currentTuple = getCurrentTuple();
-			return this.currentTuple != null;
-		}
-
-		@Override
-		public void reset() {
-			this.setPosition(mark);
-			this.mark = 1;
-		}
-
-	    @Override
-	    public void mark() {
-	        this.mark = currentRow;
-	    }
-
-	    @Override
-	    public void setPosition(int position) {
-	        if (this.currentRow != position) {
-		        this.currentRow = position;
-		        this.currentTuple = null;
-	        }
-	    }
-	    
-	    @Override
-	    public int available() {
-	    	return rowCount - currentRow + 1;
-	    }
-	}
-	
-    /**
+	/**
      * Gets the data type names for each of the input expressions, in order.
      * @param expressions List of Expressions
      * @return
@@ -407,7 +314,31 @@
 	 * @return
 	 */
 	public IndexedTupleSource createIndexedTupleSource() {
-		return new TupleSourceImpl();
+		return new AbstractTupleSource() {
+			
+			@Override
+			public List<? extends Expression> getSchema() {
+				return (List<? extends Expression>) schema;
+			}
+			
+			@Override
+			protected List<?> finalRow() throws BlockedException {
+				if(isFinal) {
+		            return null;
+		        } 
+		        throw BlockedException.INSTANCE;
+			}
+			
+			@Override
+			public int available() {
+				return rowCount - getCurrentIndex() + 1;
+			}
+			
+			@Override
+			protected TupleBatch getBatch(int row) throws TeiidComponentException {
+				return TupleBuffer.this.getBatch(row);
+			}
+		};
 	}
 	
 	@Override

Modified: branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java	2010-06-29 12:20:43 UTC (rev 2317)
@@ -24,13 +24,14 @@
 
 import java.util.List;
 
-import org.teiid.common.buffer.IndexedTupleSource;
+import org.teiid.common.buffer.AbstractTupleSource;
+import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.processor.BatchCollector.BatchProducer;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
+import org.teiid.query.sql.symbol.Expression;
 
 
 /**
@@ -40,68 +41,69 @@
  * 
  * Note that the saveOnMark buffering only lasts until the next mark is set.
  */
-public class BatchIterator implements IndexedTupleSource {
+public class BatchIterator extends AbstractTupleSource {
 
     private final BatchProducer source;
     private boolean saveOnMark;
     private TupleBuffer buffer;
-    private IndexedTupleSource bufferedTs;
+    private boolean done;
+    private boolean mark;
 
     public BatchIterator(BatchProducer source) {
         this.source = source;
     }
-
-    private boolean done;
-    private int currentRow = 1;
-    private TupleBatch currentBatch;
-    private List currentTuple;
-    private int bufferedIndex;
-    private boolean mark;
     
-    @Override
-    public boolean hasNext() throws TeiidComponentException,
-                            TeiidProcessingException {
-    	
-    	if (done && this.bufferedTs == null) {
-            return false;
-        }
-        while (currentTuple == null) {
-            if (currentBatch == null) {
-            	if (this.bufferedTs != null) {
-            		if (this.currentRow <= this.bufferedIndex) {
-	            		this.bufferedTs.setPosition(currentRow++);
-	            		this.currentTuple = this.bufferedTs.nextTuple();
-	            		return true;
-            		}
-            		if (done) {
-            			return false;
-            		}
-            	} 
-                currentBatch = this.source.nextBatch();
-                if (buffer != null && !saveOnMark) {
-                	buffer.addTupleBatch(currentBatch, true);
-                	bufferedIndex = currentBatch.getEndRow();
-                }
+	@Override
+	protected TupleBatch getBatch(int row) throws TeiidComponentException, TeiidProcessingException {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
+	protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
+		if (this.buffer != null && this.getCurrentIndex() <= this.buffer.getRowCount()) {
+			batch = this.buffer.getBatch(this.getCurrentIndex());
+    	}
+		while (available() < 1) {
+			if (done) {
+				return null;
+			}
+			batch = source.nextBatch();
+			done = batch.getTerminationFlag();
+			if (buffer != null && !saveOnMark) {
+            	buffer.addTupleBatch(batch, true);
             }
-
-            if (currentBatch.containsRow(currentRow)) {
-                this.currentTuple = currentBatch.getTuple(currentRow++);
-            } else {
-                done = currentBatch.getTerminationFlag();
-                currentBatch = null;
-                if (done) {
-                    return false;
-                }
-            }
+		}
+		return getCurrentTuple();
+	}
+	
+	@Override
+	protected List<?> getCurrentTuple() throws TeiidComponentException,
+			BlockedException, TeiidProcessingException {
+		List<?> tuple = super.getCurrentTuple();
+		if (mark && saveOnMark && this.getCurrentIndex() > this.buffer.getRowCount()) {
+        	this.buffer.setRowCount(this.getCurrentIndex() - 1);
+        	this.buffer.addTuple(tuple);
         }
-        return true;
-    }
-    
+		return tuple;
+	}
+	
+	@Override
+	public int available() {
+		if (batch != null && batch.containsRow(getCurrentIndex())) {
+			return batch.getEndRow() - getCurrentIndex() + 1;
+		}
+		return 0;
+	}
+
+	@Override
+	public List<? extends Expression> getSchema() {
+		return source.getOutputElements();
+	}
+	
     public void setBuffer(TupleBuffer buffer, boolean saveOnMark) {
 		this.buffer = buffer;
-		this.bufferedTs = this.buffer.createIndexedTupleSource();
 		this.saveOnMark = saveOnMark;
-	}
+	}   
     
     @Override
     public void closeSource() {
@@ -112,80 +114,29 @@
     }
     
     @Override
-    public List<SingleElementSymbol> getSchema() {
-    	return source.getOutputElements();
-    }
-    
-    @Override
-    public List<?> nextTuple() throws TeiidComponentException,
-    		TeiidProcessingException {
-        if (currentTuple == null && !hasNext()) {
-            return null;
-        }
-        List result = currentTuple;
-        currentTuple = null;
-        if (mark && saveOnMark && this.currentRow - 1 > this.buffer.getRowCount()) {
-        	this.buffer.setRowCount(this.currentRow - 2);
-        	this.buffer.addTuple(result);
-        	this.bufferedIndex = this.currentRow - 1;
-        }
-        return result;
-    }
-
     public void reset() {
-    	if (this.bufferedTs != null) {
+    	super.reset();
+    	if (this.buffer != null) {
     		mark = false;
-    		this.bufferedTs.reset();
-    		if (this.currentRow != this.bufferedTs.getCurrentIndex()) {
-    			this.currentRow = this.bufferedTs.getCurrentIndex();
-    			this.currentTuple = null;
-    		}
     		return;
     	}
-        throw new UnsupportedOperationException();
     }
 
+    @Override
     public void mark() {
-    	if (this.bufferedTs != null) {
-    		this.bufferedTs.mark();
-    		if (saveOnMark && this.currentRow > this.bufferedIndex) {
-    			this.buffer.purge();
-    			this.bufferedIndex = 0;
-    		}
+    	super.mark();
+    	if (this.buffer != null && saveOnMark && this.getCurrentIndex() > this.buffer.getRowCount()) {
+			this.buffer.purge();
     	}
     	mark = true;
     }
-
+    
     @Override
-    public int getCurrentIndex() {
-        return currentRow;
-    }
-
     public void setPosition(int position) {
-    	if (this.bufferedTs != null) {
-    		this.bufferedTs.setPosition(position);
-    		this.currentRow = position;
-    	}
-    	if (this.currentBatch == null && position < this.currentRow) {
+    	super.setPosition(position);
+    	if (this.buffer == null && position < getCurrentIndex() && position < (this.batch != null ? batch.getBeginRow() : Integer.MAX_VALUE)) {
 			throw new UnsupportedOperationException("Backwards positioning is not allowed"); //$NON-NLS-1$
     	}
-    	this.currentRow = position;
-        this.currentTuple = null;
-    	if (this.currentBatch == null || !this.currentBatch.containsRow(position)) {
-        	this.currentBatch = null;
-    	}
     }
     
-    @Override
-    public int available() {
-    	if (this.currentRow <= this.bufferedIndex) {
-    		this.bufferedTs.setPosition(this.currentRow);
-    		return this.bufferedTs.available();
-    	}
-    	if (currentBatch != null) {
-    		return currentBatch.getEndRow() - currentRow + 1;
-    	}
-    	return 0;
-    }
-    
 }

Modified: branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java	2010-06-29 12:20:43 UTC (rev 2317)
@@ -22,12 +22,15 @@
 
 package org.teiid.query.processor;
 
+import static org.junit.Assert.*;
+
 import java.util.Arrays;
 import java.util.List;
 
 import org.junit.Test;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.relational.FakeRelationalNode;
@@ -52,4 +55,21 @@
 		bi.nextTuple();
 	}
 	
+	@Test public void testReset1() throws Exception {
+		BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+			Arrays.asList(1),
+			Arrays.asList(2),
+			Arrays.asList(3)
+		}, 2));
+		BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+		TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x")), "test", TupleSourceType.PROCESSOR);
+		bi.setBuffer(tb, true);  //$NON-NLS-1$
+		bi.nextTuple();
+		bi.mark();
+		bi.nextTuple();
+		bi.reset();
+		assertEquals(2, bi.getCurrentIndex());
+		assertEquals(2, bi.nextTuple().get(0));
+	}
+	
 }

Modified: branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2010-06-29 12:20:43 UTC (rev 2317)
@@ -93,7 +93,7 @@
 import org.teiid.query.validator.Validator;
 import org.teiid.query.validator.ValidatorReport;
 
-@SuppressWarnings("nls")
+@SuppressWarnings({"nls", "unchecked"})
 public class TestProcessor {
 
 	// ################################## TEST HELPERS ################################
@@ -1343,6 +1343,34 @@
         helpProcess(plan, dataManager, expected);
     }   
     
+    @Test public void testSortedFullOuterJoin() throws Exception { 
+        // Create query 
+        String sql = "SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1 FULL OUTER JOIN pm2.g1 ON pm1.g1.e2=pm2.g1.e2 and pm1.g1.e2 > 3"; //$NON-NLS-1$
+        
+        // Create expected results
+        List[] expected = new List[] { 
+            Arrays.asList(new Object[] { null, 2 }), //$NON-NLS-1$
+            Arrays.asList(new Object[] { 3, null }), //$NON-NLS-1$ //$NON-NLS-2$
+            Arrays.asList(new Object[] { null, 3 }), //$NON-NLS-1$
+            Arrays.asList(new Object[] { null, 4 }), //$NON-NLS-1$
+        };    
+        
+        HardcodedDataManager hdm = new HardcodedDataManager();
+        hdm.addData("SELECT g_0.e2 AS c_0 FROM pm1.g1 AS g_0 ORDER BY c_0", new List[] {
+        		Arrays.asList(3),
+        });
+        hdm.addData("SELECT g_0.e2 AS c_0 FROM pm2.g1 AS g_0 ORDER BY c_0", new List[] {
+        		Arrays.asList(2),
+        		Arrays.asList(3),
+        		Arrays.asList(4),
+        		
+        });
+        ProcessorPlan plan = helpGetPlan(helpParse(sql), FakeMetadataFactory.example1Cached(), TestOptimizer.getGenericFinder());
+        CommandContext cc = createCommandContext();
+        cc.setProcessorBatchSize(2);
+        helpProcess(plan, cc, hdm, expected);
+    } 
+    
     @Test public void testFullOuterJoin3() throws Exception { 
         // Create query 
         String sql = "SELECT a.e4 c0, b.e4 c1 FROM pm1.g1 b FULL OUTER JOIN (select e4, 1 x from pm1.g1 union all select e4, 2 from pm1.g1) a ON a.e4=b.e4 and a.x = 2 order by c0"; //$NON-NLS-1$
@@ -6821,7 +6849,7 @@
         sampleData1(manager);
         helpProcess(plan, manager, expected);
         //note that the e1 column is not used in the source query
-        assertEquals("SELECT pm1.g1.e3, pm1.g1.e2 FROM pm1.g1", (String)manager.getQueries().iterator().next()); //$NON-NLS-1$
+        assertEquals("SELECT pm1.g1.e3, pm1.g1.e2 FROM pm1.g1", manager.getQueries().iterator().next()); //$NON-NLS-1$
     }
     
     @Test public void testSortWithLimit2() {



More information about the teiid-commits mailing list