Author: shawkins
Date: 2010-07-06 21:34:45 -0400 (Tue, 06 Jul 2010)
New Revision: 2329
Modified:
trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
Log:
TEIID-1144 Fix for BlockedException handling that causes problems with timeslicing.
Also updates the ping logic.
Modified: trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-06
18:36:27 UTC (rev 2328)
+++ trunk/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -107,13 +107,16 @@
@Override
public void run() {
- if (ping == null || !ping.isDone()) {
+ if (ping == null) {
ping = isOpen();
}
- if (ping != null && ping.isDone()) {
+ if (ping != null) {
try {
- ping.get();
+ ping.get(1, TimeUnit.SECONDS);
+ ping = null;
return;
+ } catch (TimeoutException e) {
+ return;
} catch (Throwable e) {
handlePingError(e);
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2010-07-06
18:36:27 UTC (rev 2328)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -23,7 +23,6 @@
package org.teiid.query.processor.relational;
import java.util.List;
-import java.util.Map;
import org.teiid.common.buffer.BlockedException;
import org.teiid.core.TeiidComponentException;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2010-07-06
18:36:27 UTC (rev 2328)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -61,7 +61,7 @@
// Saved state when blocked on evaluating a row - must be reset
private TupleBatch currentBatch;
- private int currentRow;
+ private int currentRow = 1;
public ProjectNode(int nodeID) {
super(nodeID);
@@ -71,7 +71,7 @@
super.reset();
currentBatch = null;
- currentRow = 0;
+ currentRow = 1;
}
/**
@@ -147,70 +147,50 @@
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
- // currentBatch and currentRow hold temporary state saved in the case
- // of a BlockedException while evaluating an expression. If that has
- // not occurred, currentBatch will be null and currentRow will be < 0.
- // blockedOnPrepare indicates that the BlockedException happened
- // during the call to prepareToProcessTuple
-
- TupleBatch batch = this.currentBatch;
- int beginRow = this.currentRow;
-
- if(batch == null) {
+
+ if(currentBatch == null) {
// There was no saved batch, so get a new one
//in the case of select with no from, should return only
//one batch with one row
if(this.getChildren()[0] == null){
- batch = new TupleBatch(0, new List[]{Arrays.asList(new Object[] {})});
- batch.setTerminationFlag(true);
+ currentBatch = new TupleBatch(1, new List[]{Arrays.asList(new Object[] {})});
+ currentBatch.setTerminationFlag(true);
}else{
- batch = this.getChildren()[0].nextBatch();
+ currentBatch = this.getChildren()[0].nextBatch();
}
// Check for no project needed and pass through
- if(batch.getRowCount() == 0 || !needsProject) {
- // Just pass the batch through without processing
- return batch;
+ if(!needsProject) {
+ TupleBatch result = currentBatch;
+ currentBatch = null;
+ return result;
}
-
- // Set the beginRow based on beginning row of the batch
- beginRow = batch.getBeginRow();
-
- } else {
- // There was a saved batch, but we grabbed the state so it can now be removed
- this.currentBatch = null;
- this.currentRow = 0;
}
- for(int row = beginRow; row <= batch.getEndRow(); row++) {
- List tuple = batch.getTuple(row);
+ while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+ List tuple = currentBatch.getTuple(currentRow);
List projectedTuple = new ArrayList(selectSymbols.size());
// Walk through symbols
- try {
- for(int i=0; i<selectSymbols.size(); i++) {
- SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
- updateTuple(symbol, tuple, projectedTuple);
- }
- } catch(BlockedException e) {
- // Expression blocked, so save state and rethrow
- this.currentBatch = batch;
- this.currentRow = row;
- throw e;
- }
+ for(int i=0; i<selectSymbols.size(); i++) {
+ SelectSymbol symbol = (SelectSymbol) selectSymbols.get(i);
+ updateTuple(symbol, tuple, projectedTuple);
+ }
// Add to batch
addBatchRow(projectedTuple);
+ currentRow++;
}
-
- // Check for termination tuple
- if(batch.getTerminationFlag()) {
- terminateBatches();
+
+ if (currentRow > currentBatch.getEndRow()) {
+ if(currentBatch.getTerminationFlag()) {
+ terminateBatches();
+ }
+ currentBatch = null;
}
-
- return pullBatch();
+
+ return pullBatch();
}
private void updateTuple(SelectSymbol symbol, List values, List tuple)
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2010-07-06
18:36:27 UTC (rev 2328)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -41,6 +41,7 @@
import org.teiid.logging.MessageLevel;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.ProcessorDataManager;
+import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.sql.symbol.AliasSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -287,6 +288,11 @@
this.nodeStatistics.collectCumulativeNodeStats(null,
RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
}
throw e;
+ } catch (QueryProcessor.ExpiredTimeSliceException e) {
+ if(recordStats && this.context.getCollectNodeStatistics()) {
+ this.nodeStatistics.stopBatchTimer();
+ }
+ throw e;
} catch (TeiidComponentException e) {
// stop timer for this batch (MetaMatrixComponentException)
if(recordStats && this.context.getCollectNodeStatistics()) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2010-07-06
18:36:27 UTC (rev 2328)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -49,10 +49,8 @@
private int[] projectionIndexes;
// State if blocked on evaluating a criteria
- private boolean blockedOnCriteria = false;
- private boolean blockedOnPrepare = false;
- private TupleBatch blockedBatch = null;
- private int blockedRow = 0;
+ private TupleBatch currentBatch;
+ private int currentRow = 1;
public SelectNode(int nodeID) {
super(nodeID);
@@ -61,10 +59,8 @@
public void reset() {
super.reset();
- blockedOnCriteria = false;
- blockedOnPrepare = false;
- blockedBatch = null;
- blockedRow = 0;
+ currentBatch = null;
+ currentRow = 1;
}
public void setCriteria(Criteria criteria) {
@@ -91,45 +87,28 @@
*/
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
-
- TupleBatch batch = blockedBatch;
- if(! blockedOnCriteria && ! blockedOnPrepare) {
- batch = this.getChildren()[0].nextBatch();
+
+ if(currentBatch == null) {
+ currentBatch = this.getChildren()[0].nextBatch();
}
-
- int row = blockedRow;
- if(! blockedOnCriteria && ! blockedOnPrepare) {
- row = batch.getBeginRow();
- } else {
- // Reset blocked state
- blockedOnCriteria = false;
- blockedOnPrepare = false;
- blockedBatch = null;
- blockedRow = 0;
- }
-
- for(; row <= batch.getEndRow(); row++) {
- List tuple = batch.getTuple(row);
-
- // Evaluate criteria with tuple
- try {
- if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
- addBatchRow(projectTuple(this.projectionIndexes, tuple));
- }
- } catch(BlockedException e) {
- // Save state and rethrow
- blockedOnCriteria = true;
- blockedBatch = batch;
- blockedRow = row;
- throw e;
- }
- }
- if(batch.getTerminationFlag()) {
- terminateBatches();
- }
+ while (currentRow <= currentBatch.getEndRow() && !isBatchFull()) {
+ List tuple = currentBatch.getTuple(currentRow);
- return pullBatch();
+ if(getEvaluator(this.elementMap).evaluate(this.criteria, tuple)) {
+ addBatchRow(projectTuple(this.projectionIndexes, tuple));
+ }
+ currentRow++;
+ }
+
+ if (currentRow > currentBatch.getEndRow()) {
+ if(currentBatch.getTerminationFlag()) {
+ terminateBatches();
+ }
+ currentBatch = null;
+ }
+
+ return pullBatch();
}
protected void getNodeString(StringBuffer str) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java 2010-07-06
18:36:27 UTC (rev 2328)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/IfInstruction.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -54,8 +54,7 @@
private DefaultCondition defaultCondition;
// State if condition evaluation blocked
- private boolean blockedOnCondition = false;
- private int blockedConditionIndex = 0;
+ private int conditionIndex = 0;
/**
* Constructor for IfInstruction.
@@ -148,37 +147,22 @@
thens.add(defaultCondition);
}
- int conditionIndex = this.blockedConditionIndex;
- if(blockedOnCondition) {
- // Remove state - we have recovered and will reset if necessary
- this.blockedOnCondition = false;
- this.blockedConditionIndex = 0;
- } else{
- conditionIndex = 0;
- }
-
Condition condition = null;
boolean foundTrueCondition = false;
for(; conditionIndex < thens.size(); conditionIndex++){
condition = (Condition)thens.get(conditionIndex);
- // evaluate may block if criteria evaluation blocks
- try {
- if(condition.evaluate(env, context)) {
- foundTrueCondition = true;
- //break from the loop; only the first "then" Program
- //whose criteria evaluates to true will be executed
- break;
- }
- } catch(BlockedException e) {
- // Save state and rethrow
- this.blockedOnCondition = true;
- this.blockedConditionIndex = conditionIndex;
- throw e;
+ // evaluate may block if criteria evaluation blocks
+ if(condition.evaluate(env, context)) {
+ foundTrueCondition = true;
+ //break from the loop; only the first "then" Program
+ //whose criteria evaluates to true will be executed
+ break;
}
}
+ conditionIndex = 0;
// This IF instruction should be processed exactly once, so the
// program containing the IF instruction needs to have it's
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java 2010-07-06
18:36:27 UTC (rev 2328)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSelectNode.java 2010-07-07
01:34:45 UTC (rev 2329)
@@ -22,12 +22,15 @@
package org.teiid.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
@@ -35,13 +38,13 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.eval.Evaluator;
import org.teiid.query.function.FunctionDescriptor;
import org.teiid.query.function.SystemFunctionManager;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.FakeDataManager;
import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.SelectNode;
+import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.sql.lang.CompareCriteria;
import org.teiid.query.sql.lang.Criteria;
import org.teiid.query.sql.symbol.Constant;
@@ -50,34 +53,28 @@
import org.teiid.query.sql.symbol.Function;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
+public class TestSelectNode {
-
-/**
- */
-public class TestSelectNode extends TestCase {
-
- /**
- * Constructor for TestSelectNode.
- * @param arg0
- */
- public TestSelectNode(String arg0) {
- super(arg0);
- }
-
public void helpTestSelect(List elements, Criteria criteria, List[] data, List
childElements, ProcessorDataManager dataMgr, List[] expected) throws
TeiidComponentException, TeiidProcessingException {
helpTestSelect(elements, criteria, childElements, dataMgr, expected, new
FakeRelationalNode(2, data));
}
public void helpTestSelect(List elements, Criteria criteria, List childElements,
ProcessorDataManager dataMgr, List[] expected, RelationalNode child) throws
TeiidComponentException, TeiidProcessingException {
- BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
+ SelectNode selectNode = new SelectNode(1);
+ helpTestSelect(elements, criteria, childElements, dataMgr, expected, child,
selectNode);
+ }
+
+ private void helpTestSelect(List elements, Criteria criteria, List childElements,
+ ProcessorDataManager dataMgr, List[] expected,
+ RelationalNode child,
+ SelectNode selectNode) throws TeiidComponentException,
+ TeiidProcessingException {
+ BufferManager mgr = BufferManagerFactory.getStandaloneBufferManager();
CommandContext context = new CommandContext("pid", "test",
null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
child.setElements(childElements);
- child.initialize(context, mgr, dataMgr);
-
- SelectNode selectNode = new SelectNode(1);
- selectNode.setCriteria(criteria);
+ child.initialize(context, mgr, dataMgr);
+ selectNode.setCriteria(criteria);
selectNode.setElements(elements);
selectNode.addChild(child);
selectNode.initialize(context, mgr, dataMgr);
@@ -93,16 +90,18 @@
break;
} catch (BlockedException e) {
continue;
+ } catch (QueryProcessor.ExpiredTimeSliceException e) {
+ continue;
}
}
}
assertFalse(iterator.hasNext());
- }
+ }
/**
* Ensures that a final empty batch is reindexed so that the batch iterator works
correctly
*/
- public void testEmptyBatchIndexing() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testEmptyBatchIndexing() throws TeiidComponentException,
TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -138,10 +137,41 @@
helpTestSelect(elements, crit, childElements, null, new List[0], child);
}
- public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+ @Test public void testTimeslicing() throws TeiidComponentException,
TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+ List elements = new ArrayList();
+ elements.add(es1);
+
+ CompareCriteria crit = new CompareCriteria(es1, CompareCriteria.EQ, new
Constant(new Integer(1)));
+
+ List[] data = new List[] {
+ Arrays.asList(1),
+ Arrays.asList(1),
+ Arrays.asList(1)
+ };
+
+ List childElements = new ArrayList();
+ childElements.add(es1);
+
+ helpTestSelect(elements, crit, childElements, null, data, new
FakeRelationalNode(2, data), new SelectNode(3) {
+ int i = 0;
+
+ @Override
+ protected Evaluator getEvaluator(Map elementMap) {
+ if (i++ == 1) {
+ throw new QueryProcessor.ExpiredTimeSliceException();
+ }
+ return super.getEvaluator(elementMap);
+ }
+ });
+ }
+
+ @Test public void testNoRows() throws TeiidComponentException,
TeiidProcessingException {
+ ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
+ es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+
ElementSymbol es2 = new ElementSymbol("e2"); //$NON-NLS-1$
es2.setType(DataTypeManager.DefaultDataClasses.STRING);
@@ -160,7 +190,7 @@
}
- public void testSimpleSelect() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testSimpleSelect() throws TeiidComponentException,
TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -195,7 +225,7 @@
}
- public void testSelectWithLookup() throws TeiidComponentException,
TeiidProcessingException {
+ @Test public void testSelectWithLookup() throws TeiidComponentException,
TeiidProcessingException {
ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);