[teiid-commits] teiid SVN: r3657 - in trunk/engine/src: main/java/org/teiid/query/function and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Nov 16 15:15:41 EST 2011


Author: shawkins
Date: 2011-11-16 15:15:40 -0500 (Wed, 16 Nov 2011)
New Revision: 3657

Modified:
   trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
   trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1518 adding a correlated subquery cache

Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-11-16 19:45:47 UTC (rev 3656)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-11-16 20:15:40 UTC (rev 3657)
@@ -68,6 +68,7 @@
 import org.teiid.core.types.basic.StringToSQLXMLTransform;
 import org.teiid.core.util.EquivalenceUtil;
 import org.teiid.language.Like.MatchMode;
+import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.metadata.FunctionMethod.PushDown;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.function.FunctionDescriptor;
@@ -1058,6 +1059,11 @@
 	    
 		// Execute function
 		Object result = fd.invokeFunction(values);
+		
+        if (context != null && fd.getDeterministic().ordinal() <= Determinism.USER_DETERMINISTIC.ordinal()) {
+        	context.setDeterminismLevel(fd.getDeterministic());
+        }
+
 		return result;        
 	}
 	
@@ -1082,7 +1088,15 @@
 	    return result;
 	}
 	
-	protected ValueIterator evaluateSubquery(SubqueryContainer container, List<?> tuple) 
+	/**
+	 * @param container
+	 * @param tuple
+	 * @return
+	 * @throws TeiidProcessingException
+	 * @throws BlockedException
+	 * @throws TeiidComponentException
+	 */
+	protected ValueIterator evaluateSubquery(SubqueryContainer<?> container, List<?> tuple) 
 	throws TeiidProcessingException, BlockedException, TeiidComponentException {
 		throw new UnsupportedOperationException("Subquery evaluation not possible with a base Evaluator"); //$NON-NLS-1$
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java	2011-11-16 19:45:47 UTC (rev 3656)
+++ trunk/engine/src/main/java/org/teiid/query/function/FunctionDescriptor.java	2011-11-16 20:15:40 UTC (rev 3657)
@@ -36,7 +36,6 @@
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.metadata.FunctionMethod.PushDown;
 import org.teiid.query.QueryPlugin;
-import org.teiid.query.util.CommandContext;
 
 
 /**
@@ -186,11 +185,6 @@
         	throw new FunctionExecutionException("ERR.015.001.0002", QueryPlugin.Util.getString("ERR.015.001.0002", getName())); //$NON-NLS-1$ //$NON-NLS-2$
         }
         
-        if (getDeterministic().compareTo(Determinism.USER_DETERMINISTIC) <= 0 && values.length > 0 && values[0] instanceof CommandContext) {
-        	CommandContext cc = (CommandContext)values[0];
-        	cc.setDeterminismLevel(getDeterministic());
-        }
-        
         // Invoke the method and return the result
         try {
         	if (method.isVarArgs()) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java	2011-11-16 19:45:47 UTC (rev 3656)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SubqueryAwareEvaluator.java	2011-11-16 20:15:40 UTC (rev 3657)
@@ -22,15 +22,20 @@
 
 package org.teiid.query.processor.relational;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.LRUCache;
+import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.query.eval.Evaluator;
 import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.ProcessorDataManager;
@@ -40,11 +45,9 @@
 import org.teiid.query.sql.symbol.ContextReference;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.Expression;
-import org.teiid.query.sql.symbol.ScalarSubquery;
 import org.teiid.query.sql.util.SymbolMap;
 import org.teiid.query.sql.util.ValueIterator;
 import org.teiid.query.sql.util.VariableContext;
-import org.teiid.query.sql.visitor.FunctionCollectorVisitor;
 import org.teiid.query.util.CommandContext;
 
 
@@ -57,20 +60,19 @@
 	public class SubqueryState {
 		QueryProcessor processor;
 		BatchCollector collector;
-		boolean done;
 		ProcessorPlan plan;
-		boolean nonDeterministic;
 		List<Object> refValues;
 		boolean comparable = true;
 		
-		void close() {
+		void close(boolean removeBuffer) {
 			if (processor == null) {
 				return;
 			}
 			processor.closeProcessing();
-			collector.getTupleBuffer().remove();
+			if (removeBuffer) {
+				collector.getTupleBuffer().remove();
+			}
 			processor = null;
-			this.done = false;
 		}
 	}
 	
@@ -79,11 +81,19 @@
 	
 	//processing state
 	private Map<String, SubqueryState> subqueries = new HashMap<String, SubqueryState>();
+	private LRUCache<List<?>, TupleBuffer> cache = new LRUCache<List<?>, TupleBuffer>(1024);
+	private int maxTuples = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE << 4;
+	private int currentTuples = 0;
 	
 	public SubqueryAwareEvaluator(Map elements, ProcessorDataManager dataMgr,
 			CommandContext context, BufferManager manager) {
 		super(elements, dataMgr, context);
 		this.manager = manager;
+		//default to 16 batches
+		if (this.manager != null) {
+			this.maxTuples = this.manager.getProcessorBatchSize() << 4;
+		}
+		//TODO the number of cache entries and the max tuples should be based upon the reference count and types involved as well.
 	}
 	
 	public void reset() {
@@ -94,23 +104,24 @@
 	
 	public void close() {
 		for (SubqueryState state : subqueries.values()) {
-			state.close();
+			state.close(true);
 		}
+		for (TupleBuffer buffer : cache.values()) {
+			buffer.remove();
+		}
+		cache.clear();
 	}
 	
 	@Override
-	protected ValueIterator evaluateSubquery(SubqueryContainer container,
-			List tuple) throws TeiidProcessingException, BlockedException,
+	protected ValueIterator evaluateSubquery(SubqueryContainer<?> container,
+			List<?> tuple) throws TeiidProcessingException, BlockedException,
 			TeiidComponentException {
 		ContextReference ref = (ContextReference)container;
-		String key = (ref).getContextSymbol();
+		String key = ref.getContextSymbol();
 		SubqueryState state = this.subqueries.get(key);
 		if (state == null) {
 			state = new SubqueryState();
 			state.plan = container.getCommand().getProcessorPlan().clone();
-	        if (container instanceof ScalarSubquery) {
-				state.nonDeterministic = FunctionCollectorVisitor.isNonDeterministic(container.getCommand());
-			}
 	        if (container.getCommand().getCorrelatedReferences() != null) {
 		        for (ElementSymbol es : container.getCommand().getCorrelatedReferences().getKeys()) {
 		        	if (DataTypeManager.isNonComparable(DataTypeManager.getDataTypeName(es.getType()))) {
@@ -123,7 +134,16 @@
 		}
 		SymbolMap correlatedRefs = container.getCommand().getCorrelatedReferences();
 		VariableContext currentContext = null;
-		boolean shouldClose = state.done && state.nonDeterministic;
+		boolean shouldClose = false;
+		boolean deterministic = true;
+		if (state.processor != null) {
+			Determinism determinism = state.processor.getContext().getDeterminismLevel();
+			deterministic = Determinism.COMMAND_DETERMINISTIC.compareTo(determinism) <= 0;
+			if (!deterministic) {
+				shouldClose = true;
+			}
+		}
+		boolean removeBuffer = true;
 		if (correlatedRefs != null) {
             currentContext = new VariableContext();
             for (Map.Entry<ElementSymbol, Expression> entry : container.getCommand().getCorrelatedReferences().asMap().entrySet()) {
@@ -131,30 +151,58 @@
 			}
             List<Object> refValues = currentContext.getLocalValues();
             if (!refValues.equals(state.refValues)) {
+            	if (state.comparable && deterministic) {
+            		if (state.processor != null) {
+	    				//cache the old value
+            			TupleBuffer tb = state.collector.collectTuples();
+            			//recheck determinism as the plan may not have been fully processed by the initial check 
+            			Determinism determinism = state.processor.getContext().getDeterminismLevel();
+            			deterministic = Determinism.COMMAND_DETERMINISTIC.compareTo(determinism) <= 0;
+            			if (deterministic) {
+	            			//allowed to track up to 4x the maximum results size
+		    				maxTuples = Math.max(tb.getRowCount() << 2, maxTuples);
+		    				ArrayList<Object> cacheKey = new ArrayList<Object>(state.refValues);
+		    				cacheKey.add(key);
+		    				tb.saveBatch(); //ensure that we aren't leaving large last batches in memory
+		    				this.cache.put(cacheKey, tb);
+		    				removeBuffer = false;
+		    				this.currentTuples += tb.getRowCount();
+		    				while (this.currentTuples > maxTuples && !cache.isEmpty()) {
+		    					//TODO: this should handle empty results better
+		    					Iterator<TupleBuffer> i = this.cache.values().iterator();
+		    					TupleBuffer buffer = i.next();
+		    					buffer.remove();
+		    					this.currentTuples -= buffer.getRowCount();
+		    					i.remove();
+		    				}
+            			}
+            		}
+    				//find if we have cached values
+    				List<Object> cacheKey = new ArrayList<Object>(refValues);
+    				cacheKey.add(key);
+    				TupleBuffer cachedResult = cache.get(cacheKey);
+    				if (cachedResult != null) {
+    					state.close(false);
+    					return new TupleSourceValueIterator(cachedResult.createIndexedTupleSource(), 0);
+    				}
+    			}
             	state.refValues = refValues;
             	shouldClose = true;
             }
 		}
 		if (shouldClose) {
-			//if (state.done && state.comparable) {
-				//cache
-			//} else {
-			state.close();
-			//}
+			state.close(removeBuffer);
 		}
-		if (!state.done) {
-			if (state.processor == null) {
-				CommandContext subContext = context.clone();
-				state.plan.reset();
-		        state.processor = new QueryProcessor(state.plan, subContext, manager, this.dataMgr);
-		        if (currentContext != null) {
-		        	state.processor.getContext().pushVariableContext(currentContext);
-		        }
-		        state.collector = state.processor.createBatchCollector();
-			}
-			state.done = true;
+		if (state.processor == null) {
+			CommandContext subContext = context.clone();
+			state.plan.reset();
+	        state.processor = new QueryProcessor(state.plan, subContext, manager, this.dataMgr);
+	        if (currentContext != null) {
+	        	state.processor.getContext().pushVariableContext(currentContext);
+	        }
+	        state.collector = state.processor.createBatchCollector();
 		}
-		return new DependentValueSource(state.collector.collectTuples()).getValueIterator(ref.getValueExpression());
+		return new TupleSourceValueIterator(state.collector.collectTuples().createIndexedTupleSource(), 0);
 	}
 	
 }

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-11-16 19:45:47 UTC (rev 3656)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-11-16 20:15:40 UTC (rev 3657)
@@ -2273,7 +2273,28 @@
 
         // Run query
         helpProcess(plan, dataManager, expected);
+    }
+    
+    @Test public void testCorrelatedSubqueryCaching() throws Exception {
+        String sql = "Select e1 from pm1.g1 where e2 in (select e2 FROM pm2.g1 WHERE pm1.g1.e3 = pm2.g1.e3)"; //$NON-NLS-1$
 
+        // Create expected results
+        List[] expected = new List[] {
+            Arrays.asList("a"), Arrays.asList((String)null), Arrays.asList("a"), Arrays.asList("c"), Arrays.asList("b"), Arrays.asList("a") //$NON-NLS-1$
+        };
+
+        // Construct data manager with data
+        FakeDataManager dataManager = new FakeDataManager();
+        sampleData1(dataManager);
+
+        // Plan query
+        ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+        // Run query
+        doProcess(plan, dataManager, expected, createCommandContext());
+
+        //three queries - 1 for the outer and 1 each for true/false
+        assertEquals(3, dataManager.getQueries().size());
     }
     
     /**



More information about the teiid-commits mailing list