[teiid-commits] teiid SVN: r3641 - in trunk: engine/src/main/java/org/teiid/query/eval and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Nov 13 08:09:22 EST 2011


Author: shawkins
Date: 2011-11-13 08:09:21 -0500 (Sun, 13 Nov 2011)
New Revision: 3641

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
   trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
   trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
Log:
TEIID-1813 adding asynch processing for faster batch delivery.

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -335,6 +335,7 @@
 	    request.initialize(requestMsg, bufferManager,
 				dataTierMgr, transactionService, state.sessionTables,
 				workContext, this.prepPlanCache);
+	    request.setExecutor(this.processWorkerPool);
 		request.setResultSetCacheEnabled(this.rsCache != null);
 		request.setAuthorizationValidator(this.authorizationValidator);
 		request.setUserRequestConcurrency(this.getUserRequestSourceConcurrency());

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.api.exception.query.QueryMetadataException;
@@ -134,6 +135,7 @@
 	private boolean resultSetCacheEnabled = true;
 	private int userRequestConcurrency;
 	private AuthorizationValidator authorizationValidator;
+	private Executor executor;
 
     void initialize(RequestMessage requestMsg,
                               BufferManager bufferManager,
@@ -240,7 +242,7 @@
 					multiSourceModels, workContext, context);
             context.setPlanToProcessConverter(modifier);
         }
-
+        context.setExecutor(this.executor);
         context.setSecurityFunctionEvaluator(this);
         context.setTempTableStore(tempTableStore);
         context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(this.bufferManager, this.processorDataManager, this.capabilitiesFinder, idGenerator, metadata));
@@ -475,4 +477,8 @@
 		}
 	}
 	
+	public void setExecutor(Executor executor) {
+		this.executor = executor;
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -925,6 +925,17 @@
 	public Result evaluateXQuery(SaxonXQueryExpression xquery, List<DerivedColumn> cols, List<?> tuple, RowProcessor processor) 
 	throws BlockedException, TeiidComponentException, TeiidProcessingException {
 		HashMap<String, Object> parameters = new HashMap<String, Object>();
+		Object contextItem = evaluateParameters(cols, tuple, parameters);
+		return XQueryEvaluator.evaluateXQuery(xquery, contextItem, parameters, processor, context);
+	}
+
+	/**
+	 * Evaluate the parameters and return the context item if it exists
+	 */
+	public Object evaluateParameters(List<DerivedColumn> cols, List<?> tuple,
+			HashMap<String, Object> parameters)
+			throws ExpressionEvaluationException, BlockedException,
+			TeiidComponentException {
 		Object contextItem = null;
 		for (DerivedColumn passing : cols) {
 			Object value = this.evaluate(passing.getExpression(), tuple);
@@ -934,7 +945,7 @@
 				parameters.put(passing.getAlias(), value);
 			}
 		}
-		return XQueryEvaluator.evaluateXQuery(xquery, contextItem, parameters, processor, context);
+		return contextItem;
 	}
 
 	private Evaluator.NameValuePair<Object>[] getNameValuePairs(List<?> tuple, List<DerivedColumn> args, boolean xmlNames)

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -52,9 +52,11 @@
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.XMLType;
 import org.teiid.query.QueryPlugin;
+import org.teiid.query.eval.Evaluator;
 import org.teiid.query.function.FunctionDescriptor;
 import org.teiid.query.sql.lang.XMLTable;
 import org.teiid.query.sql.lang.XMLTable.XMLColumn;
+import org.teiid.query.xquery.saxon.XQueryEvaluator;
 import org.teiid.query.xquery.saxon.SaxonXQueryExpression.Result;
 import org.teiid.query.xquery.saxon.SaxonXQueryExpression.RowProcessor;
 
@@ -76,6 +78,8 @@
 		typeMapping.put(DataTypeManager.DefaultDataClasses.DOUBLE, BuiltInAtomicType.DOUBLE);
 	}
 	
+	private static RuntimeException EARLY_TERMINATION = new RuntimeException();
+	
 	private XMLTable table;
 	private List<XMLColumn> projectedColumns;
 	
@@ -84,6 +88,8 @@
 	private Item item;
 	
 	private TupleBuffer buffer;
+	private boolean batchAvailable = false;
+	private TeiidRuntimeException asynchException;
 	private int outputRow = 1;
 	private boolean usingOutput;
 	
@@ -92,7 +98,7 @@
 	}
 	
 	@Override
-	public void closeDirect() {
+	public synchronized void closeDirect() {
 		super.closeDirect();
 		if(this.buffer != null) {
 			if (!usingOutput) {
@@ -115,6 +121,8 @@
 		outputRow = 1;
 		usingOutput = false;
 		this.buffer = null;
+		this.batchAvailable = false;
+		this.asynchException = null;
 	}
 	
 	public void setTable(XMLTable table) {
@@ -135,14 +143,23 @@
 	}
 
 	@Override
-	protected TupleBatch nextBatchDirect() throws BlockedException,
+	protected synchronized TupleBatch nextBatchDirect() throws BlockedException,
 			TeiidComponentException, TeiidProcessingException {
 		
-		evaluate();
+		evaluate(false);
 		
 		if (this.table.getXQueryExpression().isStreaming()) {
+			while (!batchAvailable) {
+				try {
+					this.wait();
+				} catch (InterruptedException e) {
+					throw new TeiidRuntimeException(e);
+				}
+			}
+			unwrapException(asynchException);
 			TupleBatch batch = this.buffer.getBatch(outputRow);
 			outputRow = batch.getEndRow() + 1;
+			batchAvailable = hasNextBatch();
 			return batch;
 		}
 		
@@ -164,34 +181,70 @@
 		return pullBatch();
 	}
 
-	private void evaluate() throws TeiidComponentException,
+	private void evaluate(final boolean useFinalBuffer) throws TeiidComponentException,
 			ExpressionEvaluationException, BlockedException,
 			TeiidProcessingException {
-		if (result == null) {
-			if (this.buffer == null && this.table.getXQueryExpression().isStreaming()) {
-				this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR); 
+		if (result != null || this.buffer != null) {
+			return;
+		}
+		setReferenceValues(this.table);
+		final HashMap<String, Object> parameters = new HashMap<String, Object>();
+		Evaluator eval = getEvaluator(Collections.emptyMap());
+		final Object contextItem = eval.evaluateParameters(this.table.getPassing(), null, parameters);
+
+		if (this.table.getXQueryExpression().isStreaming()) {
+			if (this.buffer == null) {
+				this.buffer = this.getBufferManager().createTupleBuffer(getOutputElements(), getConnectionID(), TupleSourceType.PROCESSOR);
 			}
-			setReferenceValues(this.table);
-			try {
-				result = getEvaluator(Collections.emptyMap()).evaluateXQuery(this.table.getXQueryExpression(), this.table.getPassing(), null, this);
-				if (this.buffer != null) {
-					this.buffer.close();
-					if (!usingOutput) {
-						this.buffer.setForwardOnly(true);
+			Runnable r = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						if (!useFinalBuffer) {
+							buffer.setForwardOnly(true);
+						}
+						XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
+						buffer.close();
+					} catch (TeiidException e) {
+						asynchException = new TeiidRuntimeException(e);
+					} catch (TeiidRuntimeException e) {
+						asynchException = e;
+					} catch (RuntimeException e) {
+						if (e != EARLY_TERMINATION) {
+							asynchException = new TeiidRuntimeException(e);
+						}
+					} finally {
+						batchAvailable = true;
+						synchronized (XMLTableNode.this) {
+							XMLTableNode.this.notifyAll();
+						}
 					}
 				}
-			} catch (TeiidRuntimeException e) {
-				if (e.getCause() instanceof TeiidComponentException) {
-					throw (TeiidComponentException)e.getCause();
-				}
-				if (e.getCause() instanceof TeiidProcessingException) {
-					throw (TeiidProcessingException)e.getCause();
-				}
-				throw e;
-			}
+			};
+			this.getContext().getExecutor().execute(r);
+			return;
 		}
+		try {
+			result = XQueryEvaluator.evaluateXQuery(this.table.getXQueryExpression(), contextItem, parameters, null, this.getContext());
+		} catch (TeiidRuntimeException e) {
+			unwrapException(e);
+		}
 	}
 
+	private void unwrapException(TeiidRuntimeException e)
+			throws TeiidComponentException, TeiidProcessingException {
+		if (e == null) {
+			return;
+		}
+		if (e.getCause() instanceof TeiidComponentException) {
+			throw (TeiidComponentException)e.getCause();
+		}
+		if (e.getCause() instanceof TeiidProcessingException) {
+			throw (TeiidProcessingException)e.getCause();
+		}
+		throw e;
+	}
+
 	private List<?> processRow() throws ExpressionEvaluationException, BlockedException,
 			TeiidComponentException, TeiidProcessingException {
 		List<Object> tuple = new ArrayList<Object>(projectedColumns.size());
@@ -256,23 +309,35 @@
 	@Override
 	public TupleBuffer getFinalBuffer() throws BlockedException,
 			TeiidComponentException, TeiidProcessingException {
-		evaluate();
+		evaluate(true);
 		usingOutput = true;
     	TupleBuffer finalBuffer = this.buffer;
-    	this.buffer = null;
-		close();
+    	if (!this.table.getXQueryExpression().isStreaming()) {
+			close();
+    	}
 		return finalBuffer;
 	}
 	
 	@Override
-	public void processRow(NodeInfo row) {
+	public synchronized void processRow(NodeInfo row) {
+		if (isClosed()) {
+			throw EARLY_TERMINATION;
+		}
 		this.item = row;
 		rowCount++;
 		try {
 			this.buffer.addTuple(processRow());
+			if (hasNextBatch()) {
+				this.batchAvailable = true;
+				this.notifyAll();
+			}
 		} catch (TeiidException e) {
 			throw new TeiidRuntimeException(e);
 		}
 	}
-		
+
+	private boolean hasNextBatch() {
+		return this.outputRow + this.buffer.getBatchSize() <= rowCount + 1;
+	}
+
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -32,6 +32,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.Executor;
 
 import javax.security.auth.Subject;
 
@@ -42,6 +43,7 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.util.ArgCheck;
+import org.teiid.core.util.ExecutorUtils;
 import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.process.PreparedPlan;
 import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -135,6 +137,7 @@
 		private TransactionContext transactionContext;
 		private TransactionService transactionService;
 		private SourceHint sourceHint;
+		private Executor executor = ExecutorUtils.getDirectExecutor();
 	}
 	
 	private GlobalState globalState = new GlobalState();
@@ -639,4 +642,12 @@
 		this.globalState.sourceHint = hint;
 	}
 	
+	public Executor getExecutor() {
+		return this.globalState.executor;
+	}
+	
+	public void setExecutor(Executor e) {
+		this.globalState.executor = e;
+	}
+	
 }

Modified: trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-11-13 13:03:20 UTC (rev 3640)
+++ trunk/test-integration/common/src/test/java/org/teiid/transport/TestJDBCSocketTransport.java	2011-11-13 13:09:21 UTC (rev 3641)
@@ -26,6 +26,7 @@
 
 import java.net.InetSocketAddress;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Properties;
 
@@ -97,6 +98,22 @@
 		assertEquals("<root></root>", s.getResultSet().getString(1));
 	}
 	
+	@Test public void testXmlTableScrollable() throws Exception {
+		Statement s = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+		assertTrue(s.execute("select * from xmltable('/root/row' passing (select xmlelement(name \"root\", xmlagg(xmlelement(name \"row\", xmlforest(t.name)) order by t.name)) from tables as t, columns as t1) columns \"Name\" string) as x"));
+		ResultSet rs = s.getResultSet();
+		int count = 0;
+		while (rs.next()) {
+			count++;
+		}
+		assertEquals(7812, count);
+		rs.beforeFirst();
+		while (rs.next()) {
+			count--;
+		}
+		assertEquals(0, count);
+	}
+	
 	/**
 	 * Ensures if you start more than the maxActivePlans
 	 * where all the plans take up more than output buffer limit



More information about the teiid-commits mailing list