Author: shawkins
Date: 2011-11-16 15:15:40 -0500 (Wed, 16 Nov 2011)
New Revision: 3657
Modified:
trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1518 adding a correlated subquery cache
Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-11-16 19:45:47 UTC
(rev 3656)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-11-16 20:15:40 UTC
(rev 3657)
@@ -68,6 +68,7 @@
import org.teiid.core.types.basic.StringToSQLXMLTransform;
import org.teiid.core.util.EquivalenceUtil;
import org.teiid.language.Like.MatchMode;
+import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
import org.teiid.query.QueryPlugin;
import org.teiid.query.function.FunctionDescriptor;
@@ -1058,6 +1059,11 @@
// Execute function
Object result = fd.invokeFunction(values);
+
+ if (context != null && fd.getDeterministic().ordinal() <=
Determinism.USER_DETERMINISTIC.ordinal()) {
+ context.setDeterminismLevel(fd.getDeterministic());
+ }
+
return result;
}
@@ -1082,7 +1088,15 @@
return result;
}
- protected ValueIterator evaluateSubquery(SubqueryContainer container, List<?>
tuple)
+ /**
+ * @param container
+ * @param tuple
+ * @return
+ * @throws TeiidProcessingException
+ * @throws BlockedException
+ * @throws TeiidComponentException
+ */
+ protected ValueIterator evaluateSubquery(SubqueryContainer<?> container,
List<?> tuple)
throws TeiidProcessingException, BlockedException, TeiidComponentException {
throw new UnsupportedOperationException("Subquery evaluation not possible with a base Evaluator"); //$NON-NLS-1$
}
Modified: trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java 2011-11-16
19:45:47 UTC (rev 3656)
+++ trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java 2011-11-16
20:15:40 UTC (rev 3657)
@@ -36,7 +36,6 @@
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.util.CommandContext;
/**
@@ -186,11 +185,6 @@
throw new FunctionExecutionException("ERR.015.001.0002",
QueryPlugin.Util.getString("ERR.015.001.0002", getName())); //$NON-NLS-1$
//$NON-NLS-2$
}
- if (getDeterministic().compareTo(Determinism.USER_DETERMINISTIC) <= 0
&& values.length > 0 && values[0] instanceof CommandContext) {
- CommandContext cc = (CommandContext)values[0];
- cc.setDeterminismLevel(getDeterministic());
- }
-
// Invoke the method and return the result
try {
if (method.isVarArgs()) {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2011-11-16
19:45:47 UTC (rev 3656)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2011-11-16
20:15:40 UTC (rev 3657)
@@ -22,15 +22,20 @@
package org.teiid.query.processor.relational;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.LRUCache;
+import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.ProcessorDataManager;
@@ -40,11 +45,9 @@
import org.teiid.query.sql.symbol.ContextReference;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
-import org.teiid.query.sql.symbol.ScalarSubquery;
import org.teiid.query.sql.util.SymbolMap;
import org.teiid.query.sql.util.ValueIterator;
import org.teiid.query.sql.util.VariableContext;
-import org.teiid.query.sql.visitor.FunctionCollectorVisitor;
import org.teiid.query.util.CommandContext;
@@ -57,20 +60,19 @@
public class SubqueryState {
QueryProcessor processor;
BatchCollector collector;
- boolean done;
ProcessorPlan plan;
- boolean nonDeterministic;
List<Object> refValues;
boolean comparable = true;
- void close() {
+ void close(boolean removeBuffer) {
if (processor == null) {
return;
}
processor.closeProcessing();
- collector.getTupleBuffer().remove();
+ if (removeBuffer) {
+ collector.getTupleBuffer().remove();
+ }
processor = null;
- this.done = false;
}
}
@@ -79,11 +81,19 @@
//processing state
private Map<String, SubqueryState> subqueries = new HashMap<String,
SubqueryState>();
+ private LRUCache<List<?>, TupleBuffer> cache = new
LRUCache<List<?>, TupleBuffer>(1024);
+ private int maxTuples = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE << 4;
+ private int currentTuples = 0;
public SubqueryAwareEvaluator(Map elements, ProcessorDataManager dataMgr,
CommandContext context, BufferManager manager) {
super(elements, dataMgr, context);
this.manager = manager;
+ //default to 16 batches
+ if (this.manager != null) {
+ this.maxTuples = this.manager.getProcessorBatchSize() << 4;
+ }
+ //TODO the number of cache entries and the max tuples should be based upon the reference count and types involved as well.
}
public void reset() {
@@ -94,23 +104,24 @@
public void close() {
for (SubqueryState state : subqueries.values()) {
- state.close();
+ state.close(true);
}
+ for (TupleBuffer buffer : cache.values()) {
+ buffer.remove();
+ }
+ cache.clear();
}
@Override
- protected ValueIterator evaluateSubquery(SubqueryContainer container,
- List tuple) throws TeiidProcessingException, BlockedException,
+ protected ValueIterator evaluateSubquery(SubqueryContainer<?> container,
+ List<?> tuple) throws TeiidProcessingException, BlockedException,
TeiidComponentException {
ContextReference ref = (ContextReference)container;
- String key = (ref).getContextSymbol();
+ String key = ref.getContextSymbol();
SubqueryState state = this.subqueries.get(key);
if (state == null) {
state = new SubqueryState();
state.plan = container.getCommand().getProcessorPlan().clone();
- if (container instanceof ScalarSubquery) {
- state.nonDeterministic =
FunctionCollectorVisitor.isNonDeterministic(container.getCommand());
- }
if (container.getCommand().getCorrelatedReferences() != null) {
for (ElementSymbol es :
container.getCommand().getCorrelatedReferences().getKeys()) {
if
(DataTypeManager.isNonComparable(DataTypeManager.getDataTypeName(es.getType()))) {
@@ -123,7 +134,16 @@
}
SymbolMap correlatedRefs = container.getCommand().getCorrelatedReferences();
VariableContext currentContext = null;
- boolean shouldClose = state.done && state.nonDeterministic;
+ boolean shouldClose = false;
+ boolean deterministic = true;
+ if (state.processor != null) {
+ Determinism determinism = state.processor.getContext().getDeterminismLevel();
+ deterministic = Determinism.COMMAND_DETERMINISTIC.compareTo(determinism) <= 0;
+ if (!deterministic) {
+ shouldClose = true;
+ }
+ }
+ boolean removeBuffer = true;
if (correlatedRefs != null) {
currentContext = new VariableContext();
for (Map.Entry<ElementSymbol, Expression> entry :
container.getCommand().getCorrelatedReferences().asMap().entrySet()) {
@@ -131,30 +151,58 @@
}
List<Object> refValues = currentContext.getLocalValues();
if (!refValues.equals(state.refValues)) {
+ if (state.comparable && deterministic) {
+ if (state.processor != null) {
+ //cache the old value
+ TupleBuffer tb = state.collector.collectTuples();
+ //recheck determinism as the plan may not have been fully processed by the initial check
+ Determinism determinism =
state.processor.getContext().getDeterminismLevel();
+ deterministic = Determinism.COMMAND_DETERMINISTIC.compareTo(determinism)
<= 0;
+ if (deterministic) {
+ //allowed to track up to 4x the maximum results size
+ maxTuples = Math.max(tb.getRowCount() << 2, maxTuples);
+ ArrayList<Object> cacheKey = new
ArrayList<Object>(state.refValues);
+ cacheKey.add(key);
+ tb.saveBatch(); //ensure that we aren't leaving large last batches in memory
+ this.cache.put(cacheKey, tb);
+ removeBuffer = false;
+ this.currentTuples += tb.getRowCount();
+ while (this.currentTuples > maxTuples && !cache.isEmpty()) {
+ //TODO: this should handle empty results better
+ Iterator<TupleBuffer> i = this.cache.values().iterator();
+ TupleBuffer buffer = i.next();
+ buffer.remove();
+ this.currentTuples -= buffer.getRowCount();
+ i.remove();
+ }
+ }
+ }
+ //find if we have cached values
+ List<Object> cacheKey = new ArrayList<Object>(refValues);
+ cacheKey.add(key);
+ TupleBuffer cachedResult = cache.get(cacheKey);
+ if (cachedResult != null) {
+ state.close(false);
+ return new TupleSourceValueIterator(cachedResult.createIndexedTupleSource(),
0);
+ }
+ }
state.refValues = refValues;
shouldClose = true;
}
}
if (shouldClose) {
- //if (state.done && state.comparable) {
- //cache
- //} else {
- state.close();
- //}
+ state.close(removeBuffer);
}
- if (!state.done) {
- if (state.processor == null) {
- CommandContext subContext = context.clone();
- state.plan.reset();
- state.processor = new QueryProcessor(state.plan, subContext, manager,
this.dataMgr);
- if (currentContext != null) {
- state.processor.getContext().pushVariableContext(currentContext);
- }
- state.collector = state.processor.createBatchCollector();
- }
- state.done = true;
+ if (state.processor == null) {
+ CommandContext subContext = context.clone();
+ state.plan.reset();
+ state.processor = new QueryProcessor(state.plan, subContext, manager,
this.dataMgr);
+ if (currentContext != null) {
+ state.processor.getContext().pushVariableContext(currentContext);
+ }
+ state.collector = state.processor.createBatchCollector();
}
- return new
DependentValueSource(state.collector.collectTuples()).getValueIterator(ref.getValueExpression());
+ return new
TupleSourceValueIterator(state.collector.collectTuples().createIndexedTupleSource(), 0);
}
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-11-16
19:45:47 UTC (rev 3656)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-11-16
20:15:40 UTC (rev 3657)
@@ -2273,7 +2273,28 @@
// Run query
helpProcess(plan, dataManager, expected);
+ }
+
+ @Test public void testCorrelatedSubqueryCaching() throws Exception {
+ String sql = "Select e1 from pm1.g1 where e2 in (select e2 FROM pm2.g1 WHERE pm1.g1.e3 = pm2.g1.e3)"; //$NON-NLS-1$
+ // Create expected results
+ List[] expected = new List[] {
+ Arrays.asList("a"), Arrays.asList((String)null),
Arrays.asList("a"), Arrays.asList("c"), Arrays.asList("b"),
Arrays.asList("a") //$NON-NLS-1$
+ };
+
+ // Construct data manager with data
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ // Plan query
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ // Run query
+ doProcess(plan, dataManager, expected, createCommandContext());
+
+ //three queries - 1 for the outer and 1 each for true/false
+ assertEquals(3, dataManager.getQueries().size());
}
/**