Author: shawkins
Date: 2011-03-29 12:29:51 -0400 (Tue, 29 Mar 2011)
New Revision: 3048
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java
Log:
TEIID-1533 adding parallelization of dependent queries
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-03-29
16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -322,6 +322,7 @@
workContext, this.config.getUseDataRoles(), this.prepPlanCache);
request.setResultSetCacheEnabled(this.rsCache != null);
request.setAllowCreateTemporaryTablesByDefault(this.config.isAllowCreateTemporaryTablesByDefault());
+ request.setUserRequestConcurrency(this.getUserRequestSourceConcurrency());
ResultsFuture<ResultsMessage> resultsFuture = new
ResultsFuture<ResultsMessage>();
RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request,
resultsFuture.getResultsReceiver(), requestID, workContext);
logMMCommand(workItem, Event.NEW, null);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-03-29
16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -135,6 +135,7 @@
private SessionAwareCache<PreparedPlan> planCache;
private boolean resultSetCacheEnabled = true;
private boolean allowCreateTemporaryTablesByDefault;
+ private int userRequestConcurrency;
void initialize(RequestMessage requestMsg,
BufferManager bufferManager,
@@ -264,7 +265,12 @@
context.setBufferManager(this.bufferManager);
context.setPreparedPlanCache(planCache);
context.setResultSetCacheEnabled(this.resultSetCacheEnabled);
+ context.setUserRequestSourceConcurrency(this.userRequestConcurrency);
}
+
+ public void setUserRequestConcurrency(int userRequestConcurrency) {
+ this.userRequestConcurrency = userRequestConcurrency;
+ }
protected void checkReferences(List<Reference> references) throws
QueryValidatorException {
referenceCheck(references);
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java 2011-03-29
16:02:56 UTC (rev 3047)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -46,14 +46,15 @@
public class AccessNode extends SubqueryAwareRelationalNode {
- // Initialization state
+ private static final int MAX_CONCURRENT = 10; //TODO: this could be settable via a
property
+ // Initialization state
private Command command;
private String modelName;
private String connectorBindingId;
private boolean shouldEvaluate = false;
// Processing state
- private TupleSource tupleSource;
+ private ArrayList<TupleSource> tupleSources = new ArrayList<TupleSource>();
private boolean isUpdate = false;
private boolean returnedRows = false;
private Command nextCommand;
@@ -68,7 +69,7 @@
public void reset() {
super.reset();
- tupleSource = null;
+ this.tupleSources.clear();
isUpdate = false;
returnedRows = false;
nextCommand = null;
@@ -100,22 +101,25 @@
// Copy command and resolve references if necessary
Command atomicCommand = command;
boolean needProcessing = true;
- if(shouldEvaluate) {
- atomicCommand = nextCommand();
- needProcessing = prepareNextCommand(atomicCommand);
- nextCommand = null;
- } else {
- needProcessing = RelationalNodeUtil.shouldExecute(atomicCommand, true);
- }
- // else command will not be changed, so no reason to all this work.
- // Removing this if block and always evaluating has a significant cost that will
- // show up in performance tests for many simple tests that do not require it.
-
- isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
-
- if(needProcessing) {
- registerRequest(atomicCommand);
- }
+ do {
+ if(shouldEvaluate) {
+ atomicCommand = nextCommand();
+ needProcessing = prepareNextCommand(atomicCommand);
+ nextCommand = null;
+ } else {
+ needProcessing = RelationalNodeUtil.shouldExecute(atomicCommand, true);
+ }
+ // else command will not be changed, so no reason to do all this work.
+ // Removing this if block and always evaluating has a significant cost that
will
+ // show up in performance tests for many simple tests that do not require it.
+
+ isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
+
+ if(needProcessing) {
+ registerRequest(atomicCommand);
+ }
+ //We hardcode an upper limit on concurrency because these commands have potentially large
in-memory value sets
+ } while (!processCommandsIndividually() && hasNextCommand() &&
this.tupleSources.size() < Math.min(MAX_CONCURRENT,
this.getContext().getUserRequestSourceConcurrency()));
}
private Command nextCommand() {
@@ -147,38 +151,52 @@
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
- while (tupleSource != null || hasNextCommand()) {
- //drain the tuple source
- while (tupleSource != null) {
- List<?> tuple = tupleSource.nextTuple();
-
- if(tuple == null) {
- closeSources();
- break;
- }
-
- returnedRows = true;
-
- addBatchRow(tuple);
-
- if (isBatchFull()) {
- return pullBatch();
- }
+ while (!tupleSources.isEmpty() || hasNextCommand()) {
+
+ if (tupleSources.isEmpty() && processCommandsIndividually()) {
+ registerNext();
}
- //execute another command
- while (hasNextCommand()) {
- if (processCommandsIndividually() && hasPendingRows()) {
- return pullBatch();
- }
- Command atomicCommand = nextCommand();
- if (prepareNextCommand(atomicCommand)) {
- nextCommand = null;
- registerRequest(atomicCommand);
- break;
- }
- nextCommand = null;
- }
+ //drain the tuple source(s)
+ for (int i = 0; i < this.tupleSources.size(); i++) {
+ TupleSource tupleSource = tupleSources.get(i);
+ try {
+ List<?> tuple = null;
+
+ while ((tuple = tupleSource.nextTuple()) != null) {
+ returnedRows = true;
+ addBatchRow(tuple);
+
+ if (isBatchFull()) {
+ return pullBatch();
+ }
+ }
+
+ //end of source
+ tupleSource.closeSource();
+ tupleSources.remove(i--);
+ if (!processCommandsIndividually()) {
+ registerNext();
+ }
+ continue;
+ } catch (BlockedException e) {
+ if (processCommandsIndividually()) {
+ throw e;
+ }
+ continue;
+ }
+ }
+
+ if (processCommandsIndividually()) {
+ if (hasPendingRows()) {
+ return pullBatch();
+ }
+ continue;
+ }
+
+ if (!this.tupleSources.isEmpty()) {
+ throw BlockedException.INSTANCE;
+ }
}
if(isUpdate && !returnedRows) {
@@ -191,6 +209,19 @@
return pullBatch();
}
+ private void registerNext() throws TeiidComponentException,
+ TeiidProcessingException {
+ while (hasNextCommand()) {
+ Command atomicCommand = nextCommand();
+ if (prepareNextCommand(atomicCommand)) {
+ nextCommand = null;
+ registerRequest(atomicCommand);
+ break;
+ }
+ nextCommand = null;
+ }
+ }
+
private void registerRequest(Command atomicCommand)
throws TeiidComponentException, TeiidProcessingException {
int limit = -1;
@@ -200,7 +231,7 @@
limit = parent.getLimit() + parent.getOffset();
}
}
- tupleSource = getDataManager().registerRequest(getContext(), atomicCommand, modelName,
connectorBindingId, getID(), limit);
+ tupleSources.add(getDataManager().registerRequest(getContext(), atomicCommand,
modelName, connectorBindingId, getID(), limit));
}
protected boolean processCommandsIndividually() {
@@ -217,10 +248,10 @@
}
private void closeSources() {
- if(this.tupleSource != null) {
- this.tupleSource.closeSource();
- tupleSource = null;
- }
+ for (TupleSource ts : this.tupleSources) {
+ ts.closeSource();
+ }
+ this.tupleSources.clear();
}
protected void getNodeString(StringBuffer str) {
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-03-29
16:02:56 UTC (rev 3047)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -110,6 +110,8 @@
private boolean resultSetCacheEnabled = true;
+ private int userRequestSourceConcurrency;
+
}
private GlobalState globalState = new GlobalState();
@@ -511,5 +513,13 @@
public void setResultSetCacheEnabled(boolean resultSetCacheEnabled) {
this.globalState.resultSetCacheEnabled = resultSetCacheEnabled;
}
+
+ public int getUserRequestSourceConcurrency() {
+ return this.globalState.userRequestSourceConcurrency;
+ }
+ public void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+ this.globalState.userRequestSourceConcurrency = userRequestSourceConcurrency;
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java 2011-03-29
16:02:56 UTC (rev 3047)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -104,10 +104,6 @@
tuples.put(groupID, new Object[] { elements, data });
}
- public void closeRequest(Object requestID) {
- // does nothing?
- }
-
public TupleSource registerRequest(CommandContext context, Command command, String
modelName, String connectorBindingId, int nodeID, int limit)
throws TeiidComponentException {
Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java 2011-03-29
16:02:56 UTC (rev 3047)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -25,11 +25,20 @@
import java.util.ArrayList;
import java.util.List;
-import org.teiid.common.buffer.*;
+import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
public class FakeTupleSource implements TupleSource {
+
+ static int maxOpen;
+ static int open;
+
+ static void resetStats() {
+ maxOpen = 0;
+ open = 0;
+ }
public static class FakeComponentException extends TeiidComponentException {
@@ -55,6 +64,8 @@
this.tuples = tuples;
this.expectedSymbols = expectedSymbols;
this.columnMap = columnMap;
+ open++;
+ maxOpen = Math.max(open, maxOpen);
}
public List getSchema() {
@@ -68,9 +79,7 @@
return theElements;
}
- public void openSource()
- throws TeiidComponentException {
-
+ public void openSource() {
index = 0;
}
@@ -105,6 +114,7 @@
}
public void closeSource() {
+ open--;
}
public void setBlockOnce(){
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java 2011-03-29
16:02:56 UTC (rev 3047)
+++
trunk/engine/src/test/java/org/teiid/query/processor/TestDependentJoins.java 2011-03-29
16:29:51 UTC (rev 3048)
@@ -22,16 +22,18 @@
package org.teiid.query.processor;
+import static org.junit.Assert.*;
+
import java.util.Arrays;
import java.util.List;
+import org.junit.Test;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.optimizer.TestOptimizer;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.FakeCapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
-import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.relational.JoinNode;
import org.teiid.query.processor.relational.RelationalNode;
import org.teiid.query.processor.relational.RelationalPlan;
@@ -39,12 +41,10 @@
import org.teiid.query.unittest.FakeMetadataFacade;
import org.teiid.query.unittest.FakeMetadataFactory;
import org.teiid.query.unittest.FakeMetadataObject;
+import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
-
-
-
-public class TestDependentJoins extends TestCase {
+@SuppressWarnings({"unchecked"})
+public class TestDependentJoins {
/**
* @param sql
@@ -66,7 +66,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1 AND
pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin1() {
+ @Test public void testMultiCritDepJoin1() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1
AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
@@ -93,7 +93,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND
pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin2() {
+ @Test public void testMultiCritDepJoin2() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1
AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
@@ -119,7 +119,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND
pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin3() {
+ @Test public void testMultiCritDepJoin3() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1
AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
@@ -146,7 +146,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND
pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin4() {
+ @Test public void testMultiCritDepJoin4() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1
AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep pm1.g1"; //$NON-NLS-1$
@@ -173,7 +173,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm2.g1.e1=pm1.g1.e1 AND
concat(pm1.g1.e1, 'a') = concat(pm2.g1.e1, 'a') AND pm1.g1.e2=pm2.g1.e2
*/
- public void testMultiCritDepJoin5() {
+ @Test public void testMultiCritDepJoin5() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1,
'a') = concat(pm2.g1.e1, 'a') AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1
option makedep pm1.g1"; //$NON-NLS-1$
@@ -199,7 +199,7 @@
TestProcessor.helpProcess(plan, dataManager, expected);
}
- public void testMultiCritDepJoin5a() {
+ @Test public void testMultiCritDepJoin5a() {
// Create query
String sql = "SELECT X.e1 FROM pm1.g1 as X, pm2.g1 WHERE concat(X.e1,
'a') = concat(pm2.g1.e1, 'a') AND X.e2=pm2.g1.e2 order by x.e1";
//$NON-NLS-1$
@@ -225,7 +225,7 @@
TestProcessor.helpProcess(plan, dataManager, expected);
}
- public void testMultiCritDepJoin5b() {
+ @Test public void testMultiCritDepJoin5b() {
//Create query
String sql = "SELECT X.e1, X.e2 FROM pm1.g1 as X, pm2.g1 WHERE concat(X.e1,
convert(X.e4, string)) = concat(pm2.g1.e1, convert(pm2.g1.e4, string)) AND X.e2=pm2.g1.e2
order by x.e1 option makedep x"; //$NON-NLS-1$
@@ -251,7 +251,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = concat(pm2.g1.e1,
'') AND pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin6() {
+ @Test public void testMultiCritDepJoin6() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 =
concat(pm2.g1.e1, '') AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep
pm1.g1"; //$NON-NLS-1$
@@ -278,7 +278,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1, '') =
pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 */
- public void testMultiCritDepJoin7() {
+ @Test public void testMultiCritDepJoin7() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE concat(pm1.g1.e1,
'') = pm2.g1.e1 AND pm1.g1.e2=pm2.g1.e2 order by pm1.g1.e1 option makedep
pm1.g1"; //$NON-NLS-1$
@@ -305,7 +305,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 = pm2.g1.e1 AND pm1.g1.e2
<> pm2.g1.e2 */
- public void testMultiCritDepJoin8() {
+ @Test public void testMultiCritDepJoin8() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1 =
pm2.g1.e1 AND pm1.g1.e2 <> pm2.g1.e2 option makedep pm1.g1"; //$NON-NLS-1$
@@ -329,7 +329,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e2 <> pm2.g1.e2 */
- public void testMultiCritDepJoin9() {
+ @Test public void testMultiCritDepJoin9() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e2 <>
pm2.g1.e2 option makedep pm1.g1"; //$NON-NLS-1$
@@ -375,7 +375,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e3=pm2.g1.e3 AND
pm1.g1.e2=pm2.g1.e2 AND pm2.g1.e1 = 'a' */
- public void testMultiCritDepJoin10() {
+ @Test public void testMultiCritDepJoin10() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e3=pm2.g1.e3
AND pm1.g1.e2=pm2.g1.e2 AND pm2.g1.e1 = 'a' option makedep pm1.g1";
//$NON-NLS-1$
@@ -399,11 +399,11 @@
TestProcessor.helpProcess(plan, dataManager, expected);
}
- public void testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCanHandleAlias()
{
+ @Test public void
testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCanHandleAlias() {
helpTestDepAccessCausingSortNodeInsert(true);
}
- public void
testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCannotHandleAlias() {
+ @Test public void
testLargeSetInDepJoinWAccessPatternCausingSortNodeInsertCannotHandleAlias() {
helpTestDepAccessCausingSortNodeInsert(false);
}
@@ -456,7 +456,7 @@
TestProcessor.helpProcess(plan, dataManager, expected);
}
- public void testCase5130() {
+ @Test public void testCase5130() {
FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
BasicSourceCapabilities caps = TestOptimizer.getTypicalCapabilities();
caps.setCapabilitySupport(Capability.QUERY_ORDERBY, false);
@@ -501,13 +501,13 @@
assertFalse(dataManager.getCommandHistory().contains("SELECT a.stringkey,
a.intkey FROM bqt1.smalla AS a WHERE concat(a.stringkey, 't') IN ('1',
'2')")); //$NON-NLS-1$
}
- public void testCase5130a() throws Exception {
+ @Test public void testCase5130a() throws Exception {
HardcodedDataManager dataManager = helpTestDependentJoin(false);
assertFalse(dataManager.getCommandHistory().contains("SELECT a.stringkey,
a.intkey FROM bqt2.smalla AS a WHERE (concat(a.stringkey, 't') IN ('1t',
'2')) AND (a.intkey IN (1))")); //$NON-NLS-1$
}
- public void testUnlimitedIn() throws Exception {
+ @Test public void testUnlimitedIn() throws Exception {
helpTestDependentJoin(true);
}
@@ -596,7 +596,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm6.g1 WHERE pm1.g1.e1=pm6.g1.e1 OPTION MAKEDEP
pm6.g1 */
- public void testLargeSetInDepAccess() throws Exception {
+ @Test public void testLargeSetInDepAccess() throws Exception {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm6.g1 WHERE pm1.g1.e1=pm6.g1.e1
OPTION MAKEDEP pm6.g1"; //$NON-NLS-1$
@@ -632,7 +632,7 @@
TestProcessor.helpProcess(plan, dataManager, expected);
}
- public void testLargeSetInDepAccessMultiJoinCriteria() {
+ @Test public void testLargeSetInDepAccessMultiJoinCriteria() throws Exception {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm2.g1 WHERE pm1.g1.e1=pm2.g1.e1
AND pm1.g1.e2=pm2.g1.e2 order by e1 OPTION MAKEDEP pm2.g1"; //$NON-NLS-1$
// Construct data manager with data
@@ -673,13 +673,16 @@
Command command = TestProcessor.helpParse(sql);
ProcessorPlan plan = TestProcessor.helpGetPlan(command, fakeMetadata,
capFinder);
-
+ CommandContext cc = TestProcessor.createCommandContext();
+ cc.setUserRequestSourceConcurrency(5);
+ FakeTupleSource.resetStats();
// Run query
- TestProcessor.helpProcess(plan, dataManager, expected);
+ TestProcessor.helpProcess(plan, cc, dataManager, expected);
+ assertEquals(4, FakeTupleSource.maxOpen);
}
- public void testLargeSetInDepAccessWithAccessPattern() {
+ @Test public void testLargeSetInDepAccessWithAccessPattern() {
String sql = "SELECT a.e1, b.e1, b.e2 FROM pm4.g1 a INNER JOIN pm1.g1 b ON
a.e1=b.e1 AND a.e2 = b.e2"; //$NON-NLS-1$
// Create expected results
@@ -727,7 +730,7 @@
}
/** SELECT pm1.g1.e1 FROM pm1.g1, pm1.g2 WHERE pm1.g1.e1 = pm1.g2.e1 AND pm1.g1.e2 =
-100 OPTION MAKEDEP pm1.g2 */
- public void testDependentNoRows() {
+ @Test public void testDependentNoRows() {
// Create query
String sql = "SELECT pm1.g1.e1 FROM pm1.g1, pm1.g2 WHERE pm1.g1.e1 =
pm1.g2.e1 AND pm1.g1.e2 = -100 OPTION MAKEDEP pm1.g2"; //$NON-NLS-1$
@@ -747,7 +750,7 @@
}
/** SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1, pm2.g1 WHERE (pm1.g1.e2+1)=pm2.g1.e2
OPTION MAKEDEP pm1.g2 */
- public void testExpressionInDepJoin() {
+ @Test public void testExpressionInDepJoin() {
// Create query
String sql = "SELECT pm1.g1.e2, pm2.g1.e2 FROM pm1.g1, pm2.g1 WHERE
(pm1.g1.e2+1)=pm2.g1.e2 OPTION MAKEDEP pm2.g1"; //$NON-NLS-1$