Author: jolee
Date: 2013-05-30 23:45:35 -0400 (Thu, 30 May 2013)
New Revision: 4573
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java
Log:
Timing issue when processing data from an XML stream
Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2013-05-23
19:56:46 UTC (rev 4572)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2013-05-31
03:45:35 UTC (rev 4573)
@@ -30,6 +30,7 @@
import org.teiid.client.ResizingArrayList;
import org.teiid.common.buffer.LobManager.ReferenceMode;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.Streamable;
import org.teiid.core.util.Assertion;
@@ -42,6 +43,73 @@
public class TupleBuffer {
+ public interface RowSource {
+ void finalRow() throws TeiidComponentException, TeiidProcessingException;
+ }
+
+ public final class TupleBufferTupleSource extends
+ AbstractTupleSource {
+ private final boolean singleUse;
+ private boolean noBlocking;
+ private boolean reverse;
+
+ private TupleBufferTupleSource(boolean singleUse) {
+ this.singleUse = singleUse;
+ }
+
+ @Override
+ protected List<?> finalRow() throws TeiidComponentException,
TeiidProcessingException {
+ if(isFinal || noBlocking || reverse) {
+ return null;
+ }
+ if (rowSourceLock == null) {
+ throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer",
tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ synchronized (rowSourceLock) {
+ rowSourceLock.finalRow();
+ return getCurrentTuple();
+ }
+ }
+
+ @Override
+ public int available() {
+ if (!reverse) {
+ return rowCount - getCurrentIndex() + 1;
+ }
+ return getCurrentIndex();
+ }
+
+ @Override
+ protected TupleBatch getBatch(int row) throws TeiidComponentException {
+ return TupleBuffer.this.getBatch(row);
+ }
+
+ @Override
+ public void closeSource() {
+ super.closeSource();
+ if (singleUse) {
+ remove();
+ }
+ }
+
+ public void setNoBlocking(boolean noBlocking) {
+ this.noBlocking = noBlocking;
+ }
+
+ public void setReverse(boolean reverse) {
+ this.reverse = reverse;
+ }
+
+ @Override
+ public int getCurrentIndex() {
+ if (!reverse) {
+ return super.getCurrentIndex();
+ }
+ return getRowCount() - super.getCurrentIndex() + 1;
+ }
+
+ }
+
/**
* Gets the data type names for each of the input expressions, in order.
* @param expressions List of Expressions
@@ -75,6 +143,7 @@
private LobManager lobManager;
private String uuid;
+ private RowSource rowSourceLock;
public TupleBuffer(BatchManager manager, String id, List<? extends Expression>
schema, LobManager lobManager, int batchSize) {
this.manager = manager;
@@ -84,6 +153,10 @@
this.batchSize = batchSize;
}
+ public void setRowSourceLock(RowSource rowSourceLock) {
+ this.rowSourceLock = rowSourceLock;
+ }
+
public void setInlineLobs(boolean inline) {
if (this.lobManager != null) {
this.lobManager.setInlineLobs(inline);
@@ -311,34 +384,7 @@
if (singleUse) {
setForwardOnly(true);
}
- return new AbstractTupleSource() {
-
- @Override
- protected List<?> finalRow() throws BlockedException {
- if(isFinal) {
- return null;
- }
- throw BlockedException.blockWithTrace("Blocking on non-final
TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$
//$NON-NLS-2$
- }
-
- @Override
- public int available() {
- return rowCount - getCurrentIndex() + 1;
- }
-
- @Override
- protected TupleBatch getBatch(int row) throws TeiidComponentException {
- return TupleBuffer.this.getBatch(row);
- }
-
- @Override
- public void closeSource() {
- super.closeSource();
- if (singleUse) {
- remove();
- }
- }
- };
+ return new TupleBufferTupleSource(singleUse);
}
@Override
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2013-05-23
19:56:46 UTC (rev 4572)
+++
branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2013-05-31
03:45:35 UTC (rev 4573)
@@ -25,7 +25,6 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -46,9 +45,9 @@
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
@@ -60,9 +59,9 @@
import org.teiid.query.function.FunctionDescriptor;
import org.teiid.query.sql.lang.XMLTable;
import org.teiid.query.sql.lang.XMLTable.XMLColumn;
-import org.teiid.query.xquery.saxon.XQueryEvaluator;
import org.teiid.query.xquery.saxon.SaxonXQueryExpression.Result;
import org.teiid.query.xquery.saxon.SaxonXQueryExpression.RowProcessor;
+import org.teiid.query.xquery.saxon.XQueryEvaluator;
/**
* Handles xml table processing.
@@ -70,7 +69,7 @@
* When streaming the results will be fully built and stored in a buffer
* before being returned
*/
-public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor {
+public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor,
TupleBuffer.RowSource {
private static Map<Class<?>, BuiltInAtomicType> typeMapping = new
HashMap<Class<?>, BuiltInAtomicType>();
@@ -232,12 +231,20 @@
}
} finally {
synchronized (XMLTableNode.this) {
+ if (buffer != null && asynchException == null) {
+ try {
+ buffer.close();
+ } catch (TeiidComponentException e) {
+ asynchException = new TeiidRuntimeException(e);
+ }
+ }
state = State.DONE;
XMLTableNode.this.notifyAll();
}
}
}
};
+ this.buffer.setRowSourceLock(this);
this.getContext().getExecutor().execute(r);
return;
}
@@ -247,6 +254,21 @@
unwrapException(e);
}
}
+
+ @Override
+ public synchronized void finalRow() throws TeiidComponentException,
+ TeiidProcessingException {
+ while (state == State.BUILDING) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ if (this.asynchException != null) {
+ unwrapException(this.asynchException);
+ }
+ }
private List<?> processRow() throws ExpressionEvaluationException,
BlockedException,
TeiidComponentException, TeiidProcessingException {
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java 2013-05-23
19:56:46 UTC (rev 4572)
+++
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java 2013-05-31
03:45:35 UTC (rev 4573)
@@ -33,10 +33,15 @@
import java.util.List;
import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.api.exception.query.ExpressionEvaluationException;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.BlobImpl;
import org.teiid.core.types.BlobType;
import org.teiid.core.types.DataTypeManager;
@@ -53,6 +58,7 @@
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.unittest.TimestampUtil;
+import org.teiid.query.util.CommandContext;
@SuppressWarnings({"nls", "unchecked"})
public class TestSQLXMLProcessing {
@@ -357,6 +363,66 @@
process(sql, expected);
}
+ @Test public void testXmlTableStreamingTiming() throws Throwable {
+ String sql = "select xmlserialize(x.object_value as string), y.x from
xmltable('/a/b' passing xmlparse(document '<a><b
x=''1''/><b x=''2''/></a>')) as x,
(select 1 as x) as y"; //$NON-NLS-1$
+
+ final List<?>[] expected = new List<?>[] {
+ Arrays.asList("<b xmlns=\"\" x=\"1\"/>",
1),
+ Arrays.asList("<b xmlns=\"\" x=\"2\"/>",
1)
+ };
+
+ executeStreaming(sql, expected);
+ }
+
+ @Test(expected=TeiidProcessingException.class) public void
testXmlTableStreamingTimingWithError() throws Throwable {
+ String sql = "select x.x, y.x from xmltable('/a/b' passing
xmlparse(document '<a><b x=''1''/><b
x=''2''/></a>') columns x integer path '1 div (@x -
1)') as x, (select 1 as x) as y"; //$NON-NLS-1$
+
+ final List<?>[] expected = new List<?>[] {
+ Arrays.asList(1, 1),
+ Arrays.asList(2, 1)
+ };
+
+ executeStreaming(sql, expected);
+ }
+
+ private void executeStreaming(String sql, final List<?>[] expected)
+ throws Throwable {
+ final CommandContext cc = createCommandContext();
+ final ResultsFuture<Runnable> r = new ResultsFuture<Runnable>();
+ Executor ex = new Executor() {
+
+ @Override
+ public void execute(Runnable command) {
+ r.getResultsReceiver().receiveResults(command);
+ }
+ };
+ cc.setExecutor(ex);
+ final ProcessorPlan plan = helpGetPlan(helpParse(sql),
RealMetadataFactory.example1Cached(), new DefaultCapabilitiesFinder(), cc);
+ final ResultsFuture<Void> result = new ResultsFuture<Void>();
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ doProcess(plan, dataManager, expected, cc);
+ result.getResultsReceiver().receiveResults(null);
+ } catch (Throwable e) {
+ result.getResultsReceiver().exceptionOccurred(e);
+ }
+ }
+ };
+ t.start();
+ Runnable runnable = r.get();
+ runnable.run();
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() != null) {
+ throw e.getCause();
+ }
+ throw e;
+ }
+ }
+
@Test public void testXmlNameEscaping() throws Exception {
String sql = "select xmlforest(\"xml\") from (select 1 as
\"xml\") x"; //$NON-NLS-1$