Author: shawkins
Date: 2011-11-01 16:16:59 -0400 (Tue, 01 Nov 2011)
New Revision: 3600
Added:
trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
Modified:
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
Log:
TEIID-1800 adding a statement callback for non-blocking processing of results.
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-11-01 17:22:07 UTC (rev
3599)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-11-01 20:16:59 UTC (rev
3600)
@@ -36,6 +36,7 @@
<LI><B>Server-side Query Timeouts</B> - default query timeouts can be
configured at both the VDB (via the query-timeout VDB property) and entire server (via the
teiid-jboss-beans.xml queryTimeout property).
<LI><B>Native Queries</B> - added the ability to specify native query
SQL for JDBC physical tables and stored procedures via extension metadata.
<LI><B>View removal hint</B> - the NO_UNNEST hint now also applies to
from clause views and subqueries. It will instruct the planner to not perform view
flattening.
+ <LI><B>Non-blocking statement execution</B> - Teiid JDBC extensions
TeiidStatement and TeiidPreparedStatement can be used to submit queries against embedded
connections with a callback to process results in a non-blocking manner.
</UL>
<h2><a name="Compatibility">Compatibility
Issues</a></h2>
Added: trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
(rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java 2011-11-01
20:16:59 UTC (rev 3600)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.teiid.client.util.ResultsFuture;
+
+/**
+ * Drives non-blocking consumption of a statement's results and relays each
+ * event to a {@link StatementCallback}.
+ * <p>
+ * An instance is registered as the completion listener of the execution
+ * future. Once the execution completes it walks the result set through
+ * <code>ResultSetImpl.submitNext()</code>, calling
+ * {@link StatementCallback#onRow} for every row and
+ * {@link StatementCallback#onComplete} when no rows remain. Any exception
+ * raised along the way is handed to {@link StatementCallback#onException},
+ * after which row processing stops.
+ */
+public class NonBlockingRowProcessor implements
+        ResultsFuture.CompletionListener<Boolean> {
+
+    private static final Logger logger = Logger.getLogger(NonBlockingRowProcessor.class.getName());
+    private final StatementImpl stmt;
+    private final StatementCallback callback;
+
+    /**
+     * @param stmt the statement whose results are processed
+     * @param callback the callback that receives row, exception and completion events
+     */
+    public NonBlockingRowProcessor(StatementImpl stmt, StatementCallback callback) {
+        this.stmt = stmt;
+        this.callback = callback;
+    }
+
+    /**
+     * Invoked when the statement execution future completes. If the
+     * execution produced no result set the callback is completed
+     * immediately, otherwise row processing is started.
+     *
+     * @param future the completed execution future; its value is true
+     *        when a result set is available
+     */
+    @Override
+    public void onCompletion(ResultsFuture<Boolean> future) {
+        try {
+            boolean hasResultSet = future.get();
+            if (!hasResultSet) {
+                callback.onComplete(stmt);
+                return;
+            }
+            processRows(stmt.getResultSet());
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Processes rows for as long as the next row is already available.
+     * When the next row is still pending, a listener is registered that
+     * processes that row and then re-enters this method, and the current
+     * invocation returns without waiting.
+     */
+    private void processRows(final ResultSetImpl resultSet) {
+        while (true) {
+            try {
+                ResultsFuture<Boolean> hasNext = resultSet.submitNext();
+                // NOTE(review): the done-check and the listener registration are
+                // performed under the future's monitor, presumably so completion
+                // cannot slip in between the two - confirm against ResultsFuture.
+                synchronized (hasNext) {
+                    if (!hasNext.isDone()) {
+                        hasNext.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+
+                            @Override
+                            public void onCompletion(ResultsFuture<Boolean> f) {
+                                if (processRow(f)) {
+                                    processRows(resultSet);
+                                }
+                            }
+                        });
+                        return; // will be resumed by onCompletion above
+                    }
+                }
+                if (!processRow(hasNext)) {
+                    return;
+                }
+            } catch (Exception e) {
+                onException(e);
+                // stop here: without leaving the loop a failing submitNext()
+                // would be retried, and reported to the callback, forever
+                return;
+            }
+        }
+    }
+
+    /**
+     * Delivers the outcome of a completed "next row" future to the callback.
+     *
+     * @param hasNext the completed future; true means a current row is available
+     * @return true to continue processing, false once the results are
+     *         exhausted or an exception has been reported
+     */
+    boolean processRow(ResultsFuture<Boolean> hasNext) {
+        try {
+            if (!hasNext.get()) {
+                callback.onComplete(stmt);
+                return false;
+            }
+
+            callback.onRow(stmt, stmt.getResultSet());
+
+            return true;
+        } catch (Exception e) {
+            onException(e);
+            return false;
+        }
+    }
+
+    /**
+     * Reports the exception to the callback. A failure of the callback's own
+     * exception handler cannot be reported any further, so it is logged.
+     */
+    private void onException(Exception e) {
+        try {
+            callback.onException(stmt, e);
+        } catch (Exception e1) {
+            // log the handler's failure (e1); e was already handed to the callback
+            logger.log(Level.WARNING, "Unhandled exception from StatementCallback", e1); //$NON-NLS-1$
+        }
+    }
+
+}
Property changes on:
trunk/client/src/main/java/org/teiid/jdbc/NonBlockingRowProcessor.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2011-11-01
17:22:07 UTC (rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/PreparedStatementImpl.java 2011-11-01
20:16:59 UTC (rev 3600)
@@ -33,7 +33,6 @@
import java.sql.Clob;
import java.sql.NClob;
import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -80,7 +79,7 @@
* preparedstatement object.</p>
*/
-public class PreparedStatementImpl extends StatementImpl implements PreparedStatement {
+public class PreparedStatementImpl extends StatementImpl implements
TeiidPreparedStatement {
// sql, which this prepared statement is operating on
protected String prepareSql;
@@ -174,6 +173,12 @@
}
@Override
+ public void submitExecute(String sql, StatementCallback callback) throws TeiidSQLException {
+     // The SQL of a prepared statement is fixed when it is prepared, so the
+     // variant that takes ad-hoc SQL is rejected like the other execute(String) methods.
+     throw new TeiidSQLException(JDBCPlugin.Util.getString("JDBC.Method_not_supported")); //$NON-NLS-1$
+ }
+
+ @Override
public ResultSet executeQuery(String sql) throws SQLException {
String msg = JDBCPlugin.Util.getString("JDBC.Method_not_supported");
//$NON-NLS-1$
throw new TeiidSQLException(msg);
@@ -191,6 +196,12 @@
throw new TeiidSQLException(msg);
}
+ @Override
+ public void submitExecute(StatementCallback callback) throws SQLException {
+     // Submit the prepared SQL, then let a row processor feed the callback
+     // once the execution future completes.
+     ResultsFuture<Boolean> execution = submitExecute(ResultsMode.EITHER);
+     execution.addCompletionListener(new NonBlockingRowProcessor(this, callback));
+ }
+
public ResultsFuture<Boolean> submitExecute(ResultsMode mode) throws
SQLException {
return executeSql(new String[] {this.prepareSql}, false, mode, false);
}
Added: trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
(rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java 2011-11-01 20:16:59
UTC (rev 3600)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+/**
+ * A callback for non-blocking statement result processing.
+ * {@link Statement#close()} must still be called to release
+ * statement resources.
+ *
+ * Statement methods, such as cancel, are perfectly valid
+ * even when using a callback.
+ *
+ * Every method receives the {@link Statement} that was executed,
+ * so a single callback instance can identify which statement an
+ * event belongs to.
+ */
+public interface StatementCallback {
+
+ /**
+ * Process the current row of the {@link ResultSet}.
+ * Any call that retrieves non-lob values from the current row
+ * will be performed without blocking on more data from sources.
+ * Calls outside of the current row, such as next(), may block.
+ * @param s the statement that produced the row
+ * @param rs the result set, positioned on the row to process
+ * @throws Exception if the row cannot be processed
+ */
+ void onRow(Statement s, ResultSet rs) throws Exception;
+
+ /**
+ * Called when an exception occurs. No further rows will
+ * be processed by this callback.
+ * @param s the statement being processed when the exception occurred
+ * @param e the exception that occurred
+ * @throws Exception if the exception cannot be handled
+ */
+ void onException(Statement s, Exception e) throws Exception;
+
+ /**
+ * Called when processing has completed normally.
+ * @param s the statement whose processing has completed
+ * @throws Exception if completion handling fails
+ */
+ void onComplete(Statement s) throws Exception;
+
+}
Property changes on: trunk/client/src/main/java/org/teiid/jdbc/StatementCallback.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-11-01 17:22:07 UTC
(rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-11-01 20:16:59 UTC
(rev 3600)
@@ -307,6 +307,12 @@
}
}
+ @Override
+ public void submitExecute(String sql, StatementCallback callback) throws SQLException {
+     // Submit the SQL, then let a row processor feed the callback once the
+     // execution future completes.
+     ResultsFuture<Boolean> execution = submitExecute(sql);
+     execution.addCompletionListener(new NonBlockingRowProcessor(this, callback));
+ }
+
public ResultsFuture<Boolean> submitExecute(String sql) throws SQLException {
return executeSql(new String[] {sql}, false, ResultsMode.EITHER, false);
}
Added: trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
(rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java 2011-11-01
20:16:59 UTC (rev 3600)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This interface provides methods in
+ * addition to the standard JDBC methods
+ * of {@link PreparedStatement}.
+ */
+public interface TeiidPreparedStatement extends PreparedStatement {
+
+ /**
+ * Execute this prepared statement using a non-blocking callback.
+ * This method is only valid for use with embedded connections.
+ * @param callback the callback notified of rows, exceptions, and completion
+ * @throws SQLException if the execution cannot be submitted
+ */
+ void submitExecute(StatementCallback callback) throws SQLException;
+
+}
Property changes on:
trunk/client/src/main/java/org/teiid/jdbc/TeiidPreparedStatement.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java 2011-11-01 17:22:07 UTC
(rev 3599)
+++ trunk/client/src/main/java/org/teiid/jdbc/TeiidStatement.java 2011-11-01 20:16:59 UTC
(rev 3600)
@@ -23,6 +23,7 @@
package org.teiid.jdbc;
import java.io.Serializable;
+import java.sql.SQLException;
import java.util.Collection;
import org.teiid.client.plan.Annotation;
@@ -99,4 +100,12 @@
* @since 4.2
*/
void setPayload(Serializable payload);
+
+ /**
+ * Execute the given statement using a non-blocking callback.
+ * This method is only valid for use with embedded connections.
+ * @param sql the SQL to execute
+ * @param callback the callback notified of rows, exceptions, and completion
+ * @throws SQLException if the execution cannot be submitted
+ */
+ void submitExecute(String sql, StatementCallback callback) throws SQLException;
}
Modified:
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
===================================================================
---
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml 2011-11-01
17:22:07 UTC (rev 3599)
+++
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml 2011-11-01
20:16:59 UTC (rev 3600)
@@ -519,7 +519,7 @@
there is a way to make connections that by-pass making a socket based JDBC
connection.
You can use slightly modified data source configuration to make a
"local" connection, where the JDBC API will lookup a local Teiid runtime in the
same VM.</para>
<warning><para>Since DataSources start before before Teiid VDBs
are deployed, leave the min pool size of local connections as the default of 0. Otherwise
errors will occur on the startup of the Teiid DataSource.</para></warning>
- <note><para>Be default local connections use their calling thread
to perform processing operations rather than using an engine thread while the calling
thread is blocked.
+ <note><para>By default local connections use their calling thread
to perform processing operations rather than using an engine thread while the calling
thread is blocked.
To disable this behavior set the connection property
useCallingThreads=false.</para></note>
<example>
<title>Local data source</title>
Modified:
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml
===================================================================
---
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml 2011-11-01
17:22:07 UTC (rev 3599)
+++
trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-extensions.xml 2011-11-01
20:16:59 UTC (rev 3600)
@@ -389,4 +389,37 @@
</section>
</section>
+ <section>
+ <title>Non-blocking Statement Execution</title>
+ <para>JDBC query execution can indefinitely block the calling thread when a
statement is executed or a resultset is being iterated.
+ In some situations you may not wish to have your calling threads held in these blocked
states. When using embedded connections, you may optionally use the
+ <code>org.teiid.jdbc.TeiidStatement</code> and
<code>org.teiid.jdbc.TeiidPreparedStatement</code> interfaces to execute
queries with a callback <code>org.teiid.jdbc.StatementCallback</code> that
will be notified
+ of statement events, such as an available row, an exception, or completion. Your
calling thread will be free to perform other work. The callback will be executed by an
engine processing thread as needed. If your results processing is itself blocking and you
want query processing to be concurrent with results processing, then your callback should
implement onRow
+ handling in a multi-threaded manner to allow the engine thread to continue.
+ </para>
+ <example>
+ <title>Non-blocking Prepared Statement Execution</title>
+ <programlisting>PreparedStatement stmt =
connection.prepareStatement(sql);
+TeiidPreparedStatement tStmt = stmt.unwrap(TeiidPreparedStatement.class);
+tStmt.submitExecute(new StatementCallback() {
+ @Override
+ public void onRow(Statement s, ResultSet rs) throws SQLException {
+ //any logic that accesses the current row ...
+ System.out.println(rs.getString(1));
+ }
+
+ @Override
+ public void onException(Statement s, Exception e) throws Exception {
+ s.close();
+ }
+
+ @Override
+ public void onComplete(Statement s) throws Exception {
+ s.close();
+ }
+});</programlisting>
+ </example>
+ <note><para>The non-blocking logic is limited to statement execution only.
Other JDBC operations, such as connection creation or batched executions do not yet have
non-blocking options.</para></note>
+ </section>
+
</chapter>
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-11-01
17:22:07 UTC (rev 3599)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-11-01
20:16:59 UTC (rev 3600)
@@ -181,10 +181,6 @@
}
}
- public static interface ContextProvider {
- DQPWorkContext getContext(String vdbName, int vdbVersion);
- }
-
private ThreadReuseExecutor processWorkerPool;
// Resources
@@ -364,7 +360,7 @@
}
}, timeout));
}
- boolean runInThread = DQPWorkContext.getWorkContext().useCallingThread() ||
requestMsg.isSync();
+ boolean runInThread = requestMsg.isSync();
synchronized (waitingPlans) {
if (runInThread || currentlyActivePlans <= maxActivePlans) {
startActivePlan(workItem, !runInThread);
Modified:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java 2011-11-01
17:22:07 UTC (rev 3599)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java 2011-11-01
20:16:59 UTC (rev 3600)
@@ -25,13 +25,18 @@
import static org.junit.Assert.*;
import java.io.IOException;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import org.junit.Before;
import org.junit.Test;
+import org.teiid.client.util.ResultsFuture;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.AbstractMMQueryTestCase;
import org.teiid.jdbc.FakeServer;
+import org.teiid.jdbc.StatementCallback;
+import org.teiid.jdbc.TeiidStatement;
import org.teiid.jdbc.TestMMDatabaseMetaData;
@@ -182,4 +187,28 @@
@Test(expected=SQLException.class) public void testLogMsg1() throws Exception {
execute("call logMsg(level=>'foo', context=>'org.teiid.foo',
msg=>'hello world')"); //$NON-NLS-1$
}
+
+ /**
+  * Runs a query through the callback API and verifies that the callback
+  * sees every row before completion is signalled.
+  */
+ @Test public void testAsynch() throws Exception {
+     // completed by the callback with the number of rows seen, or with the failure
+     final ResultsFuture<Integer> outcome = new ResultsFuture<Integer>();
+     StatementCallback rowCounter = new StatementCallback() {
+         int seen;
+
+         @Override
+         public void onRow(Statement s, ResultSet rs) {
+             seen++;
+         }
+
+         @Override
+         public void onException(Statement s, Exception e) {
+             outcome.getResultsReceiver().exceptionOccurred(e);
+         }
+
+         @Override
+         public void onComplete(Statement s) {
+             outcome.getResultsReceiver().receiveResults(seen);
+         }
+     };
+     Statement statement = this.internalConnection.createStatement();
+     TeiidStatement teiidStatement = statement.unwrap(TeiidStatement.class);
+     teiidStatement.submitExecute("select * from SYS.Schemas", rowCounter);
+     // get() blocks the test thread until the callback has reported an outcome
+     assertEquals(4, outcome.get().intValue());
+ }
}