[teiid-commits] teiid SVN: r3048 - in trunk/engine/src: main/java/org/teiid/query/processor/relational and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Mar 29 12:29:52 EDT 2011


Author: shawkins
Date: 2011-03-29 12:29:51 -0400 (Tue, 29 Mar 2011)
New Revision: 3048

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
   trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
   trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
   trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java
Log:
TEIID-1533 adding parallization of dependent queries

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -322,6 +322,7 @@
 				workContext, this.config.getUseDataRoles(), this.prepPlanCache);
 		request.setResultSetCacheEnabled(this.rsCache != null);
 		request.setAllowCreateTemporaryTablesByDefault(this.config.isAllowCreateTemporaryTablesByDefault());
+		request.setUserRequestConcurrency(this.getUserRequestSourceConcurrency());
         ResultsFuture<ResultsMessage> resultsFuture = new ResultsFuture<ResultsMessage>();
         RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request, resultsFuture.getResultsReceiver(), requestID, workContext);
     	logMMCommand(workItem, Event.NEW, null); 

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -135,6 +135,7 @@
 	private SessionAwareCache<PreparedPlan> planCache;
 	private boolean resultSetCacheEnabled = true;
 	private boolean allowCreateTemporaryTablesByDefault;
+	private int userRequestConcurrency;
 
     void initialize(RequestMessage requestMsg,
                               BufferManager bufferManager,
@@ -264,7 +265,12 @@
         context.setBufferManager(this.bufferManager);
         context.setPreparedPlanCache(planCache);
         context.setResultSetCacheEnabled(this.resultSetCacheEnabled);
+        context.setUserRequestSourceConcurrency(this.userRequestConcurrency);
     }
+    
+    public void setUserRequestConcurrency(int userRequestConcurrency) {
+		this.userRequestConcurrency = userRequestConcurrency;
+	}
 
     protected void checkReferences(List<Reference> references) throws QueryValidatorException {
     	referenceCheck(references);

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -46,14 +46,15 @@
 
 public class AccessNode extends SubqueryAwareRelationalNode {
 
-    // Initialization state
+    private static final int MAX_CONCURRENT = 10; //TODO: this could be settable via a property
+	// Initialization state
     private Command command;
     private String modelName;
     private String connectorBindingId;
     private boolean shouldEvaluate = false;
 
     // Processing state
-	private TupleSource tupleSource;
+	private ArrayList<TupleSource> tupleSources = new ArrayList<TupleSource>();
 	private boolean isUpdate = false;
     private boolean returnedRows = false;
     private Command nextCommand;
@@ -68,7 +69,7 @@
 
     public void reset() {
         super.reset();
-        tupleSource = null;
+        this.tupleSources.clear();
 		isUpdate = false;
         returnedRows = false;
         nextCommand = null;
@@ -100,22 +101,25 @@
         // Copy command and resolve references if necessary
         Command atomicCommand = command;
         boolean needProcessing = true;
-        if(shouldEvaluate) {
-            atomicCommand = nextCommand();
-            needProcessing = prepareNextCommand(atomicCommand);
-            nextCommand = null;
-        } else {
-            needProcessing = RelationalNodeUtil.shouldExecute(atomicCommand, true);
-        }
-        // else command will not be changed, so no reason to all this work.
-        // Removing this if block and always evaluating has a significant cost that will
-        // show up in performance tests for many simple tests that do not require it.
-        
-        isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
-        
-		if(needProcessing) {
-			registerRequest(atomicCommand);
-		}
+        do {
+	        if(shouldEvaluate) {
+	            atomicCommand = nextCommand();
+	            needProcessing = prepareNextCommand(atomicCommand);
+	            nextCommand = null;
+	        } else {
+	            needProcessing = RelationalNodeUtil.shouldExecute(atomicCommand, true);
+	        }
+	        // else command will not be changed, so no reason to all this work.
+	        // Removing this if block and always evaluating has a significant cost that will
+	        // show up in performance tests for many simple tests that do not require it.
+	        
+	        isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
+	        
+			if(needProcessing) {
+				registerRequest(atomicCommand);
+			}
+			//We hardcode an upper limit on currency because these commands have potentially large in-memory value sets
+        } while (!processCommandsIndividually() && hasNextCommand() && this.tupleSources.size() < Math.min(MAX_CONCURRENT, this.getContext().getUserRequestSourceConcurrency()));
 	}
 
 	private Command nextCommand() {
@@ -147,38 +151,52 @@
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, TeiidComponentException, TeiidProcessingException {
         
-        while (tupleSource != null || hasNextCommand()) {
-        	//drain the tuple source
-        	while (tupleSource != null) {
-                List<?> tuple = tupleSource.nextTuple();
-    
-                if(tuple == null) {
-                    closeSources();
-                    break;
-                } 
-                
-                returnedRows = true;
-                
-                addBatchRow(tuple);
-                
-                if (isBatchFull()) {
-                	return pullBatch();
-                }
+        while (!tupleSources.isEmpty() || hasNextCommand()) {
+        	
+        	if (tupleSources.isEmpty() && processCommandsIndividually()) {
+        		registerNext();
         	}
         	
-        	//execute another command
-            while (hasNextCommand()) {
-            	if (processCommandsIndividually() && hasPendingRows()) {
-            		return pullBatch();
-            	}
-                Command atomicCommand = nextCommand();
-                if (prepareNextCommand(atomicCommand)) {
-                	nextCommand = null;
-                    registerRequest(atomicCommand);
-                    break;
-                }
-                nextCommand = null;
-            }            
+        	//drain the tuple source(s)
+        	for (int i = 0; i < this.tupleSources.size(); i++) {
+        		TupleSource tupleSource = tupleSources.get(i);
+        		try {
+	        		List<?> tuple = null;
+	        		
+	        		while ((tuple = tupleSource.nextTuple()) != null) {
+	                    returnedRows = true;
+	                    addBatchRow(tuple);
+	                    
+	                    if (isBatchFull()) {
+	                    	return pullBatch();
+	                    }
+	        		}
+	        		
+                	//end of source
+                    tupleSource.closeSource();
+                    tupleSources.remove(i--);
+                    if (!processCommandsIndividually()) {
+                    	registerNext();
+                    }
+                    continue;
+        		} catch (BlockedException e) {
+        			if (processCommandsIndividually()) {
+        				throw e;
+        			}
+        			continue;
+        		}
+			}
+        	
+        	if (processCommandsIndividually()) {
+        		if (hasPendingRows()) {
+        			return pullBatch();
+        		}
+        		continue;
+        	}
+        	
+        	if (!this.tupleSources.isEmpty()) {
+        		throw BlockedException.INSTANCE;
+        	}
         }
         
         if(isUpdate && !returnedRows) {
@@ -191,6 +209,19 @@
         return pullBatch();
 	}
 
+	private void registerNext() throws TeiidComponentException,
+			TeiidProcessingException {
+		while (hasNextCommand()) {
+		    Command atomicCommand = nextCommand();
+		    if (prepareNextCommand(atomicCommand)) {
+		    	nextCommand = null;
+		        registerRequest(atomicCommand);
+		        break;
+		    }
+		    nextCommand = null;
+		}
+	}
+
 	private void registerRequest(Command atomicCommand)
 			throws TeiidComponentException, TeiidProcessingException {
 		int limit = -1;
@@ -200,7 +231,7 @@
 				limit = parent.getLimit() + parent.getOffset();
 			}
 		}
-		tupleSource = getDataManager().registerRequest(getContext(), atomicCommand, modelName, connectorBindingId, getID(), limit);
+		tupleSources.add(getDataManager().registerRequest(getContext(), atomicCommand, modelName, connectorBindingId, getID(), limit));
 	}
 	
 	protected boolean processCommandsIndividually() {
@@ -217,10 +248,10 @@
 	}
 
     private void closeSources() {
-        if(this.tupleSource != null) {
-    		this.tupleSource.closeSource();
-            tupleSource = null;
-        }
+    	for (TupleSource ts : this.tupleSources) {
+    		ts.closeSource();			
+		}
+    	this.tupleSources.clear();
 	}
 
 	protected void getNodeString(StringBuffer str) {

Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -110,6 +110,8 @@
 	    
 	    private boolean resultSetCacheEnabled = true;
 	    
+	    private int userRequestSourceConcurrency;
+
 	}
 	
 	private GlobalState globalState = new GlobalState();
@@ -511,5 +513,13 @@
     public void setResultSetCacheEnabled(boolean resultSetCacheEnabled) {
 		this.globalState.resultSetCacheEnabled = resultSetCacheEnabled;
 	}
+    
+	public int getUserRequestSourceConcurrency() {
+		return this.globalState.userRequestSourceConcurrency;
+	}
 	
+	public void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+		this.globalState.userRequestSourceConcurrency = userRequestSourceConcurrency;
+	}
+	
 }

Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -104,10 +104,6 @@
 		tuples.put(groupID, new Object[] { elements, data });
 	}
 	
-	public void closeRequest(Object requestID) {
-		// does nothing?
-    } 
-	
 	public TupleSource registerRequest(CommandContext context, Command command, String modelName, String connectorBindingId, int nodeID, int limit)
 		throws TeiidComponentException {
         

Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -25,11 +25,20 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.teiid.common.buffer.*;
+import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 
 
 public class FakeTupleSource implements TupleSource {
+	
+	static int maxOpen;
+	static int open;
+	
+	static void resetStats() {
+		maxOpen = 0;
+		open = 0;
+	}
     
     public static class FakeComponentException extends TeiidComponentException {
         
@@ -55,6 +64,8 @@
 		this.tuples = tuples; 
 		this.expectedSymbols = expectedSymbols;
 		this.columnMap = columnMap;
+		open++;
+		maxOpen = Math.max(open, maxOpen);
 	}
 
 	public List getSchema() { 
@@ -68,9 +79,7 @@
         return theElements;
 	}
 	
-	public void openSource()
-		throws TeiidComponentException {				
-		
+	public void openSource() {				
 		index = 0;
 	}
 
@@ -105,6 +114,7 @@
 	}
 
 	public void closeSource() {
+		open--;
 	}
     
     public void setBlockOnce(){

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java	2011-03-29 16:02:56 UTC (rev 3047)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java	2011-03-29 16:29:51 UTC (rev 3048)
@@ -22,16 +22,18 @@
 
 package org.teiid.query.processor;
 
+import static org.junit.Assert.*;
+
 import java.util.Arrays;
 import java.util.List;
 
+import org.junit.Test;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.optimizer.TestOptimizer;
 import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
 import org.teiid.query.optimizer.capabilities.FakeCapabilitiesFinder;
 import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
-import org.teiid.query.processor.ProcessorPlan;
 import org.teiid.query.processor.relational.JoinNode;
 import org.teiid.query.processor.relational.RelationalNode;
 import org.teiid.query.processor.relational.RelationalPlan;
@@ -39,12 +41,10 @@
 import org.teiid.query.unittest.FakeMetadataFacade;
 import org.teiid.query.unittest.FakeMetadataFactory;
 import org.teiid.query.unittest.FakeMetadataObject;
+import org.teiid.query.util.CommandContext;
 
-import junit.framework.TestCase;
-
-
-
-public class TestDependentJoins extends TestCase {
+ at SuppressWarnings({"unchecked"})
+public class TestDependentJoins {
     
     /** 
      * @param sql
@@ -66,7 +66,7 @@
     }
     
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin1() { 
+    @Test public void testMultiCritDepJoin1() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -93,7 +93,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin2() { 
+    @Test public void testMultiCritDepJoin2() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -119,7 +119,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin3() { 
+    @Test public void testMultiCritDepJoin3() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -146,7 +146,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin4() { 
+    @Test public void testMultiCritDepJoin4() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -173,7 +173,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND concat(pm1.g1.e1, 'a') = concat(pm2.g1.e1, 'a') AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin5() { 
+    @Test public void testMultiCritDepJoin5() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1, 'a') = concat(pm2.g1.e1, 'a') AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -199,7 +199,7 @@
        TestProcessor.helpProcess(plan, dataManager, expected);
    }
 
-    public void testMultiCritDepJoin5a() { 
+    @Test public void testMultiCritDepJoin5a() { 
         // Create query 
         String sql = "SELECT X.e1 FROM pm1.g1 as X, pm2.g1 WHERE concat(X.e1, 'a') = concat(pm2.g1.e1, 'a') AND X.e2=pm2.g1.e2 order by x.e1"; //$NON-NLS-1$
        
@@ -225,7 +225,7 @@
         TestProcessor.helpProcess(plan, dataManager, expected);
    }
 
-   public void testMultiCritDepJoin5b() { 
+   @Test public void testMultiCritDepJoin5b() { 
        //Create query 
        String sql = "SELECT X.e1, X.e2 FROM pm1.g1 as X, pm2.g1 WHERE concat(X.e1, convert(X.e4, string)) = concat(pm2.g1.e1, convert(pm2.g1.e4, string)) AND X.e2=pm2.g1.e2 order by x.e1 option makedep x"; //$NON-NLS-1$
        
@@ -251,7 +251,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = concat(pm2.g1.e1, '') AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin6() { 
+    @Test public void testMultiCritDepJoin6() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = concat(pm2.g1.e1, '') AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -278,7 +278,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1, '') = pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
-    public void testMultiCritDepJoin7() { 
+    @Test public void testMultiCritDepJoin7() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1, '') = pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -305,7 +305,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = pm2.g1.e1 AND pm1.g1.e2 <> pm2.g1.e2 */
-    public void testMultiCritDepJoin8() { 
+    @Test public void testMultiCritDepJoin8() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = pm2.g1.e1 AND pm1.g1.e2 <> pm2.g1.e2 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -329,7 +329,7 @@
    }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e2 <> pm2.g1.e2 */
-    public void testMultiCritDepJoin9() { 
+    @Test public void testMultiCritDepJoin9() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e2 <> pm2.g1.e2 option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -375,7 +375,7 @@
    }     
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e3=pm2.g1.e3 AND pm1.g1.e2=pm2.g1.e2 AND pm2.g1.e1 = 'a' */
-    public void testMultiCritDepJoin10() { 
+    @Test public void testMultiCritDepJoin10() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e3=pm2.g1.e3 AND pm1.g1.e2=pm2.g1.e2 AND pm2.g1.e1 = 'a' option makedep pm1.g1"; //$NON-NLS-1$
        
@@ -399,11 +399,11 @@
        TestProcessor.helpProcess(plan, dataManager, expected);
    }       
 
-    public void testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCanHandleAlias() {
+    @Test public void testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCanHandleAlias() {
         helpTestDepAccessCausingSortNodeInsert(true);
     }
     
-    public void testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCannotHandleAlias() {
+    @Test public void testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCannotHandleAlias() {
         helpTestDepAccessCausingSortNodeInsert(false);
     }
     
@@ -456,7 +456,7 @@
         TestProcessor.helpProcess(plan, dataManager, expected);          
     }
     
-    public void testCase5130() {
+    @Test public void testCase5130() {
         FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
         BasicSourceCapabilities caps = TestOptimizer.getTypicalCapabilities();
         caps.setCapabilitySupport(Capability.QUERY_ORDERBY, false);
@@ -501,13 +501,13 @@
         assertFalse(dataManager.getCommandHistory().contains("SELECT a.stringkey, a.intkey FROM bqt1.smalla AS a WHERE concat(a.stringkey, 't') IN ('1', '2')")); //$NON-NLS-1$
     }
     
-    public void testCase5130a() throws Exception {
+    @Test public void testCase5130a() throws Exception {
         HardcodedDataManager dataManager = helpTestDependentJoin(false);
         
         assertFalse(dataManager.getCommandHistory().contains("SELECT a.stringkey, a.intkey FROM bqt2.smalla AS a WHERE (concat(a.stringkey, 't') IN ('1t', '2')) AND (a.intkey IN (1))")); //$NON-NLS-1$
     }
     
-    public void testUnlimitedIn() throws Exception {
+    @Test public void testUnlimitedIn() throws Exception {
     	helpTestDependentJoin(true);
     }
 
@@ -596,7 +596,7 @@
     }
 
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm6.g1 WHERE pm1.g1.e1=pm6.g1.e1 OPTION MAKEDEP pm6.g1 */
-    public void testLargeSetInDepAccess() throws Exception {
+    @Test public void testLargeSetInDepAccess() throws Exception {
         // Create query 
         String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm6.g1 WHERE pm1.g1.e1=pm6.g1.e1 OPTION MAKEDEP pm6.g1"; //$NON-NLS-1$
 
@@ -632,7 +632,7 @@
         TestProcessor.helpProcess(plan, dataManager, expected);
     }
 
-    public void testLargeSetInDepAccessMultiJoinCriteria() {
+    @Test public void testLargeSetInDepAccessMultiJoinCriteria() throws Exception {
         //     Create query 
         String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by e1 OPTION MAKEDEP pm2.g1"; //$NON-NLS-1$
         // Construct data manager with data
@@ -673,13 +673,16 @@
 
         Command command = TestProcessor.helpParse(sql);
         ProcessorPlan plan = TestProcessor.helpGetPlan(command, fakeMetadata, capFinder);
-
+        CommandContext cc = TestProcessor.createCommandContext();
+        cc.setUserRequestSourceConcurrency(5);
+        FakeTupleSource.resetStats();
         // Run query
-        TestProcessor.helpProcess(plan, dataManager, expected);
+        TestProcessor.helpProcess(plan, cc, dataManager, expected);
 
+        assertEquals(4, FakeTupleSource.maxOpen);
     }
 
-    public void testLargeSetInDepAccessWithAccessPattern() {
+    @Test public void testLargeSetInDepAccessWithAccessPattern() {
         String sql = "SELECT a.e1, b.e1, b.e2 FROM pm4.g1 a INNER JOIN pm1.g1 b ON a.e1=b.e1 AND a.e2 = b.e2"; //$NON-NLS-1$
 
         // Create expected results
@@ -727,7 +730,7 @@
     }
     
     /** SELECT pm1.g1.e1 FROM pm1.g1, pm1.g2 WHERE pm1.g1.e1 = pm1.g2.e1 AND pm1.g1.e2 = -100 OPTION MAKEDEP pm1.g2 */
-    public void testDependentNoRows() { 
+    @Test public void testDependentNoRows() { 
        // Create query 
        String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm1.g2 WHERE pm1.g1.e1 = pm1.g2.e1 AND pm1.g1.e2 = -100 OPTION MAKEDEP pm1.g2"; //$NON-NLS-1$
         
@@ -747,7 +750,7 @@
     }
 
     /** SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1, pm2.g1 WHERE (pm1.g1.e2+1)=pm2.g1.e2 OPTION MAKEDEP pm1.g2 */
-    public void testExpressionInDepJoin() { 
+    @Test public void testExpressionInDepJoin() { 
        // Create query 
        String sql = "SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1, pm2.g1 WHERE (pm1.g1.e2+1)=pm2.g1.e2 OPTION MAKEDEP pm2.g1"; //$NON-NLS-1$
        



More information about the teiid-commits mailing list