[teiid-commits] teiid SVN: r3600 - in trunk: client/src/main/java/org/teiid/jdbc and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Nov 1 16:17:01 EDT 2011


Author: shawkins
Date: 2011-11-01 16:16:59 -0400 (Tue, 01 Nov 2011)
New Revision: 3600

Added:
   trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
   trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
   trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
Modified:
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
   trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
   trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java
   trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
   trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
Log:
TEIID-1800 adding a statement callback for non-blocking processing of results.

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-11-01 20:16:59 UTC (rev 3600)
@@ -36,6 +36,7 @@
   <LI><B>Server-side Query Timeouts</B> - default query timeouts can be configured at both the VDB (via the query-timeout VDB property) and entire server (via the teiid-jboss-beans.xml queryTimeout property).
   <LI><B>Native Queries</B> - added the ability to specify native query SQL for JDBC physical tables and stored procedures via extension metadata.
   <LI><B>View removal hint</B> - the NO_UNNEST hint now also applies to from clause views and subqueries.  It will instruct the planner to not perform view flattening.
+  <LI><B>Non-blocking statement execution</B> - Teiid JDBC extensions TeiidStatement and TeiidPreparedStatement can be used to submit queries against embedded connections with a callback to process results in a non-blocking manner.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>

Added: trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java	                        (rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+package org.teiid.jdbc;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.teiid.client.util.ResultsFuture;
+
+/**
+ * Handles the future processing logic and makes the appropriate calls to the callback
+ */
+public class NonBlockingRowProcessor implements
+		ResultsFuture.CompletionListener<Boolean> {
+
+	private static Logger logger = Logger.getLogger(NonBlockingRowProcessor.class.getName());
+	private StatementImpl stmt;
+	private StatementCallback callback;
+
+	public NonBlockingRowProcessor(StatementImpl stmt, StatementCallback callback) {
+		this.stmt = stmt;
+		this.callback = callback;
+	}
+
+	@Override
+	public void onCompletion(ResultsFuture<Boolean> future) {
+		try {
+			boolean hasResultSet = future.get(); 
+			if (!hasResultSet) {
+				callback.onComplete(stmt);
+				return;
+			}
+			final ResultSetImpl resultSet = stmt.getResultSet();
+			Runnable rowProcessor = new Runnable() {
+				@Override
+				public void run() {
+					while (true) {
+						try {
+							ResultsFuture<Boolean> hasNext = resultSet.submitNext();
+							synchronized (hasNext) {
+								if (!hasNext.isDone()) {
+									hasNext.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+	
+												@Override
+												public void onCompletion(
+														ResultsFuture<Boolean> f) {
+													if (processRow(f)) {
+														run();
+													}
+												}
+											});
+									break; // will be resumed by onCompletion above
+								}
+							}
+							if (!processRow(hasNext)) {
+								break;
+							}
+						} catch (Exception e) {
+							onException(e);
+						}
+					}
+				}
+			};
+			rowProcessor.run();
+		} catch (Exception e) {
+			onException(e);
+		}
+	}
+
+	/**
+	 * return true to continue processing
+	 */
+	boolean processRow(ResultsFuture<Boolean> hasNext) {
+		try {
+			if (!hasNext.get()) {
+				callback.onComplete(stmt);
+				return false;
+			}
+
+			callback.onRow(stmt, stmt.getResultSet());
+
+			return true;
+		} catch (Exception e) {
+			onException(e);
+			return false;
+		}
+	}
+
+	private void onException(Exception e) {
+		try {
+			callback.onException(stmt, e);
+		} catch (Exception e1) {
+			logger.log(Level.WARNING, "Unhandled exception from StatementCallback", e); //$NON-NLS-1$
+		}
+	}
+
+}


Property changes on: trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -33,7 +33,6 @@
 import java.sql.Clob;
 import java.sql.NClob;
 import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
 import java.sql.Ref;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -80,7 +79,7 @@
  * preparedstatement object.</p>
  */
 
-public class PreparedStatementImpl extends StatementImpl implements PreparedStatement {
+public class PreparedStatementImpl extends StatementImpl implements TeiidPreparedStatement {
     // sql, which this prepared statement is operating on
     protected String prepareSql;
 
@@ -174,6 +173,12 @@
     }
     
     @Override
+    public void submitExecute(String sql, StatementCallback callback) throws TeiidSQLException {
+    	String msg = JDBCPlugin.Util.getString("JDBC.Method_not_supported"); //$NON-NLS-1$
+        throw new TeiidSQLException(msg);
+    }
+    
+    @Override
     public ResultSet executeQuery(String sql) throws SQLException {
     	String msg = JDBCPlugin.Util.getString("JDBC.Method_not_supported"); //$NON-NLS-1$
         throw new TeiidSQLException(msg);
@@ -191,6 +196,12 @@
         throw new TeiidSQLException(msg);
     }
     
+    @Override
+    public void submitExecute(StatementCallback callback) throws SQLException {
+    	NonBlockingRowProcessor processor = new NonBlockingRowProcessor(this, callback);
+    	submitExecute(ResultsMode.EITHER).addCompletionListener(processor);
+    }
+    
     public ResultsFuture<Boolean> submitExecute(ResultsMode mode) throws SQLException {
         return executeSql(new String[] {this.prepareSql}, false, mode, false);
     }

Added: trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java	                        (rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+ 
+package org.teiid.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+/**
+ * A callback for non-blocking statement result processing.
+ * {@link Statement#close()} must still be called to release
+ * statement resources.
+ * 
+ * Statement methods, such as cancel, are perfectly valid
+ * even when using a callback.
+ */
+public interface StatementCallback {
+	
+	/**
+	 * Process the current row of the {@link ResultSet}.
+	 * Any call that retrieves non-lob values from the current row
+	 * will be performed without blocking on more data from sources.
+	 * Calls outside of the current row, such as next(), may block.
+	 * @param rs
+	 * @throws Exception
+	 */
+	void onRow(Statement s, ResultSet rs) throws Exception;
+
+	/**
+	 * Called when an exception occurs.  No further rows will
+	 * be processed by this callback.
+	 * @param e
+	 */
+	void onException(Statement s, Exception e) throws Exception;
+
+	/**
+	 * Called when processing has completed normally.
+	 * @param s
+	 */
+	void onComplete(Statement s) throws Exception;
+
+}


Property changes on: trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -307,6 +307,12 @@
         }
     }
     
+    @Override
+    public void submitExecute(String sql, StatementCallback callback) throws SQLException {
+    	NonBlockingRowProcessor processor = new NonBlockingRowProcessor(this, callback);
+    	submitExecute(sql).addCompletionListener(processor);
+    }
+    
     public ResultsFuture<Boolean> submitExecute(String sql) throws SQLException {
     	return executeSql(new String[] {sql}, false, ResultsMode.EITHER, false);
     }

Added: trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java	                        (rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This interface provides methods in 
+ * addition to the standard JDBC methods. 
+ */
+public interface TeiidPreparedStatement extends PreparedStatement {
+	
+    /**
+     * Execute the given statement using a non-blocking callback.
+     * This method is only valid for use with embedded connections.
+     * @param callback
+     * @throws SQLException 
+     */
+    void submitExecute(StatementCallback callback) throws SQLException;
+
+}


Property changes on: trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -23,6 +23,7 @@
 package org.teiid.jdbc;
 
 import java.io.Serializable;
+import java.sql.SQLException;
 import java.util.Collection;
 
 import org.teiid.client.plan.Annotation;
@@ -99,4 +100,12 @@
      * @since 4.2
      */
     void setPayload(Serializable payload);
+    
+    /**
+     * Execute the given statement using a non-blocking callback.
+     * This method is only valid for use with embedded connections.
+     * @param callback
+     * @throws SQLException 
+     */
+    void submitExecute(String sql, StatementCallback callback) throws SQLException;
 }

Modified: trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
===================================================================
--- trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml	2011-11-01 20:16:59 UTC (rev 3600)
@@ -519,7 +519,7 @@
            there is a way to make connections that by-pass making a socket based JDBC connection.
            You can use slightly modified data source configuration to make a "local" connection, where the JDBC API will lookup a local Teiid runtime in the same VM.</para>
            <warning><para>Since DataSources start before Teiid VDBs are deployed, leave the min pool size of local connections as the default of 0.  Otherwise errors will occur on the startup of the Teiid DataSource.</para></warning>
-           <note><para>Be default local connections use their calling thread to perform processing operations rather than using an engine thread while the calling thread is blocked.
+           <note><para>By default local connections use their calling thread to perform processing operations rather than using an engine thread while the calling thread is blocked.
             To disable this behavior set the connection property useCallingThreads=false.</para></note>
            <example>
            <title>Local data source</title>

Modified: trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml
===================================================================
--- trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml	2011-11-01 20:16:59 UTC (rev 3600)
@@ -389,4 +389,37 @@
         </section>
        </section>
        
+	<section>
+		<title>Non-blocking Statement Execution</title>	
+		<para>JDBC query execution can indefinitely block the calling thread when a statement is executed or a resultset is being iterated.  
+		In some situations you may not wish to have your calling threads held in these blocked states.  When using embedded connections, you may optionally use the 
+		<code>org.teiid.jdbc.TeiidStatement</code> and <code>org.teiid.jdbc.TeiidPreparedStatement</code> interfaces to execute queries with a callback <code>org.teiid.jdbc.StatementCallback</code> that will be notified
+		of statement events, such as an available row, an exception, or completion.  Your calling thread will be free to perform other work.  The callback will be executed by an engine processing thread as needed.  If your results processing is itself blocking and you want query processing to be concurrent with results processing, then your callback should implement onRow
+		 handling in a multi-threaded manner to allow the engine thread to continue.  
+		</para>
+        <example>
+          <title>Non-blocking Prepared Statement Execution</title>
+          <programlisting>PreparedStatement stmt = connection.prepareStatement(sql);
+TeiidPreparedStatement tStmt = stmt.unwrap(TeiidPreparedStatement.class);
+tStmt.submitExecute(new StatementCallback() {
+    @Override
+    public void onRow(Statement s, ResultSet rs) throws SQLException {
+	    //any logic that accesses the current row ...
+        System.out.println(rs.getString(1));
+    }
+			
+    @Override
+    public void onException(Statement s, Exception e) throws Exception {
+        s.close();	
+    }
+			
+    @Override
+    public void onComplete(Statement s) throws Exception {
+        s.close();
+    }
+});</programlisting>
+		</example>
+		<note><para>The non-blocking logic is limited to statement execution only.  Other JDBC operations, such as connection creation or batched executions do not yet have non-blocking options.</para></note>
+	</section>
+ 
 </chapter>
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -181,10 +181,6 @@
 		}
 	}
 	
-	public static interface ContextProvider {
-		DQPWorkContext getContext(String vdbName, int vdbVersion);
-	}
-	
 	private ThreadReuseExecutor processWorkerPool;
     
     // Resources
@@ -364,7 +360,7 @@
 				}
 			}, timeout));
         }
-        boolean runInThread = DQPWorkContext.getWorkContext().useCallingThread() || requestMsg.isSync();
+        boolean runInThread = requestMsg.isSync();
         synchronized (waitingPlans) {
 			if (runInThread || currentlyActivePlans <= maxActivePlans) {
 				startActivePlan(workItem, !runInThread);

Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java	2011-11-01 17:22:07 UTC (rev 3599)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java	2011-11-01 20:16:59 UTC (rev 3600)
@@ -25,13 +25,18 @@
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.teiid.client.util.ResultsFuture;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.jdbc.AbstractMMQueryTestCase;
 import org.teiid.jdbc.FakeServer;
+import org.teiid.jdbc.StatementCallback;
+import org.teiid.jdbc.TeiidStatement;
 import org.teiid.jdbc.TestMMDatabaseMetaData;
 
 
@@ -182,4 +187,28 @@
 	@Test(expected=SQLException.class) public void testLogMsg1() throws Exception {
 		execute("call logMsg(level=>'foo', context=>'org.teiid.foo', msg=>'hello world')"); //$NON-NLS-1$
 	}
+	
+	@Test public void testAsynch() throws Exception {
+		Statement s = this.internalConnection.createStatement();
+		TeiidStatement ts = s.unwrap(TeiidStatement.class);
+		final ResultsFuture<Integer> result = new ResultsFuture<Integer>(); 
+		ts.submitExecute("select * from SYS.Schemas", new StatementCallback() {
+			int rowCount;
+			@Override
+			public void onRow(Statement s, ResultSet rs) {
+				rowCount++;
+			}
+			
+			@Override
+			public void onException(Statement s, Exception e) {
+				result.getResultsReceiver().exceptionOccurred(e);
+			}
+			
+			@Override
+			public void onComplete(Statement s) {
+				result.getResultsReceiver().receiveResults(rowCount);
+			}
+		});
+		assertEquals(4, result.get().intValue());
+	}
 }



More information about the teiid-commits mailing list