Author: shawkins
Date: 2011-07-27 17:13:25 -0400 (Wed, 27 Jul 2011)
New Revision: 3344
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java
Log:
TEIID-993 adding the ability to start sibling mapping class queries in parallel.
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -169,7 +169,7 @@
return result;
}
- private void init() throws TeiidComponentException, TeiidProcessingException {
+ public void init() throws TeiidComponentException, TeiidProcessingException {
// initialize if necessary
if(!initialized) {
reserved =
this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()),
BufferReserveMode.FORCE);
@@ -184,7 +184,6 @@
}
}
-
/**
* Close processing and clean everything up. Should only be called by the same
thread that called process.
*/
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -49,14 +49,10 @@
throws BlockedException, TeiidComponentException, TeiidProcessingException{
LogManager.logTrace(org.teiid.logging.LogConstants.CTX_XML_PLAN, new
Object[]{"SQL: Result set DOESN'T exist:",resultSetName}); //$NON-NLS-1$
- PlanExecutor executor = context.getResultExecutor(resultSetName);
- if (executor == null) {
- executor = env.createResultExecutor(resultSetName, info);
- context.setResultExecutor(resultSetName, executor);
- }
+ PlanExecutor executor = getPlanExecutor(env, context);
// this execute can throw the blocked exception
- executor.execute(context.getReferenceValues());
+ executor.execute(context.getReferenceValues(), false);
// now that we done executing the plan; remove the plan from context
context.removeResultExecutor(resultSetName);
@@ -68,6 +64,16 @@
env.incrementCurrentProgramCounter();
return context;
}
+
+ public PlanExecutor getPlanExecutor(XMLProcessorEnvironment env,
+ XMLContext context) throws TeiidComponentException {
+ PlanExecutor executor = context.getResultExecutor(resultSetName);
+ if (executor == null) {
+ executor = env.createResultExecutor(resultSetName, info);
+ context.setResultExecutor(resultSetName, executor);
+ }
+ return executor;
+ }
public String toString() {
return "SQL " + resultSetName; //$NON-NLS-1$
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -39,8 +39,9 @@
/**
* Execute the plan
* @param referenceValues - values for any external references
+ * @param openOnly - if true, only open (initialize) the plan without forcing execution
*/
- public void execute(Map referenceValues) throws TeiidComponentException,
BlockedException, TeiidProcessingException;
+ public void execute(Map referenceValues, boolean openOnly) throws
TeiidComponentException, BlockedException, TeiidProcessingException;
/**
* Get the ElementSymbol list which represents the schema of the result set
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -87,15 +87,20 @@
/**
* @throws TeiidProcessingException
- * @see org.teiid.query.processor.xml.PlanExecutor#execute(java.util.Map)
+ * @see org.teiid.query.processor.xml.PlanExecutor#execute(java.util.Map, boolean)
*/
- public void execute(Map referenceValues) throws TeiidComponentException,
BlockedException, TeiidProcessingException {
+ public void execute(Map referenceValues, boolean openOnly) throws
TeiidComponentException, BlockedException, TeiidProcessingException {
if (this.tupleSource == null) {
setReferenceValues(referenceValues);
this.tupleSource = new BatchIterator(internalProcessor);
+ if (openOnly) {
+ internalProcessor.init();
+ }
}
- //force execution
- this.tupleSource.hasNext();
+ if (!openOnly) {
+ //force execution
+ this.tupleSource.hasNext();
+ }
}
void setReferenceValues(Map<ElementSymbol, Object> referencesValues) {
Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -155,7 +155,7 @@
while(true){
// do the xml processing.
- ProcessorInstruction inst = env.getCurrentInstruction();
+ ProcessorInstruction inst = env.getCurrentInstruction(this.context);
while (inst != null){
LogManager.logTrace(LogConstants.CTX_XML_PLAN, "Executing
instruction", inst); //$NON-NLS-1$
this.context = inst.process(this.env, this.context);
@@ -186,7 +186,7 @@
TupleBatch batch = new TupleBatch(nextBatchCount++,
Arrays.asList(Arrays.asList(xml)));
return batch;
}
- inst = env.getCurrentInstruction();
+ inst = env.getCurrentInstruction(this.context);
}
TupleBatch batch = new TupleBatch(nextBatchCount++, Collections.EMPTY_LIST);
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -26,10 +26,12 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.mapping.xml.ResultSetInfo;
@@ -84,6 +86,7 @@
private static class ProgramState {
private Program program;
private int programCounter = 0;
+ private int lookaheadCounter;
private int recursionCount = NOT_RECURSIVE;
private static final int NOT_RECURSIVE = 0;
@@ -156,7 +159,7 @@
this.programStack.addFirst(programState);
}
- public ProcessorInstruction getCurrentInstruction() {
+ public ProcessorInstruction getCurrentInstruction(XMLContext context) throws
TeiidComponentException, TeiidProcessingException {
ProgramState programState = this.programStack.getFirst();
//Case 5266: account for an empty program on to the stack;
@@ -171,6 +174,23 @@
return null;
}
+ //start all siblings
+ List<ProcessorInstruction> instrs =
programState.program.getProcessorInstructions();
+ if (programState.programCounter >= programState.lookaheadCounter &&
instrs.size() > programState.programCounter + 1) {
+ for (programState.lookaheadCounter = programState.programCounter;
programState.lookaheadCounter < instrs.size(); programState.lookaheadCounter++) {
+ ProcessorInstruction pi = instrs.get(programState.lookaheadCounter);
+ if (pi instanceof ExecStagingTableInstruction) {
+ //need to load staging tables prior to source queries
+ break;
+ }
+ if (pi instanceof ExecSqlInstruction) {
+ ExecSqlInstruction esi = (ExecSqlInstruction)pi;
+ PlanExecutor pe = esi.getPlanExecutor(this, context);
+ pe.execute(context.getReferenceValues(), true);
+ }
+ }
+ }
+
return programState.program.getInstructionAt(programState.programCounter);
}
@@ -251,8 +271,6 @@
ProgramState initialProgramState = this.programStack.getLast();
ProgramState newState = new ProgramState();
newState.program = initialProgramState.program;
- newState.programCounter = 0;
- newState.recursionCount = ProgramState.NOT_RECURSIVE;
clone.programStack.addFirst(newState);
// XML results form and format
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -56,7 +56,7 @@
return this.currentRow;
}
- public void execute(Map values) throws TeiidComponentException, BlockedException {
+ public void execute(Map values, boolean openOnly) throws TeiidComponentException,
BlockedException {
tupleSource.openSource();
}
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java 2011-07-27
17:52:49 UTC (rev 3343)
+++
branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java 2011-07-27
21:13:25 UTC (rev 3344)
@@ -119,31 +119,31 @@
env.pushProgram(p1, true); //simulate recursion
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i1, env.getCurrentInstruction());
+ assertEquals(i1, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i1.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i2, env.getCurrentInstruction());
+ assertEquals(i2, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i2.process(env, context);
assertEquals(p2, env.getCurrentProgram());
- assertEquals(i3, env.getCurrentInstruction());
+ assertEquals(i3, env.getCurrentInstruction(null));
assertEquals(p2, env.getCurrentProgram());
i3.process(env, context);
assertEquals(p2, env.getCurrentProgram());
- assertEquals(i4, env.getCurrentInstruction());
+ assertEquals(i4, env.getCurrentInstruction(null));
assertEquals(p2, env.getCurrentProgram());
i4.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i1, env.getCurrentInstruction());
+ assertEquals(i1, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i1.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i2, env.getCurrentInstruction());
+ assertEquals(i2, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i2.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(null, env.getCurrentInstruction());
+ assertEquals(null, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
}
@@ -173,31 +173,31 @@
env.pushProgram(p2);
assertEquals(p2, env.getCurrentProgram());
- assertEquals(i3, env.getCurrentInstruction());
+ assertEquals(i3, env.getCurrentInstruction(null));
assertEquals(p2, env.getCurrentProgram());
i3.process(env, context);
assertEquals(p2, env.getCurrentProgram());
- assertEquals(i4, env.getCurrentInstruction());
+ assertEquals(i4, env.getCurrentInstruction(null));
assertEquals(p2, env.getCurrentProgram());
i4.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i1, env.getCurrentInstruction());
+ assertEquals(i1, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i1.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i2, env.getCurrentInstruction());
+ assertEquals(i2, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i2.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i1, env.getCurrentInstruction());
+ assertEquals(i1, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i1.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(i2, env.getCurrentInstruction());
+ assertEquals(i2, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
i2.process(env, context);
assertEquals(p1, env.getCurrentProgram());
- assertEquals(null, env.getCurrentInstruction());
+ assertEquals(null, env.getCurrentInstruction(null));
assertEquals(p1, env.getCurrentProgram());
}