[teiid-commits] teiid SVN: r4294 - in trunk: engine/src/main/java/org/teiid/dqp/internal/process and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Aug 2 16:42:06 EDT 2012


Author: rareddy
Date: 2012-08-02 16:42:06 -0400 (Thu, 02 Aug 2012)
New Revision: 4294

Added:
   trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestASTQueries.java
Modified:
   trunk/client/src/main/java/org/teiid/client/RequestMessage.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
   trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
   trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
Log:
TEIID-2062 - Adding a way to submit a query in object form using the internal Teiid language objects. This lets the user skip the whole JDBC layer and submit the query directly to the engine, which also skips parsing the SQL from its string form.

Modified: trunk/client/src/main/java/org/teiid/client/RequestMessage.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/RequestMessage.java	2012-08-02 19:18:51 UTC (rev 4293)
+++ trunk/client/src/main/java/org/teiid/client/RequestMessage.java	2012-08-02 20:42:06 UTC (rev 4294)
@@ -313,6 +313,10 @@
 	public void setCommands(String... batchedCommands) {
 		this.commands = batchedCommands;
 	}
+	
+	public Object getQueryCommand() {
+		return null;
+	}
 
 	public void setExecutionPayload(Serializable executionPayload) {
 		this.executionPayload = executionPayload;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-08-02 19:18:51 UTC (rev 4293)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-08-02 20:42:06 UTC (rev 4294)
@@ -294,6 +294,9 @@
     }
     
     private Command parseCommand() throws QueryParserException {
+    	if (requestMsg.getQueryCommand() != null) {
+    		return (Command)requestMsg.getQueryCommand();
+    	}
         String[] commands = requestMsg.getCommands();
         ParseInfo parseInfo = createParseInfo(this.requestMsg);
         if (requestMsg.isPreparedStatement() || requestMsg.isCallableStatement() || !requestMsg.isBatchedUpdate()) {

Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-02 19:18:51 UTC (rev 4293)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-02 20:42:06 UTC (rev 4294)
@@ -22,6 +22,7 @@
 
 package org.teiid.runtime;
 
+
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
@@ -31,8 +32,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.RollbackException;
 import javax.transaction.Synchronization;
@@ -43,17 +49,24 @@
 import org.teiid.Replicated;
 import org.teiid.Replicated.ReplicationMode;
 import org.teiid.adminapi.impl.ModelMetaData;
+import org.teiid.adminapi.impl.SessionMetadata;
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.cache.Cache;
 import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.cache.DefaultCacheFactory;
-import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.client.DQP;
+import org.teiid.client.RequestMessage;
+import org.teiid.client.ResultsMessage;
 import org.teiid.client.security.ILogon;
+import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBufferCache;
+import org.teiid.core.BundleUtil.Event;
+import org.teiid.core.types.JDBCSQLTypeInfo;
+import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidRuntimeException;
-import org.teiid.core.BundleUtil.Event;
 import org.teiid.deployers.CompositeVDB;
 import org.teiid.deployers.UDFMetaData;
 import org.teiid.deployers.VDBLifeCycleListener;
@@ -65,6 +78,7 @@
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository.ExecutionFactoryProvider;
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.DQPCore;
+import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.process.PreparedPlan;
 import org.teiid.dqp.internal.process.SessionAwareCache;
 import org.teiid.dqp.internal.process.TransactionServerImpl;
@@ -89,8 +103,11 @@
 import org.teiid.query.function.SystemFunctionManager;
 import org.teiid.query.metadata.TransformationMetadata;
 import org.teiid.query.metadata.TransformationMetadata.Resource;
+import org.teiid.query.sql.lang.QueryCommand;
+import org.teiid.query.sql.visitor.SQLStringVisitor;
 import org.teiid.query.tempdata.GlobalTableStore;
 import org.teiid.query.tempdata.GlobalTableStoreImpl;
+import org.teiid.query.tempdata.TempTableDataManager;
 import org.teiid.query.validator.ValidatorFailure;
 import org.teiid.query.validator.ValidatorReport;
 import org.teiid.services.AbstractEventDistributorFactoryService;
@@ -290,6 +307,7 @@
 	//to allow teiid to start a request transaction under an existing thread bound transaction
 	protected boolean detectTransactions = true;
 	private Boolean running;
+	private AtomicLong requestIdentifier = new AtomicLong();
 	
 	public EmbeddedServer() {
 
@@ -570,4 +588,211 @@
 		return this.repo;
 	}
 	
+	public class ExecutionResults {
+		private List<String> columnNames;
+		private List<Integer> dataTypes = new ArrayList<Integer>();
+		private List<? extends List<?>> rows;
+		private boolean hasMoreRows = false;
+		private int lastRow;
+		private SessionMetadata session;
+		private long requestId;
+		private DQPWorkContext context;
+		private int index = 0;
+		private int rowsRead = 0;
+		private boolean closed = false;
+		
+		/**
+		 * Advances the cursor and gets the next row of results. If this represents an update, then the column name will
+		 * be "update-count"
+		 * @return row; if advanced over the last row of data, TeiidException is thrown.
+		 * @throws TeiidException
+		 */
+		public List<?> next() throws TeiidException {
+			if (hasNext()) {
+				return this.rows.get(this.index++);
+			}
+			throw new TeiidProcessingException(RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40098));
+		}
+		
+		/**
+		 * Get the name of the columns
+		 * @return
+		 */
+		public List<String> getColumnNames() {
+			return columnNames;
+		}
+
+		/**
+		 * Get SQL Type of the column - 0 indexed
+		 * @return SQL type from java.sql.Types
+		 */
+		public int getDataType(int index) {
+			return dataTypes.get(index);
+		}
+		
+		/**
+		 * Check to see if there is another row of data available
+		 * @return true if data available; false otherwise
+		 * @throws TeiidException
+		 */
+		public boolean hasNext() throws TeiidException {
+			if ((this.index+this.rowsRead) >= this.lastRow) {
+				if (this.hasMoreRows) {
+					batchNext();
+				}
+				else {
+					return false;
+				}
+			}
+			return true;
+		}
+		
+		private boolean batchNext() throws TeiidException {
+			try {
+				if (this.hasMoreRows) {
+					Future<ResultsMessage> message = dqp.processCursorRequest(requestId, this.lastRow+1, 1024);
+					ResultsMessage rm = message.get();
+					this.columnNames = Arrays.asList(rm.getColumnNames());
+					this.rows = rm.getResultsList(); 
+					this.rowsRead = this.lastRow;
+					this.lastRow = rm.getLastRow();
+					this.index = 0;
+				    if (rm.getFinalRow() == -1 || rm.getLastRow() < rm.getFinalRow()) {
+				    	this.hasMoreRows = true;
+				    }
+				    else {
+				    	this.hasMoreRows = false;
+				    }
+				    return true;
+				}
+				return false;
+			} catch (InterruptedException e) {
+				throw new TeiidProcessingException(e);
+			} catch (ExecutionException e) {
+				throw new TeiidProcessingException(e);
+			}
+		}
+		
+		public void close() throws TeiidException {
+			if (!this.closed) {
+				try {
+			        ResultsFuture<?> response = dqp.closeRequest(this.requestId);
+			        response.get();
+			        this.closed = true;
+			        
+					context.runInContext(new Callable<Void>() {
+						@Override
+						public Void call() throws Exception {
+							dqp.terminateSession(session.getSessionId());
+							return null;
+						}
+					});
+				} catch (Throwable e) {
+					throw new TeiidException(e);
+				}			
+			}
+		}
+	}
+	
+	/**
+	 * Internal use only. Subject to change in next versions.
+	 * 
+	 * Execute the query directly in the engine by submitting the AST form of the SQL and omit the whole JDBC 
+	 * layer. The returned object contains the results and is designed like java.sql.ResultSet.
+	 * 
+	 * @param vdbName
+	 * @param version
+	 * @param command
+	 * @param timoutInMilli
+	 * @return
+	 * @throws TeiidException
+	 */
+	public ExecutionResults executeQuery(final String vdbName, final int version, final QueryCommand command, final long timoutInMilli) throws TeiidException {
+		String user = "embedded"; //$NON-NLS-1$
+		
+        VDBMetaData vdb = this.repo.getLiveVDB(vdbName, version);
+        if (vdb == null) {
+        	throw new TeiidException(RuntimePlugin.Util.getString("wrong_vdb"));//$NON-NLS-1$
+        }
+        
+        final SessionMetadata session = TempTableDataManager.createTemporarySession(user, "embedded", vdb); //$NON-NLS-1$
+
+		final long requestID =  this.requestIdentifier.getAndIncrement();
+		
+		final DQPWorkContext context = new DQPWorkContext();
+		context.setUseCallingThread(true);
+		context.setSession(session);
+		
+		try {
+			return context.runInContext(new Callable<ExecutionResults>() {
+				@Override
+				public ExecutionResults call() throws Exception {
+					
+					ExecutionResults results = new ExecutionResults();
+					results.session = session;
+					results.requestId = requestID;
+					results.context = context;
+					
+					RequestMessage request = new RequestMessage() {
+						@Override
+						public QueryCommand getQueryCommand() {
+							return command;
+						}	
+						
+						@Override
+						public String[] getCommands() {
+							return new String[] {buildStringForm()};
+						}
+						
+						@Override
+						public String getCommandString() {
+							return buildStringForm();
+						}	
+						
+						private String buildStringForm() {
+							return SQLStringVisitor.getSQLString(getQueryCommand());
+						}						
+					};
+					request.setExecutionId(requestID);
+					request.setRowLimit(0);
+					Future<ResultsMessage> message = dqp.executeRequest(requestID, request);
+					ResultsMessage rm = null;
+					if (timoutInMilli < 0) {
+						rm = message.get();
+					} else {
+						rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+					}
+			        if (rm.getException() != null) {
+			             throw rm.getException();
+			        }
+			        
+			        if (rm.isUpdateResult()) {
+			        	results.columnNames = Arrays.asList("update-count");//$NON-NLS-1$
+			        	results.dataTypes.add(JDBCSQLTypeInfo.getSQLType("integer"));//$NON-NLS-1$
+			        	results.rows = rm.getResultsList(); 
+			        	results.lastRow = 1;
+			        	results.hasMoreRows = false;
+			        	results.close();
+			        }
+			        else {
+			        	results.columnNames = Arrays.asList(rm.getColumnNames());
+			        	for (String type:rm.getDataTypes()) {
+			        		results.dataTypes.add(JDBCSQLTypeInfo.getSQLType(type));
+			        	}
+			        	results.rows = rm.getResultsList(); 	
+			        	results.lastRow = rm.getLastRow();
+				        if (rm.getFinalRow() == -1 || rm.getLastRow() < rm.getFinalRow()) {
+				        	results.hasMoreRows = true;
+				        }
+				        else {
+				        	results.close();
+				        }
+			        }
+					return results;
+				}
+			});
+		} catch (Throwable t) {
+			throw new TeiidException(t);
+		}
+	}
 }

Modified: trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java	2012-08-02 19:18:51 UTC (rev 4293)
+++ trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java	2012-08-02 20:42:06 UTC (rev 4294)
@@ -109,5 +109,6 @@
     	TEIID40095, //deployment failed
     	TEIID40096, //vdb deploy timeout
     	TEIID40097, //vdb finish timeout  
+    	TEIID40098,
     }
 }

Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2012-08-02 19:18:51 UTC (rev 4293)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2012-08-02 20:42:06 UTC (rev 4294)
@@ -99,3 +99,4 @@
 
 TEIID40096=Waited {0} for VDB {1}.{2} to be deployed, but it never was.  Please check to see if the deployment is missing or is in error.
 TEIID40097=Waited {0} for VDB {1}.{2} to be ACTIVE, but it never was.  Please check it's sources - {3}.
+TEIID40098=Reached end of results; use hasNext() call to check if there are more results before calling next()
\ No newline at end of file

Added: trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestASTQueries.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestASTQueries.java	                        (rev 0)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestASTQueries.java	2012-08-02 20:42:06 UTC (rev 4294)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.teiid.adminapi.Model;
+import org.teiid.adminapi.impl.ModelMetaData;
+import org.teiid.adminapi.impl.SourceMappingMetadata;
+import org.teiid.query.sql.lang.From;
+import org.teiid.query.sql.lang.Query;
+import org.teiid.query.sql.lang.Select;
+import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.runtime.EmbeddedConfiguration;
+import org.teiid.runtime.EmbeddedServer;
+import org.teiid.runtime.EmbeddedServer.ExecutionResults;
+import org.teiid.translator.loopback.LoopbackExecutionFactory;
+
+@SuppressWarnings("nls")
+public class TestASTQueries {
+
+	private static EmbeddedServer server;
+	
+	@BeforeClass public static void setUp() throws Exception {
+    	server = new EmbeddedServer();
+    	server.start(new EmbeddedConfiguration());
+    	LoopbackExecutionFactory loopy = new LoopbackExecutionFactory();
+    	loopy.setRowCount(10);
+    	server.addTranslator(loopy);
+    	
+    	String DDL = "CREATE FOREIGN TABLE G1 (e1 string, e2 integer);";
+    	ModelMetaData model = new ModelMetaData();
+    	model.setName("PM1");
+    	model.setModelType(Model.Type.PHYSICAL);
+    	model.setSchemaSourceType("DDL");
+    	model.setSchemaText(DDL);
+    	SourceMappingMetadata sm = new SourceMappingMetadata();
+    	sm.setName("loopy");
+    	sm.setTranslatorName("loopback");
+    	model.addSourceMapping(sm);
+    	server.deployVDB("test", model);
+    }
+	
+	@AfterClass public static void tearDown() throws Exception {
+		server.stop();
+	}
+
+	@Test public void testAST() throws Exception {
+		ExecutionResults rs = server.executeQuery("test", 1, sampleQuery(), -1);
+		assertNotNull(rs);
+		int count = 0;
+		while (rs.hasNext()) {
+			rs.next();
+			count++;
+		}
+		assertEquals(10, count);
+		rs.close();
+	}
+	
+    private Query sampleQuery() {
+        List<ElementSymbol> symbols = new ArrayList<ElementSymbol>();
+        symbols.add(new ElementSymbol("e1"));  //$NON-NLS-1$
+        symbols.add(new ElementSymbol("e2"));  //$NON-NLS-1$
+        Select select = new Select(symbols);
+           
+        From from = new From();
+        from.addGroup(new GroupSymbol("G1")); //$NON-NLS-1$
+        
+        Query query = new Query();
+        query.setSelect(select);
+        query.setFrom(from);
+        return query;
+    }	
+}


Property changes on: trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestASTQueries.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain



More information about the teiid-commits mailing list