[teiid-commits] teiid SVN: r2329 - in trunk: engine/src/main/java/org/teiid/query/processor/relational and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Jul 6 21:34:46 EDT 2010


Author: shawkins
Date: 2010-07-06 21:34:45 -0400 (Tue, 06 Jul 2010)
New Revision: 2329

Modified:
   trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
Log:
TEIID-1144: Fix for BlockedException handling that causes problems with time-slicing. Also updates the ping logic.

Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -107,13 +107,16 @@
         		
     			@Override
     			public void run() {
-    				if (ping == null || !ping.isDone()) {
+    				if (ping == null) {
     					ping = isOpen();
     				} 
-    				if (ping != null && ping.isDone()) {
+    				if (ping != null) {
     					try {
-							ping.get();
+    						ping.get(1, TimeUnit.SECONDS);
+    						ping = null;
 							return;
+    					} catch (TimeoutException e) {
+    						return;
 						} catch (Throwable e) {
 							handlePingError(e);
 						}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -23,7 +23,6 @@
 package org.teiid.query.processor.relational;
 
 import java.util.List;
-import java.util.Map;
 
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.core.TeiidComponentException;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -61,7 +61,7 @@
 
     // Saved state when blocked on evaluating a row - must be reset
     private TupleBatch currentBatch;
-    private int currentRow;
+    private int currentRow = 1;
 
 	public ProjectNode(int nodeID) {
 		super(nodeID);
@@ -71,7 +71,7 @@
         super.reset();
 
         currentBatch = null;
-        currentRow = 0;
+        currentRow = 1;
     }
 
     /**
@@ -147,70 +147,50 @@
 	
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
-        // currentBatch and currentRow hold temporary state saved in the case
-        // of a BlockedException while evaluating an expression.  If that has
-        // not occurred, currentBatch will be null and currentRow will be < 0.
-        // blockedOnPrepare indicates that the BlockedException happened 
-        // during the call to prepareToProcessTuple
-
-        TupleBatch batch = this.currentBatch;
-        int beginRow = this.currentRow;
-
-        if(batch == null) {
+		
+        if(currentBatch == null) {
             // There was no saved batch, so get a new one
             //in the case of select with no from, should return only
             //one batch with one row
             if(this.getChildren()[0] == null){
-                batch = new TupleBatch(0, new List[]{Arrays.asList(new Object[] {})});
-                batch.setTerminationFlag(true);
+            	currentBatch = new TupleBatch(1, new List[]{Arrays.asList(new Object[] {})});
+            	currentBatch.setTerminationFlag(true);
             }else{
-                batch = this.getChildren()[0].nextBatch();
+            	currentBatch = this.getChildren()[0].nextBatch();
             }
 
             // Check for no project needed and pass through
-            if(batch.getRowCount() == 0 || !needsProject) {
-                // Just pass the batch through without processing
-                return batch;
+            if(!needsProject) {
+            	TupleBatch result = currentBatch;
+            	currentBatch = null;
+                return result;
             }
-
-            // Set the beginRow based on beginning row of the batch
-            beginRow = batch.getBeginRow();
-
-        } else {
-            // There was a saved batch, but we grabbed the state so it can now be removed
-            this.currentBatch = null;
-            this.currentRow = 0;
         }
 
-        for(int row = beginRow; row <= batch.getEndRow(); row++) {
-    		List tuple = batch.getTuple(row);
+        while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+    		List tuple = currentBatch.getTuple(currentRow);
 
 			List projectedTuple = new ArrayList(selectSymbols.size());
 
 			// Walk through symbols
-            try {
-                for(int i=0; i<selectSymbols.size(); i++) {
-    				SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
-    				updateTuple(symbol, tuple, projectedTuple);
-    			}
-            } catch(BlockedException e) {
-                // Expression blocked, so save state and rethrow
-                this.currentBatch = batch;
-                this.currentRow = row;
-                throw e;
-            }
+            for(int i=0; i<selectSymbols.size(); i++) {
+				SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
+				updateTuple(symbol, tuple, projectedTuple);
+			}
 
             // Add to batch
             addBatchRow(projectedTuple);
+            currentRow++;
 		}
-
-        // Check for termination tuple
-        if(batch.getTerminationFlag()) {
-            terminateBatches();
+        
+        if (currentRow > currentBatch.getEndRow()) {
+	        if(currentBatch.getTerminationFlag()) {
+	            terminateBatches();
+	        }
+	        currentBatch = null;
         }
-
-        return pullBatch();
+        
+    	return pullBatch();
 	}
 
 	private void updateTuple(SelectSymbol symbol, List values, List tuple)

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -41,6 +41,7 @@
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.analysis.AnalysisRecord;
 import org.teiid.query.processor.ProcessorDataManager;
+import org.teiid.query.processor.QueryProcessor;
 import org.teiid.query.processor.BatchCollector.BatchProducer;
 import org.teiid.query.sql.symbol.AliasSymbol;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -287,6 +288,11 @@
                 this.nodeStatistics.collectCumulativeNodeStats(null, RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
             }
             throw e;
+        } catch (QueryProcessor.ExpiredTimeSliceException e) {
+        	if(recordStats && this.context.getCollectNodeStatistics()) {
+                this.nodeStatistics.stopBatchTimer();
+            }
+            throw e;
         } catch (TeiidComponentException e) {
             // stop timer for this batch (MetaMatrixComponentException)
             if(recordStats &&  this.context.getCollectNodeStatistics()) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -49,10 +49,8 @@
     private int[] projectionIndexes;
 	
     // State if blocked on evaluating a criteria
-    private boolean blockedOnCriteria = false;
-    private boolean blockedOnPrepare = false;
-    private TupleBatch blockedBatch = null;
-    private int blockedRow = 0;
+    private TupleBatch currentBatch;
+    private int currentRow = 1;
     
 	public SelectNode(int nodeID) {
 		super(nodeID);
@@ -61,10 +59,8 @@
     public void reset() {
         super.reset();
         
-        blockedOnCriteria = false;
-        blockedOnPrepare = false;
-        blockedBatch = null;
-        blockedRow = 0;
+        currentBatch = null;
+        currentRow = 1;
     }
 
 	public void setCriteria(Criteria criteria) { 
@@ -91,45 +87,28 @@
      */
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
-        TupleBatch batch = blockedBatch; 		
-        if(! blockedOnCriteria && ! blockedOnPrepare) {	
-            batch = this.getChildren()[0].nextBatch();
+		
+        if(currentBatch == null) {
+        	currentBatch = this.getChildren()[0].nextBatch();
         }
-                
-        int row = blockedRow;
-        if(! blockedOnCriteria && ! blockedOnPrepare) {
-            row = batch.getBeginRow();               
-        } else {
-            // Reset blocked state
-            blockedOnCriteria = false;
-            blockedOnPrepare = false;
-            blockedBatch = null;
-            blockedRow = 0;
-        }
-        
-        for(; row <= batch.getEndRow(); row++) {             
-            List tuple = batch.getTuple(row);
-        
-            // Evaluate criteria with tuple
-            try {
-                if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
-                    addBatchRow(projectTuple(this.projectionIndexes, tuple));
-                }
-            } catch(BlockedException e) {
-                // Save state and rethrow
-                blockedOnCriteria = true;
-                blockedBatch = batch;
-                blockedRow = row;
-                throw e;   
-            }
-        }   
 
-        if(batch.getTerminationFlag()) { 
-            terminateBatches();
-        }
+        while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+    		List tuple = currentBatch.getTuple(currentRow);
 
-        return pullBatch();            
+            if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
+                addBatchRow(projectTuple(this.projectionIndexes, tuple));
+            }
+            currentRow++;
+		}
+        
+        if (currentRow > currentBatch.getEndRow()) {
+	        if(currentBatch.getTerminationFlag()) {
+	            terminateBatches();
+	        }
+	        currentBatch = null;
+        }
+        
+    	return pullBatch();
 	}
     
 	protected void getNodeString(StringBuffer str) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -54,8 +54,7 @@
     private DefaultCondition defaultCondition;
 
     // State if condition evaluation blocked
-    private boolean blockedOnCondition = false;
-    private int blockedConditionIndex = 0;
+    private int conditionIndex = 0;
 
     /**
      * Constructor for IfInstruction.
@@ -148,37 +147,22 @@
 	        thens.add(defaultCondition);
         }
         
-        int conditionIndex = this.blockedConditionIndex;
-        if(blockedOnCondition) {
-            // Remove state - we have recovered and will reset if necessary
-            this.blockedOnCondition = false;
-            this.blockedConditionIndex = 0;            
-        } else{
-            conditionIndex = 0;
-        }
-        
         Condition condition = null;
         boolean foundTrueCondition = false;
 
         for(; conditionIndex < thens.size(); conditionIndex++){            
             condition = (Condition)thens.get(conditionIndex);
                              
-             // evaluate may block if criteria evaluation blocks
-             try {
-                 if(condition.evaluate(env, context)) {
-                     foundTrueCondition = true;
-                     //break from the loop; only the first "then" Program
-                     //whose criteria evaluates to true will be executed 
-                     break;
-                 }
-             } catch(BlockedException e) {
-                 // Save state and rethrow
-                 this.blockedOnCondition = true;
-                 this.blockedConditionIndex = conditionIndex; 
-                 throw e;
+            // evaluate may block if criteria evaluation blocks
+             if(condition.evaluate(env, context)) {
+                 foundTrueCondition = true;
+                 //break from the loop; only the first "then" Program
+                 //whose criteria evaluates to true will be executed 
+                 break;
              }
             
         }
+        conditionIndex = 0;
 
         // This IF instruction should be processed exactly once, so the 
         // program containing the IF instruction needs to have it's

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java	2010-07-06 18:36:27 UTC (rev 2328)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java	2010-07-07 01:34:45 UTC (rev 2329)
@@ -22,12 +22,15 @@
 
 package org.teiid.query.processor.relational;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.junit.Test;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
@@ -35,13 +38,13 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.eval.Evaluator;
 import org.teiid.query.function.FunctionDescriptor;
 import org.teiid.query.function.SystemFunctionManager;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.FakeDataManager;
 import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.SelectNode;
+import org.teiid.query.processor.QueryProcessor;
 import org.teiid.query.sql.lang.CompareCriteria;
 import org.teiid.query.sql.lang.Criteria;
 import org.teiid.query.sql.symbol.Constant;
@@ -50,34 +53,28 @@
 import org.teiid.query.sql.symbol.Function;
 import org.teiid.query.util.CommandContext;
 
-import junit.framework.TestCase;
+public class TestSelectNode {
 
-
-/**
- */
-public class TestSelectNode extends TestCase {
-
-    /**
-     * Constructor for TestSelectNode.
-     * @param arg0
-     */
-    public TestSelectNode(String arg0) {
-        super(arg0);
-    }
-    
     public void helpTestSelect(List elements, Criteria criteria, List[] data, List childElements, ProcessorDataManager dataMgr, List[] expected) throws TeiidComponentException, TeiidProcessingException {
     	helpTestSelect(elements, criteria, childElements, dataMgr, expected, new FakeRelationalNode(2, data));
     }
     
     public void helpTestSelect(List elements, Criteria criteria, List childElements, ProcessorDataManager dataMgr, List[] expected, RelationalNode child) throws TeiidComponentException, TeiidProcessingException {
-        BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
+        SelectNode selectNode = new SelectNode(1);
+        helpTestSelect(elements, criteria, childElements, dataMgr, expected, child, selectNode);
+    }
+
+	private void helpTestSelect(List elements, Criteria criteria, List childElements,
+			ProcessorDataManager dataMgr, List[] expected,
+			RelationalNode child,
+			SelectNode selectNode) throws TeiidComponentException,
+			TeiidProcessingException {
+		BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
         CommandContext context = new CommandContext("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
         
         child.setElements(childElements);
-        child.initialize(context, mgr, dataMgr);    
-        
-        SelectNode selectNode = new SelectNode(1);
-        selectNode.setCriteria(criteria);
+        child.initialize(context, mgr, dataMgr);
+		selectNode.setCriteria(criteria);
         selectNode.setElements(elements);
         selectNode.addChild(child);
         selectNode.initialize(context, mgr, dataMgr);
@@ -93,16 +90,18 @@
 	        		break;
 	        	} catch (BlockedException e) {
 	        		continue;
+	        	} catch (QueryProcessor.ExpiredTimeSliceException e) {
+	        		continue;
 	        	}
         	}
 		}  
         assertFalse(iterator.hasNext());
-    }
+	}
     
     /**
      * Ensures that a final empty batch is reindexed so that the batch iterator works correctly
      */
-    public void testEmptyBatchIndexing() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testEmptyBatchIndexing() throws TeiidComponentException, TeiidProcessingException {
     	ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
@@ -138,10 +137,41 @@
     	helpTestSelect(elements, crit, childElements, null, new List[0], child);
     }
     
-    public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testTimeslicing() throws TeiidComponentException, TeiidProcessingException {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
+        List elements = new ArrayList();
+        elements.add(es1);
+        
+        CompareCriteria crit = new CompareCriteria(es1, CompareCriteria.EQ, new Constant(new Integer(1)));
+        
+        List[] data = new List[] {
+        	Arrays.asList(1),
+        	Arrays.asList(1),
+        	Arrays.asList(1)
+        };
+        
+        List childElements = new ArrayList();
+        childElements.add(es1);
+        
+        helpTestSelect(elements, crit, childElements, null, data, new FakeRelationalNode(2, data), new SelectNode(3) {
+        	int i = 0;
+        	
+        	@Override
+        	protected Evaluator getEvaluator(Map elementMap) {
+        		if (i++ == 1) {
+        			throw new QueryProcessor.ExpiredTimeSliceException();
+        		}
+        		return super.getEvaluator(elementMap);
+        	}
+        });
+    }
+    
+    @Test public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+        ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
+        es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+
         ElementSymbol es2 = new ElementSymbol("e2"); //$NON-NLS-1$
         es2.setType(DataTypeManager.DefaultDataClasses.STRING);
         
@@ -160,7 +190,7 @@
         
     }
 
-    public void testSimpleSelect() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testSimpleSelect() throws TeiidComponentException, TeiidProcessingException {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
@@ -195,7 +225,7 @@
 
     }
 
-    public void testSelectWithLookup() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testSelectWithLookup() throws TeiidComponentException, TeiidProcessingException {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 



More information about the teiid-commits mailing list