[teiid-commits] teiid SVN: r3833 - branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Jan 30 13:18:52 EST 2012


Author: shawkins
Date: 2012-01-30 13:18:50 -0500 (Mon, 30 Jan 2012)
New Revision: 3833

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
Log:
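TEIID-1915 correction to thread coordination in XMLTableNode: the batchAvailable flag is replaced by a BUILDING/AVAILABLE/DONE state that is set to DONE while holding the node's monitor, asynchException is made volatile, and the streaming buffer is closed inside a synchronized block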

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2012-01-27 18:10:06 UTC (rev 3832)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2012-01-30 18:18:50 UTC (rev 3833)
@@ -88,8 +88,15 @@
 	private Item item;
 	
 	private TupleBuffer buffer;
-	private boolean batchAvailable = false;
-	private TeiidRuntimeException asynchException;
+	
+	private enum State {
+		BUILDING,
+		AVAILABLE,
+		DONE
+	};
+	
+	private State state = State.BUILDING;
+	private volatile TeiidRuntimeException asynchException;
 	private int outputRow = 1;
 	private boolean usingOutput;
 	
@@ -121,7 +128,7 @@
 		outputRow = 1;
 		usingOutput = false;
 		this.buffer = null;
-		this.batchAvailable = false;
+		this.state = State.BUILDING;
 		this.asynchException = null;
 	}
 	
@@ -149,7 +156,7 @@
 		evaluate(false);
 		
 		if (this.table.getXQueryExpression().isStreaming()) {
-			while (!batchAvailable) {
+			while (state == State.BUILDING) {
 				try {
 					this.wait();
 				} catch (InterruptedException e) {
@@ -159,7 +166,9 @@
 			unwrapException(asynchException);
 			TupleBatch batch = this.buffer.getBatch(outputRow);
 			outputRow = batch.getEndRow() + 1;
-			batchAvailable = hasNextBatch();
+			if (state != State.DONE && !batch.getTerminationFlag()) {
+				state = hasNextBatch()?State.AVAILABLE:State.BUILDING;
+			}
 			return batch;
 		}
 		
@@ -195,16 +204,20 @@
 		if (this.table.getXQueryExpression().isStreaming()) {
 			if (this.buffer == null) {
 				this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR);
+				if (!useFinalBuffer) {
+					this.buffer.setForwardOnly(true);
+				}
 			}
 			Runnable r = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						if (!useFinalBuffer) {
-							buffer.setForwardOnly(true);
+						XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
+						synchronized (XMLTableNode.this) {
+							if (buffer != null) {
+								buffer.close();
+							}
 						}
-						XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
-						buffer.close();
 					} catch (TeiidException e) {
 						asynchException = new TeiidRuntimeException(e);
 					} catch (TeiidRuntimeException e) {
@@ -214,8 +227,8 @@
 							asynchException = new TeiidRuntimeException(e);
 						}
 					} finally {
-						batchAvailable = true;
 						synchronized (XMLTableNode.this) {
+							state = State.DONE;
 							XMLTableNode.this.notifyAll();
 						}
 					}
@@ -323,12 +336,13 @@
 		if (isClosed()) {
 			throw EARLY_TERMINATION;
 		}
+		assert this.state != State.DONE;
 		this.item = row;
 		rowCount++;
 		try {
 			this.buffer.addTuple(processRow());
-			if (hasNextBatch()) {
-				this.batchAvailable = true;
+			if (state == State.BUILDING && hasNextBatch()) {
+				this.state = State.AVAILABLE;
 				this.notifyAll();
 			}
 		} catch (TeiidException e) {



More information about the teiid-commits mailing list