Author: shawkins
Date: 2012-06-26 11:22:47 -0400 (Tue, 26 Jun 2012)
New Revision: 4203
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-2077 improving subquery result caching such that the same commands at the same node
will share results
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2012-06-22
22:16:53 UTC (rev 4202)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java 2012-06-26
15:22:47 UTC (rev 4203)
@@ -41,6 +41,7 @@
import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.QueryProcessor;
+import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.SubqueryContainer;
import org.teiid.query.sql.symbol.ContextReference;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -57,6 +58,29 @@
*/
public class SubqueryAwareEvaluator extends Evaluator {
+ @SuppressWarnings("serial")
+ private final class LRUBufferCache extends LRUCache<List<?>, TupleBuffer> { //LRU cache of tuple buffers that can hand small evicted buffers to a second cache
+
+ private LRUCache<List<?>, TupleBuffer> spillOver; //receives small evicted entries; may be null, in which case evicted buffers are always removed
+
+ private LRUBufferCache(int maxSize, LRUCache<List<?>, TupleBuffer>
spillOver) {
+ super(maxSize);
+ this.spillOver = spillOver;
+ }
+
+ protected boolean
removeEldestEntry(Map.Entry<java.util.List<?>,TupleBuffer> eldest) { //disposes of, or spills, the entry the superclass has chosen to evict
+ if (super.removeEldestEntry(eldest)) { //only act when the superclass reports that the eldest entry should go
+ if (spillOver != null && eldest.getValue().getRowCount() <= 2) {
+ spillOver.put(eldest.getKey(), eldest.getValue()); //results of at most 2 rows are retained in the spill-over cache under the same key
+ } else {
+ eldest.getValue().remove(); //otherwise the buffer itself is removed (presumably releasing its storage - confirm against TupleBuffer)
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
public class SubqueryState {
QueryProcessor processor;
BatchCollector collector;
@@ -81,7 +105,9 @@
//processing state
private Map<String, SubqueryState> subqueries = new HashMap<String,
SubqueryState>();
- private LRUCache<List<?>, TupleBuffer> cache = new
LRUCache<List<?>, TupleBuffer>(1024);
+ private Map<Command, String> commands = new HashMap<Command, String>();
//TODO: could determine this ahead of time; maps a subquery command to the key under which its SubqueryState was registered
+ private LRUCache<List<?>, TupleBuffer> smallCache = new LRUBufferCache(1024,
null); //constructed with maxSize 1024 and no spill-over target
+ private LRUCache<List<?>, TupleBuffer> cache = new LRUBufferCache(512,
smallCache); //constructed with maxSize 512; evicted results of at most 2 rows spill into smallCache
private int maxTuples = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE << 4;
private int currentTuples = 0;
@@ -120,6 +146,15 @@
String key = ref.getContextSymbol();
SubqueryState state = this.subqueries.get(key);
if (state == null) {
+ String otherKey = commands.get(container.getCommand());
+ if (otherKey != null) {
+ state = this.subqueries.get(otherKey);
+ if (state != null) {
+ key = otherKey;
+ }
+ }
+ }
+ if (state == null) {
state = new SubqueryState();
state.plan = container.getCommand().getProcessorPlan().clone();
if (container.getCommand().getCorrelatedReferences() != null) {
@@ -131,6 +166,7 @@
}
}
this.subqueries.put(key, state);
+ this.commands.put(container.getCommand(), key);
}
SymbolMap correlatedRefs = container.getCommand().getCorrelatedReferences();
VariableContext currentContext = null;
@@ -168,10 +204,16 @@
removeBuffer = false;
this.currentTuples += tb.getRowCount();
while (this.currentTuples > maxTuples && !cache.isEmpty()) {
- //TODO: this should handle empty results better
- Iterator<TupleBuffer> i = this.cache.values().iterator();
- TupleBuffer buffer = i.next();
- buffer.remove();
+ //take the first entry in the cache's iteration order; entries are iterated (not values) so the evicted key is known
+ Iterator<Map.Entry<List<?>, TupleBuffer>> i = this.cache.entrySet().iterator();
+ Map.Entry<List<?>, TupleBuffer> evicted = i.next();
+ TupleBuffer buffer = evicted.getValue();
+ if (buffer.getRowCount() <= 2) {
+ //retain small results under the evicted entry's own key - cacheKey identifies the current result, not this buffer
+ this.smallCache.put(evicted.getKey(), buffer);
+ } else {
+ buffer.remove();
+ }
this.currentTuples -= buffer.getRowCount();
i.remove();
}
@@ -181,6 +223,9 @@
List<Object> cacheKey = new ArrayList<Object>(refValues);
cacheKey.add(key);
TupleBuffer cachedResult = cache.get(cacheKey);
+ if (cachedResult == null) {
+ cachedResult = smallCache.get(cacheKey);
+ }
if (cachedResult != null) {
state.close(false);
return new TupleSourceValueIterator(cachedResult.createIndexedTupleSource(),
0);
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2012-06-22
22:16:53 UTC (rev 4202)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2012-06-26
15:22:47 UTC (rev 4203)
@@ -2292,6 +2292,23 @@
assertEquals(3, dataManager.getQueries().size());
}
+ @Test public void testCorrelatedSubqueryCaching1() throws Exception { //the select list holds two textually identical correlated subqueries
+ String sql = "Select (select e2 FROM pm2.g1 WHERE pm1.g1.e3 = pm2.g1.e3
limit 1), (select e2 FROM pm2.g1 WHERE pm1.g1.e3 = pm2.g1.e3 limit 1) from pm1.g1 order by
e1 limit 1"; //$NON-NLS-1$
+
+ List[] expected = new List[] {
+ Arrays.asList(0, 0)
+ };
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ doProcess(plan, dataManager, expected, createCommandContext());
+
+ assertEquals(2, dataManager.getQueries().size()); //presumably the outer query plus one shared subquery execution - confirm against FakeDataManager
+ }
+
/**
* There is a bug when the second query in a UNION ALL has a correlated subquery, and
both
* the outer and inner query are selecting from the same virtual group, and aliasing
them