teiid SVN: r4612 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor and 3 other directories.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-11-07 12:07:53 -0500 (Thu, 07 Nov 2013)
New Revision: 4612
Added:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2707: proactive buffering on an enhanced sort merge left outer join results in an AssertionError
Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -628,7 +628,7 @@
lock.lock();
try {
//don't wait for more than is available
- int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
+ int waitCount = Math.min(additional, this.getMaxReserveKB()<<10 - reservedByThread.get());
int committed = 0;
while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
long reserveBatchSample = this.reserveBatchBytes.get();
@@ -645,6 +645,8 @@
int result = noWaitReserve(additional - committed, false);
committed += result;
}
+ int result = noWaitReserve(additional - committed, false);
+ committed += result;
return committed;
} finally {
lock.unlock();
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -130,6 +130,10 @@
}
public TupleBuffer collectTuples() throws TeiidComponentException, TeiidProcessingException {
+ return collectTuples(false);
+ }
+
+ public TupleBuffer collectTuples(boolean singleBatch) throws TeiidComponentException, TeiidProcessingException {
TupleBatch batch = null;
while(!done) {
if (this.sourceNode.hasFinalBuffer()) {
@@ -156,6 +160,10 @@
}
break;
}
+
+ if (singleBatch) {
+ return null;
+ }
}
return buffer;
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -146,22 +146,44 @@
* non-destructive method to set the mark
* @return true if the mark was set
*/
- public boolean ensureSave() {
- if (!saveOnMark || mark) {
- return false;
- }
- mark = true;
- return true;
- }
-
- public void disableSave() {
- if (buffer != null) {
- this.saveOnMark = true;
- this.mark = false;
- if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
- this.batch = null;
- }
- }
- }
-
+ public boolean ensureSave() {
+ if (!saveOnMark || mark) {
+ return false;
+ }
+ mark = true;
+ return true;
+ }
+
+ public void disableSave() {
+ if (buffer != null) {
+ this.saveOnMark = true;
+ this.mark = false;
+ if (batch != null && batch.getEndRow() <= this.buffer.getRowCount()) {
+ this.batch = null;
+ }
+ }
+ }
+
+ public void readAhead(long limit) throws TeiidComponentException, TeiidProcessingException {
+ if (buffer == null || done) {
+ return;
+ }
+ if (this.buffer.getManagedRowCount() > limit) {
+ return;
+ }
+ if (this.batch != null && this.buffer.getRowCount() < this.batch.getEndRow()) {
+ //haven't saved already
+ this.buffer.addTupleBatch(this.batch, true);
+ }
+ TupleBatch tb = source.nextBatch();
+ done = tb.getTerminationFlag();
+ this.buffer.addTupleBatch(tb, true);
+ if (done) {
+ this.buffer.close();
+ }
+ }
+
+ public TupleBuffer getBuffer() {
+ return buffer;
+ }
}
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -40,7 +40,6 @@
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
-import org.teiid.query.optimizer.relational.rules.NewCalculateCostUtil;
import org.teiid.query.processor.relational.SourceState.ImplicitBuffer;
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.sql.lang.OrderBy;
@@ -236,9 +235,7 @@
}
private boolean shouldIndexIfSmall(SourceState source) throws TeiidComponentException, TeiidProcessingException {
- Number cardinality = source.getSource().getEstimateNodeCardinality();
- return (source.hasBuffer() || (cardinality != null && cardinality.floatValue() != NewCalculateCostUtil.UNKNOWN_VALUE && cardinality.floatValue() <= source.getSource().getBatchSize() / 4))
- && (source.getRowCount() <= source.getSource().getBatchSize() / 2);
+ return source.rowCountLE(source.getSource().getBatchSize() / 2);
}
@Override
@@ -307,7 +304,26 @@
}
private boolean shouldIndex(SourceState possibleIndex, SourceState other) throws TeiidComponentException, TeiidProcessingException {
- if (possibleIndex.getRowCount() * 4 > other.getRowCount()) {
+ long size = joinNode.getBatchSize();
+ int indexSize = possibleIndex.hasBuffer()?possibleIndex.getRowCount():-1;
+ int otherSize = other.hasBuffer()?other.getRowCount():-1;
+ //determine sizes in an incremental fashion as to avoid a full buffer of the unsorted side
+ while (size < Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) {
+ if (indexSize == -1 && (possibleIndex.rowCountLE((int)size) || possibleIndex.hasBuffer())) {
+ indexSize = possibleIndex.getRowCount();
+ }
+ if (otherSize == -1 && (other.rowCountLE((int)size) || other.hasBuffer())) {
+ otherSize = other.getRowCount();
+ }
+ if (indexSize == -1 && otherSize != -1 && size * 4 > otherSize) {
+ return false;
+ }
+ if (indexSize != -1 && otherSize == -1 && indexSize * 4 <= size) {
+ break;
+ }
+ size *=2;
+ }
+ if ((size > Integer.MAX_VALUE && (indexSize == -1 || otherSize == -1)) || (indexSize != -1 && otherSize != -1 && indexSize * 4 > otherSize)) {
return false; //index is too large
}
int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
@@ -320,7 +336,7 @@
boolean useIndex = false;
int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
//approximate that 1/2 of the index will be memory resident
- toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize() * .5));
+ toReserve = (int)(indexSchemaSize * possibleIndex.getRowCount() / (possibleIndex.getSource().getBatchSize()));
if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
useIndex = true;
} else if (possibleIndex.getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -197,6 +197,7 @@
if (!isDependent()) {
this.joinStrategy.openRight();
this.joinStrategy.loadRight();
+ prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
}
throw e;
}
@@ -210,28 +211,43 @@
this.terminateBatches();
} catch (BatchAvailableException e) {
//pull the batch
+ } catch (BlockedException e) {
+ //TODO: this leads to duplicate exceptions, we
+ //could track which side is blocking
+ try {
+ prefetch(this.joinStrategy.leftSource, this.joinStrategy.rightSource);
+ } catch (BlockedException e1) {
+
+ }
+ prefetch(this.joinStrategy.rightSource, this.joinStrategy.leftSource);
+ throw e;
}
return pullBatch();
}
- /**
- * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
- * @since 4.2
- */
- public PlanNode getDescriptionProperties() {
- // Default implementation - should be overridden
- PlanNode props = super.getDescriptionProperties();
-
- if (isDependent()) {
- props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
- }
- props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
- props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
- List<String> critList = getCriteriaList();
- props.addProperty(PROP_JOIN_CRITERIA, critList);
- return props;
+ private void prefetch(SourceState toFetch, SourceState other) throws TeiidComponentException,
+ TeiidProcessingException {
+ toFetch.prefetch(Math.max(1l, other.getIncrementalRowCount(false)/other.getSource().getBatchSize())*toFetch.getSource().getBatchSize());
}
+ /**
+ * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
+ * @since 4.2
+ */
+ public PlanNode getDescriptionProperties() {
+ // Default implementation - should be overridden
+ PlanNode props = super.getDescriptionProperties();
+
+ if(isDependent()) {
+ props.addProperty(PROP_DEPENDENT, Boolean.TRUE.toString());
+ }
+ props.addProperty(PROP_JOIN_STRATEGY, this.joinStrategy.toString());
+ props.addProperty(PROP_JOIN_TYPE, this.joinType.toString());
+ List<String> critList = getCriteriaList();
+ props.addProperty(PROP_JOIN_CRITERIA, critList);
+ return props;
+ }
+
private List<String> getCriteriaList() {
List<String> critList = new ArrayList<String>();
if (leftExpressions != null) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -632,5 +632,20 @@
}
throw e;
}
+
+ public boolean hasBuffer(boolean b) {
+ return false;
+ }
+
+ /**
+ * return the final tuple buffer or null if not available
+ * @return
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
+ * @throws BlockedException
+ */
+ public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+ return null;
+ }
}
\ No newline at end of file
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,11 +31,11 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.IndexedTupleSource;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
@@ -301,7 +301,7 @@
TupleBuffer merged = createTupleBuffer();
- int desiredSpace = activeTupleBuffers.size() * schemaSize;
+ int desiredSpace = (int)(activeTupleBuffers.size() * (long)schemaSize);
int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -31,6 +31,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.util.Assertion;
import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
@@ -76,6 +77,10 @@
this.implicitBuffer = implicitBuffer;
}
+ public ImplicitBuffer getImplicitBuffer() {
+ return implicitBuffer;
+ }
+
static int[] getExpressionIndecies(List expressions,
List elements) {
if (expressions == null) {
@@ -126,15 +131,37 @@
public int getRowCount() throws TeiidComponentException, TeiidProcessingException {
return this.getTupleBuffer().getRowCount();
}
+
+ /**
+ * Uses the prefetch logic to determine an incremental row count
+ */
+ public boolean rowCountLE(long count) throws TeiidComponentException, TeiidProcessingException {
+ if (buffer != null) {
+ return buffer.getRowCount() <= count;
+ }
+ if (iterator != null || this.sortUtility != null) {
+ throw new IllegalStateException();
+ }
+ while (buffer == null) {
+ if (getIncrementalRowCount(true) > count) {
+ return false;
+ }
+ prefetch(Long.MAX_VALUE);
+ }
+ return buffer.getRowCount() <= count;
+ }
IndexedTupleSource getIterator() throws TeiidComponentException {
if (this.iterator == null) {
- if (this.buffer != null) {
+ if (this.buffer != null) {
iterator = buffer.createIndexedTupleSource();
} else {
// return a TupleBatch tuplesource iterator
BatchIterator bi = new BatchIterator(this.source);
- if (implicitBuffer != ImplicitBuffer.NONE) {
+ if (this.collector != null) {
+ bi.setBuffer(this.collector.getTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
+ this.collector = null;
+ } else if (implicitBuffer != ImplicitBuffer.NONE) {
bi.setBuffer(createSourceTupleBuffer(), implicitBuffer == ImplicitBuffer.ON_MARK);
}
this.iterator = bi;
@@ -142,6 +169,36 @@
}
return this.iterator;
}
+
+ /**
+ * Pro-actively pull batches for later use.
+ * There are unfortunately quite a few cases to cover here.
+ */
+ protected void prefetch(long limit) throws TeiidComponentException, TeiidProcessingException {
+ if (!open) {
+ return;
+ }
+ if (this.buffer == null) {
+ if (this.sortUtility != null) {
+ return;
+ }
+ if (this.iterator != null) {
+ ((BatchIterator)this.iterator).readAhead(limit);
+ return;
+ }
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ return;
+ }
+ if (collector == null) {
+ collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
+ }
+ if (collector.getTupleBuffer().getManagedRowCount() >= limit) {
+ return;
+ }
+ this.buffer = collector.collectTuples(true);
+ }
+ }
public List<Object> getOuterVals() {
return this.outerVals;
@@ -168,11 +225,16 @@
if (this.iterator instanceof BatchIterator) {
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
}
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ Assertion.assertTrue(this.buffer.isFinal());
+ return this.buffer;
+ }
if (collector == null) {
collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
}
this.buffer = collector.collectTuples();
- }
+ }
return this.buffer;
}
@@ -190,6 +252,11 @@
}
if (this.sortUtility == null) {
TupleSource ts = null;
+ if (source.hasBuffer(true)) {
+ this.buffer = source.getBuffer(-1);
+ } else if (this.buffer == null && this.collector != null) {
+ this.buffer = this.collector.collectTuples();
+ }
if (this.buffer != null) {
this.buffer.setForwardOnly(true);
ts = this.buffer.createIndexedTupleSource();
@@ -213,12 +280,12 @@
if (this.buffer != null && this.buffer != sorted) {
this.buffer.remove();
}
- this.buffer = sorted;
+ this.buffer = sorted;
this.markDistinct(sortUtility.isDistinct());
}
public boolean hasBuffer() {
- return this.buffer != null;
+ return this.buffer != null || this.source.hasBuffer(true);
}
public boolean nextBuffer() {
@@ -243,5 +310,30 @@
this.currentTuple = null;
this.maxProbeMatch = 1;
}
+
+ public void setMaxProbePosition() throws TeiidComponentException, TeiidProcessingException {
+ this.getIterator().setPosition(this.getMaxProbeMatch());
+ this.currentTuple = null;
+ }
+
+ public int getIncrementalRowCount(boolean low) {
+ if (this.buffer != null) {
+ return this.buffer.getRowCount();
+ }
+ if (this.collector != null) {
+ return this.collector.getTupleBuffer().getRowCount();
+ }
+ if (sortUtility == null) {
+ if (this.iterator instanceof BatchIterator) {
+ TupleBuffer tb = ((BatchIterator)this.iterator).getBuffer();
+ if (tb != null) {
+ return tb.getRowCount();
+ }
+ //TODO: should estimate the rows
+ }
+ //TODO: should estimate the rows based upon what is being fed into the sort
+ }
+ return low?0:Integer.MAX_VALUE;
+ }
}
Added: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java (rev 0)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.processor;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.relational.FakeRelationalNode;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.util.CommandContext;
+
+@SuppressWarnings("nls")
+public class TestBatchCollector {
+
+ @Test public void testCollect() throws Exception {
+ FakeRelationalNode sourceNode = new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1)
+ }, 1);
+ sourceNode.setElements(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)));
+ BatchCollector bc = new BatchCollector(sourceNode, BufferManagerFactory.getStandaloneBufferManager(), new CommandContext(), false);
+ bc.collectTuples(true);
+ assertEquals(1, bc.getTupleBuffer().getManagedRowCount());
+ assertEquals(3, bc.collectTuples().getRowCount());
+ }
+
+}
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestBatchIterator.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -132,4 +132,62 @@
assertEquals(0, tb.getManagedRowCount());
}
+ @Test public void testReadAhead() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, false); //$NON-NLS-1$
+ bi.nextTuple();
+ assertEquals(1, bi.available());
+ assertEquals(2, bi.getBuffer().getRowCount());
+ bi.readAhead(100);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ //shouldn't keep reading
+ bi.readAhead(3);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ bi.readAhead(5);
+ assertEquals(6, bi.getBuffer().getRowCount());
+ bi.readAhead(8); //does nothing
+ for (int i = 0; i < 5; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ }
+
+ @Test public void testReadAheadMark() throws Exception {
+ BatchIterator bi = new BatchIterator(new FakeRelationalNode(1, new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1),
+ }, 2));
+ BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+ TupleBuffer tb = bm.createTupleBuffer(Arrays.asList(new ElementSymbol("x", null, DataTypeManager.DefaultDataClasses.INTEGER)), "test", TupleSourceType.PROCESSOR);
+ bi.setBuffer(tb, true); //$NON-NLS-1$
+ bi.nextTuple();
+ assertEquals(1, bi.available());
+ assertEquals(0, bi.getBuffer().getRowCount());
+ bi.readAhead(100);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ //shouldn't keep reading
+ bi.readAhead(3);
+ assertEquals(4, bi.getBuffer().getRowCount());
+ bi.readAhead(5);
+ assertEquals(6, bi.getBuffer().getRowCount());
+ bi.readAhead(8); //does nothing
+ for (int i = 0; i < 5; i++) {
+ assertNotNull(bi.nextTuple());
+ }
+ assertNull(bi.nextTuple());
+ }
+
}
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-11-07 15:55:23 UTC (rev 4611)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2013-11-07 17:07:53 UTC (rev 4612)
@@ -35,6 +35,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
@@ -179,7 +180,18 @@
List rightElements = new ArrayList();
rightElements.add(es2);
- rightNode = new FakeRelationalNode(2, rightTuples);
+ rightNode = new BlockingFakeRelationalNode(2, rightTuples) {
+ @Override
+ public boolean hasBuffer(boolean requireFinal) {
+ return false;
+ }
+
+ @Override
+ public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+ fail();
+ throw new AssertionError();
+ };
+ };
rightNode.setElements(rightElements);
List joinElements = new ArrayList();
@@ -600,10 +612,10 @@
}
@Test public void testMergeJoinOptimization() throws Exception {
- helpTestEnhancedSortMergeJoin(99);
+ helpTestEnhancedSortMergeJoin(99, false);
}
- private void helpTestEnhancedSortMergeJoin(int batchSize)
+ private void helpTestEnhancedSortMergeJoin(int batchSize, boolean repeated)
throws TeiidComponentException, TeiidProcessingException {
this.joinType = JoinType.JOIN_INNER;
int rows = 100;
@@ -615,26 +627,49 @@
}
this.leftTuples = data;
this.rightTuples = createTuples2();
- expected = new List[] {
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 6, 6 }),
- Arrays.asList(new Object[] { 1, 1 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 7, 7 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 2, 2 }),
- Arrays.asList(new Object[] { 6, 6 }),
- Arrays.asList(new Object[] { 1, 1 }),
- Arrays.asList(new Object[] { 4, 4 }),
- Arrays.asList(new Object[] { 4, 4 }),
- };
+ if (!repeated) {
+ expected = new List[] {
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ };
+ } else {
+ expected = new List[] {
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 1, 1 }),
+ Arrays.asList(new Object[] { 6, 6 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 2, 2 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 7, 7 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ Arrays.asList(new Object[] { 4, 4 }),
+ };
+ }
helpCreateJoin();
this.joinStrategy = new EnhancedSortMergeJoinStrategy(SortOption.SORT, SortOption.SORT);
this.join.setJoinStrategy(joinStrategy);
@@ -713,9 +748,13 @@
}
@Test public void testMergeJoinOptimizationMultiBatch() throws Exception {
- helpTestEnhancedSortMergeJoin(10);
+ helpTestEnhancedSortMergeJoin(10, false);
}
+ @Test public void testMergeJoinOptimizationMultiBatch1() throws Exception {
+ helpTestEnhancedSortMergeJoin(1, true);
+ }
+
@Test public void testMergeJoinOptimizationNoRows() throws Exception {
this.joinType = JoinType.JOIN_INNER;
this.leftTuples = createTuples1();
11 years, 2 months
teiid SVN: r4611 - in branches/7.7.x/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-11-07 10:55:23 -0500 (Thu, 07 Nov 2013)
New Revision: 4611
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
Log:
TEIID-2714: Buffer defrag not working; adding an option to make the buffer files more compact and
switching to somewhat more conservative defaults
Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-11-06 21:18:40 UTC (rev 4610)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-11-07 15:55:23 UTC (rev 4611)
@@ -59,6 +59,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.ExecutorUtils;
+import org.teiid.core.util.PropertiesUtils;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
@@ -377,7 +378,9 @@
private AtomicBoolean defragRunning = new AtomicBoolean();
private AtomicInteger freedCounter = new AtomicInteger();
- private int truncateInterval = 10;
+ private boolean compactBufferFiles = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.compactBufferFiles", false); //$NON-NLS-1$
+
+ private int truncateInterval = 4;
//defrag to release freespace held by storage files
final class DefragTask implements Runnable {
private AtomicInteger runs = new AtomicInteger();
@@ -398,6 +401,7 @@
}
private long truncate(boolean anySpace) {
+ anySpace |= compactBufferFiles;
long freed = 0;
for (int i = 0; i < sizeBasedStores.length; i++) {
BlockStore blockStore = sizeBasedStores[i];
@@ -412,6 +416,7 @@
}
private void defrag(boolean all) {
+ all |= compactBufferFiles;
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Running defrag"); //$NON-NLS-1$
}
@@ -585,6 +590,7 @@
}
} while ((size>>1) < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
+ this.truncateInterval = compactBufferFiles?1:8;
}
boolean lowBlocks(boolean critical) {
@@ -666,7 +672,7 @@
}
}
} catch (Throwable e) {
- if ((e == BlockOutputStream.exceededMax && newEntry) || e == PhysicalInfo.sizeChanged) {
+ if (e == PhysicalInfo.sizeChanged) {
//entries are mutable after adding, the original should be removed shortly so just ignore
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
} else if (e == BlockOutputStream.exceededMax){
@@ -1018,7 +1024,7 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
}
if (!defragRunning.get()
- && (freedCounter.getAndIncrement()&Short.MAX_VALUE)==Short.MAX_VALUE //should be several gigabytes of turn over
+ && (freedCounter.getAndIncrement()&0x3fff)==0x3fff //should be several hundred megs of turn over
&& defragRunning.compareAndSet(false, true)) {
this.asynchPool.execute(defragTask);
}
@@ -1196,4 +1202,8 @@
this.asynchPool.shutdownNow();
}
+ public void setCompactBufferFiles(boolean compactBufferFiles) { // setter for the flag that this same diff ORs into truncate(anySpace) and defrag(all)
+ this.compactBufferFiles = compactBufferFiles; // NOTE(review): truncateInterval is assigned from this flag (compactBufferFiles?1:8) in a separate hunk; presumably this setter must run before that code for the interval to follow the flag - confirm
+ }
+
}
\ No newline at end of file
Modified: branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2013-11-06 21:18:40 UTC (rev 4610)
+++ branches/7.7.x/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2013-11-07 15:55:23 UTC (rev 4611)
@@ -349,6 +349,38 @@
assertEquals(131072, cache.getDiskUsage());
}
+ @Test public void testDefragTruncateCompact() throws Exception { // checks that reported disk usage shrinks, then stays stable, once entries are removed and the defrag task is run with compaction enabled
+ cache = createLayeredCache(1<<15, 1<<15, true);
+ cache.setCompactBufferFiles(true); // turn on the compaction option added by this commit
+ cache.setTruncateInterval(1);
+ cache.setMinDefrag(10000000); // presumably high enough that no defrag happens during the add/remove phase - confirm
+ Serializer<Integer> s = new SimpleSerializer();
+ WeakReference<? extends Serializer<?>> ref = new WeakReference<Serializer<?>>(s); // setSerializer below is given a WeakReference, so wrap the serializer once here
+ cache.createCacheGroup(s.getId());
+ Integer cacheObject = Integer.valueOf(5000); // the same object instance is stored in every entry
+
+ for (int i = 0; i < 30; i++) { // add 30 entries with ids 0..29
+ CacheEntry ce = new CacheEntry((long)i);
+ ce.setSerializer(ref);
+ ce.setObject(cacheObject);
+
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+ }
+ assertEquals(950272, cache.getDiskUsage()); // disk usage reported after the 30 adds
+ for (int i = 0; i < 25; i++) { // remove entries 0..24, leaving 25..29
+ cache.remove(1l, (long)i); // NOTE(review): 1l is the long literal one (easily misread as eleven); presumably it matches s.getId() used as the group id above - confirm
+ }
+ assertEquals(950272, cache.getDiskUsage()); // removal alone does not change the reported usage
+ cache.setMinDefrag(0); // drop the minimum to zero before invoking the defrag task
+ cache.setTruncateInterval(1);
+ cache.defragTask.run(); // run the task directly on the test thread instead of via the asynch pool
+ assertEquals(131072, cache.getDiskUsage()); // usage drops after the first defrag run
+ cache.defragTask.run(); // a second run must not change the size any further
+ //we've reached a stable size
+ assertEquals(131072, cache.getDiskUsage());
+ }
+
@Test public void testDefragMin() throws Exception {
cache = createLayeredCache(1<<15, 1<<15, true);
cache.setMinDefrag(10000000);
11 years, 2 months
[teiid/teiid] 6c3b2a: TEIID-2714 correcting defrag logic and refining th...
by shawkins
Branch: refs/heads/8.4.x
Home: https://github.com/teiid/teiid
Commit: 6c3b2a076066bdcdaecd13166804fe6a650a8c13
https://github.com/teiid/teiid/commit/6c3b2a076066bdcdaecd13166804fe6a650...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-11-04 (Mon, 04 Nov 2013)
Changed paths:
M engine/src/main/java/org/teiid/common/buffer/Cache.java
M engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
M engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
M engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
M engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
M engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
M engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
M engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
M engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
M engine/src/main/resources/org/teiid/query/i18n.properties
M engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
M engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
Log Message:
-----------
TEIID-2714 correcting defrag logic and refining the defrag/truncation
strategies
Commit: 4f1a16e958dc4c7a90f9068ce92eaf942832053f
https://github.com/teiid/teiid/commit/4f1a16e958dc4c7a90f9068ce92eaf94283...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-11-04 (Mon, 04 Nov 2013)
Changed paths:
M engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
M engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
A engine/src/main/java/org/teiid/common/buffer/impl/OutOfDiskException.java
M engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
M engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
M test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log Message:
-----------
TEIID-2714 further refining defrag behavior
Commit: 9f4d5622b9e88f042b9e30d9f6c513c160653a09
https://github.com/teiid/teiid/commit/9f4d5622b9e88f042b9e30d9f6c513c1606...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-11-07 (Thu, 07 Nov 2013)
Changed paths:
M engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
M test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log Message:
-----------
TEIID-2728 ensuring desired space calculation doesn't overflow
Commit: 8acc651a3109ea725194d502586dbae0a33df198
https://github.com/teiid/teiid/commit/8acc651a3109ea725194d502586dbae0a33...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-11-07 (Thu, 07 Nov 2013)
Changed paths:
M engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
M engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
Log Message:
-----------
TEIID-2714 adding an option to make the buffer files more compact and
defaulting to a somewhat more conservative default
Commit: 8cbc732a5a93c0cecba5e61cb62997bf9e6ba667
https://github.com/teiid/teiid/commit/8cbc732a5a93c0cecba5e61cb62997bf9e6...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-11-07 (Thu, 07 Nov 2013)
Changed paths:
M engine/src/main/java/org/teiid/query/processor/BatchCollector.java
M engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
M engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
A engine/src/test/java/org/teiid/query/processor/TestBatchCollector.java
Log Message:
-----------
TEIID-2707 ensuring prefetch works for non-blocking sources as well
Compare: https://github.com/teiid/teiid/compare/12745e2eb2d2...8cbc732a5a93
11 years, 2 months
teiid SVN: r4610 - in branches/7.7.x/engine/src: test/java/org/teiid/query/validator and 1 other directory.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-11-06 16:18:40 -0500 (Wed, 06 Nov 2013)
New Revision: 4610
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/resolver/QueryResolver.java
branches/7.7.x/engine/src/test/java/org/teiid/query/validator/TestValidator.java
Log:
TEIID-2721: Resolving an aliased view prevents inherent update from working
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/resolver/QueryResolver.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/resolver/QueryResolver.java 2013-11-04 19:45:55 UTC (rev 4609)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/resolver/QueryResolver.java 2013-11-06 21:18:40 UTC (rev 4610)
@@ -466,6 +466,13 @@
String updatePlan = qmi.getUpdatePlan(virtualGroup.getMetadataID());
String deletePlan = qmi.getDeletePlan(virtualGroup.getMetadataID());
String insertPlan = qmi.getInsertPlan(virtualGroup.getMetadataID());
+
+ //the elements must be against the view and not the alias
+ if (virtualGroup.getDefinition() != null) {
+ GroupSymbol group = new GroupSymbol(virtualGroup.getNonCorrelationName());
+ group.setMetadataID(virtualGroup.getMetadataID());
+ virtualGroup = group;
+ }
List<ElementSymbol> elements = ResolverUtil.resolveElementsInGroup(virtualGroup, qmi);
UpdateValidator validator = new UpdateValidator(qmi, determineType(insertPlan), determineType(updatePlan), determineType(deletePlan));
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/validator/TestValidator.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/validator/TestValidator.java 2013-11-04 19:45:55 UTC (rev 4609)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/validator/TestValidator.java 2013-11-06 21:18:40 UTC (rev 4610)
@@ -34,6 +34,7 @@
import org.junit.Ignore;
import org.junit.Test;
import org.teiid.api.exception.query.QueryMetadataException;
+import org.teiid.api.exception.query.QueryParserException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.api.exception.query.QueryValidatorException;
import org.teiid.client.metadata.ParameterInfo;
@@ -42,15 +43,15 @@
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.internal.process.multisource.MultiSourceMetadataWrapper;
+import org.teiid.metadata.BaseColumn.NullType;
import org.teiid.metadata.Column;
+import org.teiid.metadata.Column.SearchType;
import org.teiid.metadata.ColumnSet;
import org.teiid.metadata.MetadataStore;
import org.teiid.metadata.Procedure;
import org.teiid.metadata.ProcedureParameter;
import org.teiid.metadata.Schema;
import org.teiid.metadata.Table;
-import org.teiid.metadata.BaseColumn.NullType;
-import org.teiid.metadata.Column.SearchType;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.mapping.relational.QueryNode;
import org.teiid.query.mapping.xml.MappingDocument;
@@ -58,6 +59,7 @@
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.parser.QueryParser;
+import org.teiid.query.processor.TestProcessor;
import org.teiid.query.resolver.QueryResolver;
import org.teiid.query.sql.LanguageObject;
import org.teiid.query.sql.lang.Command;
@@ -1961,4 +1963,16 @@
helpValidate("SELECT XMLELEMENT(NAME metadata, XMLFOREST(e1 AS objectName), (SELECT XMLAGG(XMLELEMENT(NAME subTypes, XMLFOREST(e1))) FROM pm1.g2 AS b WHERE b.e2 = a.e2)) FROM pm1.g1 AS a GROUP BY e1", new String[] {"a.e2"}, RealMetadataFactory.example1Cached());
}
+ @Test public void testInsertIntoVirtualWithQueryExpression() throws QueryParserException, QueryResolverException, TeiidComponentException { // test added with the TEIID-2721 change: plan a query against an aliased view, then validate an insert into the same view
+ TransformationMetadata metadata = TestUpdateValidator.example1();
+ TestUpdateValidator.createView("select * from pm1.g1", metadata, "gx"); // view gx is defined directly over pm1.g1
+
+ String sql = "select * from gx as x"; //$NON-NLS-1$
+
+ TestProcessor.helpGetPlan(sql, metadata); // plan the aliased query first, against the same metadata instance reused below
+
+ sql = "insert into gx (e1, e2, e3, e4) select * from pm1.g1"; //$NON-NLS-1$
+
+ TestValidator.helpValidate(sql, new String[] {}, metadata); // NOTE(review): the empty array presumably means no invalid elements are expected - confirm against helpValidate
+ }
}
11 years, 2 months