[teiid-commits] teiid SVN: r3344 - in branches/7.4.x/engine/src: main/java/org/teiid/query/processor/xml and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jul 27 17:13:26 EDT 2011


Author: shawkins
Date: 2011-07-27 17:13:25 -0400 (Wed, 27 Jul 2011)
New Revision: 3344

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java
   branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java
   branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java
Log:
TEIID-993 adding the ability to start sibling mapping class queries in parallel.

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -169,7 +169,7 @@
 		return result;
 	}
 
-	private void init() throws TeiidComponentException, TeiidProcessingException {
+	public void init() throws TeiidComponentException, TeiidProcessingException {
 		// initialize if necessary
 		if(!initialized) {
 			reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
@@ -184,7 +184,6 @@
 		}
 	}
 
-	                   
     /**
      * Close processing and clean everything up.  Should only be called by the same thread that called process.
      */

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/ExecSqlInstruction.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -49,14 +49,10 @@
         throws BlockedException, TeiidComponentException, TeiidProcessingException{
 
         LogManager.logTrace(org.teiid.logging.LogConstants.CTX_XML_PLAN, new Object[]{"SQL: Result set DOESN'T exist:",resultSetName}); //$NON-NLS-1$
-        PlanExecutor executor = context.getResultExecutor(resultSetName);
-        if (executor == null) {
-            executor = env.createResultExecutor(resultSetName, info);
-            context.setResultExecutor(resultSetName, executor);
-        }
+        PlanExecutor executor = getPlanExecutor(env, context);
         
         // this execute can throw the blocked exception
-        executor.execute(context.getReferenceValues());
+        executor.execute(context.getReferenceValues(), false);
         
         // now that we done executing the plan; remove the plan from context
         context.removeResultExecutor(resultSetName);
@@ -68,6 +64,16 @@
         env.incrementCurrentProgramCounter();
         return context;
     }
+
+	public PlanExecutor getPlanExecutor(XMLProcessorEnvironment env,
+			XMLContext context) throws TeiidComponentException {
+		PlanExecutor executor = context.getResultExecutor(resultSetName);
+        if (executor == null) {
+            executor = env.createResultExecutor(resultSetName, info);
+            context.setResultExecutor(resultSetName, executor);
+        }
+		return executor;
+	}
     
     public String toString() {
         return "SQL  " + resultSetName; //$NON-NLS-1$ 

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/PlanExecutor.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -39,8 +39,9 @@
     /**
      * Execute the plan   
      * @param referenceValues - values for any external references
+     * @param openOnly
      */
-    public void execute(Map referenceValues) throws TeiidComponentException, BlockedException, TeiidProcessingException;
+    public void execute(Map referenceValues, boolean openOnly) throws TeiidComponentException, BlockedException, TeiidProcessingException;
 
     /**
      * Get the ElementSymbol list which represents the schema of the result set

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -87,15 +87,20 @@
 
     /**
      * @throws TeiidProcessingException 
-     * @see org.teiid.query.processor.xml.PlanExecutor#execute(java.util.Map)
+     * @see org.teiid.query.processor.xml.PlanExecutor#execute(java.util.Map, boolean)
      */
-    public void execute(Map referenceValues) throws TeiidComponentException, BlockedException, TeiidProcessingException {        
+    public void execute(Map referenceValues, boolean openOnly) throws TeiidComponentException, BlockedException, TeiidProcessingException {        
         if (this.tupleSource == null) {
         	setReferenceValues(referenceValues);
             this.tupleSource = new BatchIterator(internalProcessor);
+            if (openOnly) {
+            	internalProcessor.init();
+            }
         }
-        //force execution
-        this.tupleSource.hasNext();
+        if (!openOnly) {
+	        //force execution
+	        this.tupleSource.hasNext();
+        }
     }    
     
     void setReferenceValues(Map<ElementSymbol, Object> referencesValues) {

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -155,7 +155,7 @@
         
         while(true){
         	// do the xml processing.
-            ProcessorInstruction inst = env.getCurrentInstruction();
+            ProcessorInstruction inst = env.getCurrentInstruction(this.context);
             while (inst != null){
             	LogManager.logTrace(LogConstants.CTX_XML_PLAN, "Executing instruction", inst); //$NON-NLS-1$
                 this.context = inst.process(this.env, this.context);
@@ -186,7 +186,7 @@
         	        TupleBatch batch = new TupleBatch(nextBatchCount++, Arrays.asList(Arrays.asList(xml)));
         	        return batch;
                 }
-                inst = env.getCurrentInstruction();
+                inst = env.getCurrentInstruction(this.context);
             }
             
         	TupleBatch batch = new TupleBatch(nextBatchCount++, Collections.EMPTY_LIST); 

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/processor/xml/XMLProcessorEnvironment.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -26,10 +26,12 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.mapping.xml.ResultSetInfo;
@@ -84,6 +86,7 @@
     private static class ProgramState {
         private Program program;
         private int programCounter = 0;
+        private int lookaheadCounter;
         private int recursionCount = NOT_RECURSIVE;
         
         private static final int NOT_RECURSIVE = 0;
@@ -156,7 +159,7 @@
         this.programStack.addFirst(programState);
     }
 
-    public ProcessorInstruction getCurrentInstruction() {
+    public ProcessorInstruction getCurrentInstruction(XMLContext context) throws TeiidComponentException, TeiidProcessingException {
         ProgramState programState = this.programStack.getFirst();
         
         //Case 5266: account for an empty program on to the stack; 
@@ -171,6 +174,23 @@
             return null;
         }
         
+        //start all siblings
+        List<ProcessorInstruction> instrs = programState.program.getProcessorInstructions();
+        if (programState.programCounter >= programState.lookaheadCounter && instrs.size() > programState.programCounter + 1) {
+        	for (programState.lookaheadCounter = programState.programCounter; programState.lookaheadCounter < instrs.size(); programState.lookaheadCounter++) {
+        		ProcessorInstruction pi = instrs.get(programState.lookaheadCounter);
+        		if (pi instanceof ExecStagingTableInstruction) {
+        			//need to load staging tables prior to source queries
+        			break;
+        		}
+        		if (pi instanceof ExecSqlInstruction) {
+        			ExecSqlInstruction esi = (ExecSqlInstruction)pi;
+        			PlanExecutor pe = esi.getPlanExecutor(this, context);
+        			pe.execute(context.getReferenceValues(), true);
+        		}
+        	}
+        }
+        
         return programState.program.getInstructionAt(programState.programCounter);
     }
     
@@ -251,8 +271,6 @@
         ProgramState initialProgramState = this.programStack.getLast();
         ProgramState newState = new ProgramState();
         newState.program = initialProgramState.program;
-        newState.programCounter = 0;
-        newState.recursionCount = ProgramState.NOT_RECURSIVE;
         clone.programStack.addFirst(newState);
                
         // XML results form and format

Modified: branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/FakePlanExecutor.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -56,7 +56,7 @@
         return this.currentRow;
     }
 
-    public void execute(Map values) throws TeiidComponentException, BlockedException {
+    public void execute(Map values, boolean openOnly) throws TeiidComponentException, BlockedException {
         tupleSource.openSource();
     }
 

Modified: branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java	2011-07-27 17:52:49 UTC (rev 3343)
+++ branches/7.4.x/engine/src/test/java/org/teiid/query/processor/xml/TestProcessorEnvironment.java	2011-07-27 21:13:25 UTC (rev 3344)
@@ -119,31 +119,31 @@
         env.pushProgram(p1, true); //simulate recursion
         
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i1, env.getCurrentInstruction());
+        assertEquals(i1, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i1.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i2, env.getCurrentInstruction());
+        assertEquals(i2, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i2.process(env, context);
         assertEquals(p2, env.getCurrentProgram());
-        assertEquals(i3, env.getCurrentInstruction());
+        assertEquals(i3, env.getCurrentInstruction(null));
         assertEquals(p2, env.getCurrentProgram());
         i3.process(env, context);
         assertEquals(p2, env.getCurrentProgram());
-        assertEquals(i4, env.getCurrentInstruction());
+        assertEquals(i4, env.getCurrentInstruction(null));
         assertEquals(p2, env.getCurrentProgram());
         i4.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i1, env.getCurrentInstruction());
+        assertEquals(i1, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i1.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i2, env.getCurrentInstruction());
+        assertEquals(i2, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i2.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(null, env.getCurrentInstruction());
+        assertEquals(null, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
     }
     
@@ -173,31 +173,31 @@
         env.pushProgram(p2);
         
         assertEquals(p2, env.getCurrentProgram());
-        assertEquals(i3, env.getCurrentInstruction());
+        assertEquals(i3, env.getCurrentInstruction(null));
         assertEquals(p2, env.getCurrentProgram());
         i3.process(env, context);
         assertEquals(p2, env.getCurrentProgram());
-        assertEquals(i4, env.getCurrentInstruction());
+        assertEquals(i4, env.getCurrentInstruction(null));
         assertEquals(p2, env.getCurrentProgram());
         i4.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i1, env.getCurrentInstruction());
+        assertEquals(i1, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i1.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i2, env.getCurrentInstruction());
+        assertEquals(i2, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i2.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i1, env.getCurrentInstruction());
+        assertEquals(i1, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i1.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(i2, env.getCurrentInstruction());
+        assertEquals(i2, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
         i2.process(env, context);
         assertEquals(p1, env.getCurrentProgram());
-        assertEquals(null, env.getCurrentInstruction());
+        assertEquals(null, env.getCurrentInstruction(null));
         assertEquals(p1, env.getCurrentProgram());
     }    
     



More information about the teiid-commits mailing list