teiid SVN: r2317 - in branches/7.0.x/engine/src: main/java/org/teiid/query/processor and 1 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2010-06-29 08:20:43 -0400 (Tue, 29 Jun 2010)
New Revision: 2317
Added:
branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
Modified:
branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1136 fix for full outer join error. The batch iterator auto buffering introduced a regression
Added: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java (rev 0)
+++ branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java 2010-06-29 12:20:43 UTC (rev 2317)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.common.buffer;
+
+import java.util.List;
+
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+
+public abstract class AbstractTupleSource implements IndexedTupleSource {
+ private int currentRow = 1;
+ private int mark = 1;
+ private List<?> currentTuple;
+ protected TupleBatch batch;
+
+ @Override
+ public int getCurrentIndex() {
+ return this.currentRow;
+ }
+
+ @Override
+ public List<?> nextTuple()
+ throws TeiidComponentException, TeiidProcessingException{
+ List<?> result = null;
+ if (currentTuple != null){
+ result = currentTuple;
+ currentTuple = null;
+ } else {
+ result = getCurrentTuple();
+ }
+ if (result != null) {
+ currentRow++;
+ }
+ return result;
+ }
+
+ protected List<?> getCurrentTuple() throws TeiidComponentException,
+ BlockedException, TeiidProcessingException {
+ if (available() > 0) {
+ //if (forwardOnly) {
+ if (batch == null || !batch.containsRow(currentRow)) {
+ batch = getBatch(currentRow);
+ }
+ return batch.getTuple(currentRow);
+ //}
+ //TODO: determine if we should directly hold a soft reference here
+ //return getRow(currentRow);
+ }
+ batch = null;
+ return finalRow();
+ }
+
+ protected abstract List<?> finalRow() throws BlockedException, TeiidComponentException, TeiidProcessingException;
+
+ protected abstract TupleBatch getBatch(int row) throws TeiidComponentException, TeiidProcessingException;
+
+ @Override
+ public void closeSource() {
+ batch = null;
+ mark = 1;
+ reset();
+ }
+
+ @Override
+ public boolean hasNext() throws TeiidComponentException, TeiidProcessingException {
+ if (this.currentTuple != null) {
+ return true;
+ }
+
+ this.currentTuple = getCurrentTuple();
+ return this.currentTuple != null;
+ }
+
+ @Override
+ public void reset() {
+ this.setPosition(mark);
+ this.mark = 1;
+ }
+
+ @Override
+ public void mark() {
+ this.mark = currentRow;
+ }
+
+ @Override
+ public void setPosition(int position) {
+ if (this.currentRow != position) {
+ this.currentRow = position;
+ this.currentTuple = null;
+ }
+ }
+
+}
\ No newline at end of file
Property changes on: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-06-29 12:20:43 UTC (rev 2317)
@@ -44,100 +44,7 @@
public class TupleBuffer {
- class TupleSourceImpl implements IndexedTupleSource {
- private int currentRow = 1;
- private int mark = 1;
- private List<?> currentTuple;
- private TupleBatch batch;
-
- @Override
- public int getCurrentIndex() {
- return this.currentRow;
- }
-
- @Override
- public List getSchema(){
- return schema;
- }
-
- @Override
- public List<?> nextTuple()
- throws TeiidComponentException{
- List<?> result = null;
- if (currentTuple != null){
- result = currentTuple;
- currentTuple = null;
- } else {
- result = getCurrentTuple();
- }
- if (result != null) {
- currentRow++;
- }
- return result;
- }
-
- private List<?> getCurrentTuple() throws TeiidComponentException,
- BlockedException {
- if (currentRow <= rowCount) {
- //if (forwardOnly) {
- if (batch == null || !batch.containsRow(currentRow)) {
- batch = getBatch(currentRow);
- }
- return batch.getTuple(currentRow);
- //}
- //TODO: determine if we should directly hold a soft reference here
- //return getRow(currentRow);
- }
- batch = null;
- if(isFinal) {
- return null;
- }
- throw BlockedException.INSTANCE;
- }
-
- @Override
- public void closeSource() {
- batch = null;
- mark = 1;
- reset();
- }
-
- @Override
- public boolean hasNext() throws TeiidComponentException {
- if (this.currentTuple != null) {
- return true;
- }
-
- this.currentTuple = getCurrentTuple();
- return this.currentTuple != null;
- }
-
- @Override
- public void reset() {
- this.setPosition(mark);
- this.mark = 1;
- }
-
- @Override
- public void mark() {
- this.mark = currentRow;
- }
-
- @Override
- public void setPosition(int position) {
- if (this.currentRow != position) {
- this.currentRow = position;
- this.currentTuple = null;
- }
- }
-
- @Override
- public int available() {
- return rowCount - currentRow + 1;
- }
- }
-
- /**
+ /**
* Gets the data type names for each of the input expressions, in order.
* @param expressions List of Expressions
* @return
@@ -407,7 +314,31 @@
* @return
*/
public IndexedTupleSource createIndexedTupleSource() {
- return new TupleSourceImpl();
+ return new AbstractTupleSource() {
+
+ @Override
+ public List<? extends Expression> getSchema() {
+ return (List<? extends Expression>) schema;
+ }
+
+ @Override
+ protected List<?> finalRow() throws BlockedException {
+ if(isFinal) {
+ return null;
+ }
+ throw BlockedException.INSTANCE;
+ }
+
+ @Override
+ public int available() {
+ return rowCount - getCurrentIndex() + 1;
+ }
+
+ @Override
+ protected TupleBatch getBatch(int row) throws TeiidComponentException {
+ return TupleBuffer.this.getBatch(row);
+ }
+ };
}
@Override
Modified: branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2010-06-29 12:20:43 UTC (rev 2317)
@@ -24,13 +24,14 @@
import java.util.List;
-import org.teiid.common.buffer.IndexedTupleSource;
+import org.teiid.common.buffer.AbstractTupleSource;
+import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.processor.BatchCollector.BatchProducer;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
+import org.teiid.query.sql.symbol.Expression;
/**
@@ -40,68 +41,69 @@
*
* Note that the saveOnMark buffering only lasts until the next mark is set.
*/
-public class BatchIterator implements IndexedTupleSource {
+public class BatchIterator extends AbstractTupleSource {
private final BatchProducer source;
private boolean saveOnMark;
private TupleBuffer buffer;
- private IndexedTupleSource bufferedTs;
+ private boolean done;
+ private boolean mark;
public BatchIterator(BatchProducer source) {
this.source = source;
}
-
- private boolean done;
- private int currentRow = 1;
- private TupleBatch currentBatch;
- private List currentTuple;
- private int bufferedIndex;
- private boolean mark;
- @Override
- public boolean hasNext() throws TeiidComponentException,
- TeiidProcessingException {
-
- if (done && this.bufferedTs == null) {
- return false;
- }
- while (currentTuple == null) {
- if (currentBatch == null) {
- if (this.bufferedTs != null) {
- if (this.currentRow <= this.bufferedIndex) {
- this.bufferedTs.setPosition(currentRow++);
- this.currentTuple = this.bufferedTs.nextTuple();
- return true;
- }
- if (done) {
- return false;
- }
- }
- currentBatch = this.source.nextBatch();
- if (buffer != null && !saveOnMark) {
- buffer.addTupleBatch(currentBatch, true);
- bufferedIndex = currentBatch.getEndRow();
- }
+ @Override
+ protected TupleBatch getBatch(int row) throws TeiidComponentException, TeiidProcessingException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
+ if (this.buffer != null && this.getCurrentIndex() <= this.buffer.getRowCount()) {
+ batch = this.buffer.getBatch(this.getCurrentIndex());
+ }
+ while (available() < 1) {
+ if (done) {
+ return null;
+ }
+ batch = source.nextBatch();
+ done = batch.getTerminationFlag();
+ if (buffer != null && !saveOnMark) {
+ buffer.addTupleBatch(batch, true);
}
-
- if (currentBatch.containsRow(currentRow)) {
- this.currentTuple = currentBatch.getTuple(currentRow++);
- } else {
- done = currentBatch.getTerminationFlag();
- currentBatch = null;
- if (done) {
- return false;
- }
- }
+ }
+ return getCurrentTuple();
+ }
+
+ @Override
+ protected List<?> getCurrentTuple() throws TeiidComponentException,
+ BlockedException, TeiidProcessingException {
+ List<?> tuple = super.getCurrentTuple();
+ if (mark && saveOnMark && this.getCurrentIndex() > this.buffer.getRowCount()) {
+ this.buffer.setRowCount(this.getCurrentIndex() - 1);
+ this.buffer.addTuple(tuple);
}
- return true;
- }
-
+ return tuple;
+ }
+
+ @Override
+ public int available() {
+ if (batch != null && batch.containsRow(getCurrentIndex())) {
+ return batch.getEndRow() - getCurrentIndex() + 1;
+ }
+ return 0;
+ }
+
+ @Override
+ public List<? extends Expression> getSchema() {
+ return source.getOutputElements();
+ }
+
public void setBuffer(TupleBuffer buffer, boolean saveOnMark) {
this.buffer = buffer;
- this.bufferedTs = this.buffer.createIndexedTupleSource();
this.saveOnMark = saveOnMark;
- }
+ }
@Override
public void closeSource() {
@@ -112,80 +114,29 @@
}
@Override
- public List<SingleElementSymbol> getSchema() {
- return source.getOutputElements();
- }
-
- @Override
- public List<?> nextTuple() throws TeiidComponentException,
- TeiidProcessingException {
- if (currentTuple == null && !hasNext()) {
- return null;
- }
- List result = currentTuple;
- currentTuple = null;
- if (mark && saveOnMark && this.currentRow - 1 > this.buffer.getRowCount()) {
- this.buffer.setRowCount(this.currentRow - 2);
- this.buffer.addTuple(result);
- this.bufferedIndex = this.currentRow - 1;
- }
- return result;
- }
-
public void reset() {
- if (this.bufferedTs != null) {
+ super.reset();
+ if (this.buffer != null) {
mark = false;
- this.bufferedTs.reset();
- if (this.currentRow != this.bufferedTs.getCurrentIndex()) {
- this.currentRow = this.bufferedTs.getCurrentIndex();
- this.currentTuple = null;
- }
return;
}
- throw new UnsupportedOperationException();
}
+ @Override
public void mark() {
- if (this.bufferedTs != null) {
- this.bufferedTs.mark();
- if (saveOnMark && this.currentRow > this.bufferedIndex) {
- this.buffer.purge();
- this.bufferedIndex = 0;
- }
+ super.mark();
+ if (this.buffer != null && saveOnMark && this.getCurrentIndex() > this.buffer.getRowCount()) {
+ this.buffer.purge();
}
mark = true;
}
-
+
@Override
- public int getCurrentIndex() {
- return currentRow;
- }
-
public void setPosition(int position) {
- if (this.bufferedTs != null) {
- this.bufferedTs.setPosition(position);
- this.currentRow = position;
- }
- if (this.currentBatch == null && position < this.currentRow) {
+ super.setPosition(position);
+ if (this.buffer == null && position < getCurrentIndex() && position < (this.batch != null ? batch.getBeginRow() : Integer.MAX_VALUE)) {
throw new UnsupportedOperationException("Backwards positioning is not allowed"); //$NON-NLS-1$
}
- this.currentRow = position;
- this.currentTuple = null;
- if (this.currentBatch == null || !this.currentBatch.containsRow(position)) {
- this.currentBatch = null;
- }
}
- @Override
- public int available() {
- if (this.currentRow <= this.bufferedIndex) {
- this.bufferedTs.setPosition(this.currentRow);
- return this.bufferedTs.available();
- }
- if (currentBatch != null) {
- return currentBatch.getEndRow() - currentRow + 1;
- }
- return 0;
- }
-
}
Modified: branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2010-06-29 12:20:43 UTC (rev 2317)
@@ -22,12 +22,15 @@
package org.teiid.query.processor;
+import static org.junit.Assert.*;
+
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.FakeRelationalNode;
@@ -52,4 +55,21 @@
bi.nextTuple();
}
+ @Test public void testReset1() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(2),
+ Arrays.asList(3)
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x")), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, true); //$NON-NLS-1$
+ bi.nextTuple();
+ bi.mark();
+ bi.nextTuple();
+ bi.reset();
+ assertEquals(2, bi.getCurrentIndex());
+ assertEquals(2, bi.nextTuple().get(0));
+ }
+
}
Modified: branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2010-06-28 19:12:13 UTC (rev 2316)
+++ branches/7.0.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2010-06-29 12:20:43 UTC (rev 2317)
@@ -93,7 +93,7 @@
import org.teiid.query.validator.Validator;
import org.teiid.query.validator.ValidatorReport;
-@SuppressWarnings("nls")
+@SuppressWarnings({"nls", "unchecked"})
public class TestProcessor {
// ################################## TEST HELPERS ################################
@@ -1343,6 +1343,34 @@
helpProcess(plan, dataManager, expected);
}
+ @Test public void testSortedFullOuterJoin() throws Exception {
+ // Create query
+ String sql = "SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1 FULL OUTER JOIN pm2.g1 ON pm1.g1.e2=pm2.g1.e2 and pm1.g1.e2 > 3"; //$NON-NLS-1$
+
+ // Create expected results
+ List[] expected = new List[] {
+ Arrays.asList(new Object[] { null, 2 }), //$NON-NLS-1$
+ Arrays.asList(new Object[] { 3, null }), //$NON-NLS-1$ //$NON-NLS-2$
+ Arrays.asList(new Object[] { null, 3 }), //$NON-NLS-1$
+ Arrays.asList(new Object[] { null, 4 }), //$NON-NLS-1$
+ };
+
+ HardcodedDataManager hdm = new HardcodedDataManager();
+ hdm.addData("SELECT g_0.e2 AS c_0 FROM pm1.g1 AS g_0 ORDER BY c_0", new List[] {
+ Arrays.asList(3),
+ });
+ hdm.addData("SELECT g_0.e2 AS c_0 FROM pm2.g1 AS g_0 ORDER BY c_0", new List[] {
+ Arrays.asList(2),
+ Arrays.asList(3),
+ Arrays.asList(4),
+
+ });
+ ProcessorPlan plan = helpGetPlan(helpParse(sql), FakeMetadataFactory.example1Cached(), TestOptimizer.getGenericFinder());
+ CommandContext cc = createCommandContext();
+ cc.setProcessorBatchSize(2);
+ helpProcess(plan, cc, hdm, expected);
+ }
+
@Test public void testFullOuterJoin3() throws Exception {
// Create query
String sql = "SELECT a.e4 c0, b.e4 c1 FROM pm1.g1 b FULL OUTER JOIN (select e4, 1 x from pm1.g1 union all select e4, 2 from pm1.g1) a ON a.e4=b.e4 and a.x = 2 order by c0"; //$NON-NLS-1$
@@ -6821,7 +6849,7 @@
sampleData1(manager);
helpProcess(plan, manager, expected);
//note that the e1 column is not used in the source query
- assertEquals("SELECT pm1.g1.e3, pm1.g1.e2 FROM pm1.g1", (String)manager.getQueries().iterator().next()); //$NON-NLS-1$
+ assertEquals("SELECT pm1.g1.e3, pm1.g1.e2 FROM pm1.g1", manager.getQueries().iterator().next()); //$NON-NLS-1$
}
@Test public void testSortWithLimit2() {