[teiid-commits] teiid SVN: r3641 - in trunk: engine/src/main/java/org/teiid/query/eval and 3 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Sun Nov 13 08:09:22 EST 2011
Author: shawkins
Date: 2011-11-13 08:09:21 -0500 (Sun, 13 Nov 2011)
New Revision: 3641
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1813 adding asynch processing for faster batch delivery.
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -335,6 +335,7 @@
request.initialize(requestMsg, bufferManager,
dataTierMgr, transactionService, state.sessionTables,
workContext, this.prepPlanCache);
+ request.setExecutor(this.processWorkerPool);
request.setResultSetCacheEnabled(this.rsCache != null);
request.setAuthorizationValidator(this.authorizationValidator);
request.setUserRequestConcurrency(this.getUserRequestSourceConcurrency());
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Executor;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.api.exception.query.QueryMetadataException;
@@ -134,6 +135,7 @@
private boolean resultSetCacheEnabled = true;
private int userRequestConcurrency;
private AuthorizationValidator authorizationValidator;
+ private Executor executor;
void initialize(RequestMessage requestMsg,
BufferManager bufferManager,
@@ -240,7 +242,7 @@
multiSourceModels, workContext, context);
context.setPlanToProcessConverter(modifier);
}
-
+ context.setExecutor(this.executor);
context.setSecurityFunctionEvaluator(this);
context.setTempTableStore(tempTableStore);
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(this.bufferManager, this.processorDataManager, this.capabilitiesFinder, idGenerator, metadata));
@@ -475,4 +477,8 @@
}
}
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -925,6 +925,17 @@
public Result evaluateXQuery(SaxonXQueryExpression xquery, List<DerivedColumn> cols, List<?> tuple, RowProcessor processor)
throws BlockedException, TeiidComponentException, TeiidProcessingException {
HashMap<String, Object> parameters = new HashMap<String, Object>();
+ Object contextItem = evaluateParameters(cols, tuple, parameters);
+ return XQueryEvaluator.evaluateXQuery(xquery, contextItem, parameters, processor, context);
+ }
+
+ /**
+ * Evaluate the parameters and return the context item if it exists
+ */
+ public Object evaluateParameters(List<DerivedColumn> cols, List<?> tuple,
+ HashMap<String, Object> parameters)
+ throws ExpressionEvaluationException, BlockedException,
+ TeiidComponentException {
Object contextItem = null;
for (DerivedColumn passing : cols) {
Object value = this.evaluate(passing.getExpression(), tuple);
@@ -934,7 +945,7 @@
parameters.put(passing.getAlias(), value);
}
}
- return XQueryEvaluator.evaluateXQuery(xquery, contextItem, parameters, processor, context);
+ return contextItem;
}
private Evaluator.NameValuePair<Object>[] getNameValuePairs(List<?> tuple, List<DerivedColumn> args, boolean xmlNames)
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -52,9 +52,11 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.XMLType;
import org.teiid.query.QueryPlugin;
+import org.teiid.query.eval.Evaluator;
import org.teiid.query.function.FunctionDescriptor;
import org.teiid.query.sql.lang.XMLTable;
import org.teiid.query.sql.lang.XMLTable.XMLColumn;
+import org.teiid.query.xquery.saxon.XQueryEvaluator;
import org.teiid.query.xquery.saxon.SaxonXQueryExpression.Result;
import org.teiid.query.xquery.saxon.SaxonXQueryExpression.RowProcessor;
@@ -76,6 +78,8 @@
typeMapping.put(DataTypeManager.DefaultDataClasses.DOUBLE, BuiltInAtomicType.DOUBLE);
}
+ private static RuntimeException EARLY_TERMINATION = new RuntimeException();
+
private XMLTable table;
private List<XMLColumn> projectedColumns;
@@ -84,6 +88,8 @@
private Item item;
private TupleBuffer buffer;
+ private boolean batchAvailable = false;
+ private TeiidRuntimeException asynchException;
private int outputRow = 1;
private boolean usingOutput;
@@ -92,7 +98,7 @@
}
@Override
- public void closeDirect() {
+ public synchronized void closeDirect() {
super.closeDirect();
if(this.buffer != null) {
if (!usingOutput) {
@@ -115,6 +121,8 @@
outputRow = 1;
usingOutput = false;
this.buffer = null;
+ this.batchAvailable = false;
+ this.asynchException = null;
}
public void setTable(XMLTable table) {
@@ -135,14 +143,23 @@
}
@Override
- protected TupleBatch nextBatchDirect() throws BlockedException,
+ protected synchronized TupleBatch nextBatchDirect() throws BlockedException,
TeiidComponentException, TeiidProcessingException {
- evaluate();
+ evaluate(false);
if (this.table.getXQueryExpression().isStreaming()) {
+ while (!batchAvailable) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ unwrapException(asynchException);
TupleBatch batch = this.buffer.getBatch(outputRow);
outputRow = batch.getEndRow() + 1;
+ batchAvailable = hasNextBatch();
return batch;
}
@@ -164,34 +181,70 @@
return pullBatch();
}
- private void evaluate() throws TeiidComponentException,
+ private void evaluate(final boolean useFinalBuffer) throws TeiidComponentException,
ExpressionEvaluationException, BlockedException,
TeiidProcessingException {
- if (result == null) {
- if (this.buffer == null && this.table.getXQueryExpression().isStreaming()) {
- this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR);
+ if (result != null || this.buffer != null) {
+ return;
+ }
+ setReferenceValues(this.table);
+ final HashMap<String, Object> parameters = new HashMap<String, Object>();
+ Evaluator eval = getEvaluator(Collections.emptyMap());
+ final Object contextItem = eval.evaluateParameters(this.table.getPassing(), null, parameters);
+
+ if (this.table.getXQueryExpression().isStreaming()) {
+ if (this.buffer == null) {
+ this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR);
}
- setReferenceValues(this.table);
- try {
- result = getEvaluator(Collections.emptyMap()).evaluateXQuery(this.table.getXQueryExpression(), this.table.getPassing(), null, this);
- if (this.buffer != null) {
- this.buffer.close();
- if (!usingOutput) {
- this.buffer.setForwardOnly(true);
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!useFinalBuffer) {
+ buffer.setForwardOnly(true);
+ }
+ XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
+ buffer.close();
+ } catch (TeiidException e) {
+ asynchException = new TeiidRuntimeException(e);
+ } catch (TeiidRuntimeException e) {
+ asynchException = e;
+ } catch (RuntimeException e) {
+ if (e != EARLY_TERMINATION) {
+ asynchException = new TeiidRuntimeException(e);
+ }
+ } finally {
+ batchAvailable = true;
+ synchronized (XMLTableNode.this) {
+ XMLTableNode.this.notifyAll();
+ }
}
}
- } catch (TeiidRuntimeException e) {
- if (e.getCause() instanceof TeiidComponentException) {
- throw (TeiidComponentException)e.getCause();
- }
- if (e.getCause() instanceof TeiidProcessingException) {
- throw (TeiidProcessingException)e.getCause();
- }
- throw e;
- }
+ };
+ this.getContext().getExecutor().execute(r);
+ return;
}
+ try {
+ result = XQueryEvaluator.evaluateXQuery(this.table.getXQueryExpression(), contextItem, parameters, null, this.getContext());
+ } catch (TeiidRuntimeException e) {
+ unwrapException(e);
+ }
}
+ private void unwrapException(TeiidRuntimeException e)
+ throws TeiidComponentException, TeiidProcessingException {
+ if (e == null) {
+ return;
+ }
+ if (e.getCause() instanceof TeiidComponentException) {
+ throw (TeiidComponentException)e.getCause();
+ }
+ if (e.getCause() instanceof TeiidProcessingException) {
+ throw (TeiidProcessingException)e.getCause();
+ }
+ throw e;
+ }
+
private List<?> processRow() throws ExpressionEvaluationException, BlockedException,
TeiidComponentException, TeiidProcessingException {
List<Object> tuple = new ArrayList<Object>(projectedColumns.size());
@@ -256,23 +309,35 @@
@Override
public TupleBuffer getFinalBuffer() throws BlockedException,
TeiidComponentException, TeiidProcessingException {
- evaluate();
+ evaluate(true);
usingOutput = true;
TupleBuffer finalBuffer = this.buffer;
- this.buffer = null;
- close();
+ if (!this.table.getXQueryExpression().isStreaming()) {
+ close();
+ }
return finalBuffer;
}
@Override
- public void processRow(NodeInfo row) {
+ public synchronized void processRow(NodeInfo row) {
+ if (isClosed()) {
+ throw EARLY_TERMINATION;
+ }
this.item = row;
rowCount++;
try {
this.buffer.addTuple(processRow());
+ if (hasNextBatch()) {
+ this.batchAvailable = true;
+ this.notifyAll();
+ }
} catch (TeiidException e) {
throw new TeiidRuntimeException(e);
}
}
-
+
+ private boolean hasNextBatch() {
+ return this.outputRow + this.buffer.getBatchSize() <= rowCount + 1;
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -32,6 +32,7 @@
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
+import java.util.concurrent.Executor;
import javax.security.auth.Subject;
@@ -42,6 +43,7 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.util.ArgCheck;
+import org.teiid.core.util.ExecutorUtils;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -135,6 +137,7 @@
private TransactionContext transactionContext;
private TransactionService transactionService;
private SourceHint sourceHint;
+ private Executor executor = ExecutorUtils.getDirectExecutor();
}
private GlobalState globalState = new GlobalState();
@@ -639,4 +642,12 @@
this.globalState.sourceHint = hint;
}
+ public Executor getExecutor() {
+ return this.globalState.executor;
+ }
+
+ public void setExecutor(Executor e) {
+ this.globalState.executor = e;
+ }
+
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java 2011-11-13 13:09:21 UTC (rev 3641)
@@ -26,6 +26,7 @@
import java.net.InetSocketAddress;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
@@ -97,6 +98,22 @@
assertEquals("<root></root>", s.getResultSet().getString(1));
}
+ @Test public void testXmlTableScrollable() throws Exception {
+ Statement s = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+ assertTrue(s.execute("select * from xmltable('/root/row' passing (select xmlelement(name \"root\", xmlagg(xmlelement(name \"row\", xmlforest(t.name)) order by t.name)) from tables as t, columns as t1) columns \"Name\" string) as x"));
+ ResultSet rs = s.getResultSet();
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ assertEquals(7812, count);
+ rs.beforeFirst();
+ while (rs.next()) {
+ count--;
+ }
+ assertEquals(0, count);
+ }
+
/**
* Ensures if you start more than the maxActivePlans
* where all the plans take up more than output buffer limit
More information about the teiid-commits
mailing list