Author: shawkins
Date: 2012-01-30 13:18:50 -0500 (Mon, 30 Jan 2012)
New Revision: 3833
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
Log:
TEIID-1915 correction to thread coordination in XMLTableNode
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2012-01-27 18:10:06 UTC (rev 3832)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2012-01-30 18:18:50 UTC (rev 3833)
@@ -88,8 +88,15 @@
private Item item;
private TupleBuffer buffer;
- private boolean batchAvailable = false;
- private TeiidRuntimeException asynchException;
+
+ private enum State {
+ BUILDING,
+ AVAILABLE,
+ DONE
+ };
+
+ private State state = State.BUILDING;
+ private volatile TeiidRuntimeException asynchException;
private int outputRow = 1;
private boolean usingOutput;
@@ -121,7 +128,7 @@
outputRow = 1;
usingOutput = false;
this.buffer = null;
- this.batchAvailable = false;
+ this.state = State.BUILDING;
this.asynchException = null;
}
@@ -149,7 +156,7 @@
evaluate(false);
if (this.table.getXQueryExpression().isStreaming()) {
- while (!batchAvailable) {
+ while (state == State.BUILDING) {
try {
this.wait();
} catch (InterruptedException e) {
@@ -159,7 +166,9 @@
unwrapException(asynchException);
TupleBatch batch = this.buffer.getBatch(outputRow);
outputRow = batch.getEndRow() + 1;
- batchAvailable = hasNextBatch();
+ if (state != State.DONE && !batch.getTerminationFlag()) {
+ state = hasNextBatch()?State.AVAILABLE:State.BUILDING;
+ }
return batch;
}
@@ -195,16 +204,20 @@
if (this.table.getXQueryExpression().isStreaming()) {
if (this.buffer == null) {
this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR);
+ if (!useFinalBuffer) {
+ this.buffer.setForwardOnly(true);
+ }
}
Runnable r = new Runnable() {
@Override
public void run() {
try {
- if (!useFinalBuffer) {
- buffer.setForwardOnly(true);
+ XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
+ synchronized (XMLTableNode.this) {
+ if (buffer != null) {
+ buffer.close();
+ }
}
- XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
- buffer.close();
} catch (TeiidException e) {
asynchException = new TeiidRuntimeException(e);
} catch (TeiidRuntimeException e) {
@@ -214,8 +227,8 @@
asynchException = new TeiidRuntimeException(e);
}
} finally {
- batchAvailable = true;
synchronized (XMLTableNode.this) {
+ state = State.DONE;
XMLTableNode.this.notifyAll();
}
}
@@ -323,12 +336,13 @@
if (isClosed()) {
throw EARLY_TERMINATION;
}
+ assert this.state != State.DONE;
this.item = row;
rowCount++;
try {
this.buffer.addTuple(processRow());
- if (hasNextBatch()) {
- this.batchAvailable = true;
+ if (state == State.BUILDING && hasNextBatch()) {
+ this.state = State.AVAILABLE;
this.notifyAll();
}
} catch (TeiidException e) {
Show replies by date