[teiid-commits] teiid SVN: r3042 - in trunk: client/src/main/java/org/teiid/jdbc and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Mar 28 00:17:52 EDT 2011


Author: shawkins
Date: 2011-03-28 00:17:50 -0400 (Mon, 28 Mar 2011)
New Revision: 3042

Added:
   trunk/client/src/main/java/org/teiid/jdbc/CancellationTimer.java
Modified:
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
   trunk/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
   trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
   trunk/client/src/main/resources/org/teiid/jdbc/i18n.properties
   trunk/client/src/test/java/org/teiid/jdbc/TestStatement.java
   trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
   trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/ssl.xml
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
   trunk/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
   trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
   trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
   trunk/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
Log:
TEIID-1536 allowing non-odbc local connections to use the calling thread for processing.  also updating the client logic to delay transaction starting as long as possible.

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-03-28 04:17:50 UTC (rev 3042)
@@ -43,6 +43,7 @@
 	<LI><B>Dependent Join Improvements</B> - dependent join analysis and costing in general was improved to consider dependent joins earlier in planning.
 	<LI><B>Memory Management Improvements</B> - maxReserveBatchColumns and maxProcessingBatchesColumns will by default be determined automatically and will more reliably prevent memory issues.  See the admin guide for more.
 	<LI><B>Subquery optimization control</B> - added the MJ and NO_UNNEST hints and the org.teiid.subqueryUnnestDefault system property to control the optimization of subqueries to traditional joins or to a merge join implementation of a semijoin or antijoin.
+	<LI><B>Local connection threads </B> - local connection calling threads will be used to process work rather than using an engine thread.  This helps decouple the configuration of maxThreads.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>

Added: trunk/client/src/main/java/org/teiid/jdbc/CancellationTimer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/CancellationTimer.java	                        (rev 0)
+++ trunk/client/src/main/java/org/teiid/jdbc/CancellationTimer.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -0,0 +1,122 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Specialized timer that actively purges tasks in lg(n) time
+ */
+public class CancellationTimer {
+	// Supplies a distinct sequence id to each CancelTask, used to break ties between equal end times.
+	private static AtomicInteger id = new AtomicInteger();
+	// A unit of work with a deadline; ordered by deadline first, then by creation sequence.
+	static abstract class CancelTask implements Runnable, Comparable<CancelTask> {
+		long endTime; // absolute deadline on the System.currentTimeMillis() clock
+		int seqId = id.getAndIncrement(); // unique per task so that distinct tasks never compare as equal
+		// delay is relative to the moment of construction, in milliseconds
+		public CancelTask(long delay) {
+			this.endTime = System.currentTimeMillis() + delay;
+		}
+		// Read-only ordering: the timer's TreeSet relies on this being stable and side-effect free.
+		@Override
+		public int compareTo(CancelTask o) {
+			int result = Long.signum(this.endTime - o.endTime);
+			if (result == 0) {
+				return seqId < o.seqId ? -1 : (seqId == o.seqId ? 0 : 1); // explicit compare cannot overflow
+			}
+			return result;
+		}
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			if (!(obj instanceof CancelTask)) {
+				return false;
+			}
+			return this.compareTo((CancelTask)obj) == 0; // kept consistent with compareTo for sorted-set use
+		}
+	}
+	// Pending tasks, earliest deadline first; all access is guarded by synchronized (this).
+	private TreeSet<CancelTask> cancelQueue = new TreeSet<CancelTask>();
+	private Thread thread;
+	// Starts a daemon thread with the given name that runs queued tasks until it is interrupted.
+	public CancellationTimer(String name) {
+		thread = new Thread(new Runnable() {
+			
+			@Override
+			public void run() {
+				while (true) {
+					try {
+						doCancellations();
+					} catch (InterruptedException e) {
+						break; // interruption is the only way the timer thread ends
+					}
+				}
+			}
+		}, name);
+		thread.setDaemon(true);
+		thread.start();
+	}
+	// Runs at most one due task per call; otherwise waits for the next deadline or a queue change.
+	private void doCancellations() throws InterruptedException {
+		CancelTask task = null;
+		synchronized (this) {
+			if (cancelQueue.isEmpty()) {
+				this.wait(); // woken by add/remove; the caller's loop re-checks the queue
+				return;
+			}
+			task = cancelQueue.first();
+			long toWait = task.endTime - System.currentTimeMillis();
+			if (toWait > 0) {
+				this.wait(toWait); // not due yet; return so the head of the queue is re-read afterwards
+				return;
+			}
+			cancelQueue.pollFirst();
+		}
+		try {
+			task.run(); // executed outside the lock so a slow task cannot block add/remove
+		} catch (Throwable t) {
+			Logger.getLogger(CancellationTimer.class).error("Unexpected exception running task", t); //$NON-NLS-1$
+		}
+	}
+	// Queues the task and wakes the timer thread so it can re-evaluate the earliest deadline.
+	public void add(CancelTask task) {
+		synchronized (this) {
+			this.cancelQueue.add(task);
+			this.notifyAll();
+		}
+	}
+	// Drops the task if it is still queued, then wakes the timer thread to re-evaluate its wait.
+	public void remove(CancelTask task) {
+		synchronized (this) {
+			this.cancelQueue.remove(task);
+			this.notifyAll();
+		}
+	}
+
+}


Property changes on: trunk/client/src/main/java/org/teiid/jdbc/CancellationTimer.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -86,7 +86,8 @@
     // status of connection object
     private boolean closed = false;
     // determines if a statement executed should be immediately committed.
-    private boolean autoCommitFlag = true;
+    private boolean autoCommitFlag = true;
+    private boolean inLocalTxn;
 
     // collection of all open statements on this connection
     private Collection<StatementImpl> statements = new ArrayList<StatementImpl>();
@@ -125,7 +126,11 @@
         
         this.disableLocalTransactions = Boolean.valueOf(this.propInfo.getProperty(ExecutionProperties.DISABLE_LOCAL_TRANSACTIONS)).booleanValue();
     }
-
+    
+    boolean isInLocalTxn() {
+		return inLocalTxn;
+	}
+    
 	private void setExecutionProperties(Properties info) {
 		this.propInfo = new Properties();
         
@@ -346,39 +351,37 @@
             try {
                 directCommit();
             } finally {
-                beginLocalTxn(); 
+                inLocalTxn = false; 
             }
         }
     }
 
-    private void directCommit() throws SQLException {
-        try {
-			ResultsFuture<?> future = this.dqp.commit();
-			future.get();
-		} catch (Exception e) {
-			throw TeiidSQLException.create(e);
-		}
-        logger.fine(JDBCPlugin.Util.getString("MMConnection.Commit_success")); //$NON-NLS-1$
+    private void directCommit() throws SQLException {
+    	if (inLocalTxn) {
+	        try {
+				ResultsFuture<?> future = this.dqp.commit();
+				future.get();
+			} catch (Exception e) {
+				throw TeiidSQLException.create(e);
+			}
+	        logger.fine(JDBCPlugin.Util.getString("MMConnection.Commit_success")); //$NON-NLS-1$
+    	}
     }
 
-    private void beginLocalTxn() throws SQLException {
-        if (this.transactionXid == null) {
-        	if (disableLocalTransactions) {
-        		this.autoCommitFlag = true;
-        		return;
-        	}
-            boolean txnStarted = false;
-            try {
-            	try {
-            		this.dqp.begin();
-        		} catch (XATransactionException e) {
-        			throw TeiidSQLException.create(e);
-        		} 
-                txnStarted = true;
-            } finally {
-                if (!txnStarted) {
-                    autoCommitFlag = true;
-                }
+    void beginLocalTxnIfNeeded() throws SQLException {
+        if (this.transactionXid != null || inLocalTxn || this.autoCommitFlag || disableLocalTransactions) {
+        	return;
+        }
+        try {
+        	try {
+        		this.dqp.begin();
+    		} catch (XATransactionException e) {
+    			throw TeiidSQLException.create(e);
+    		} 
+            inLocalTxn = true;
+        } finally {
+            if (!inLocalTxn) {
+                autoCommitFlag = true;
             }
         }
     }
@@ -654,17 +657,20 @@
         //Check to see the connection is open
         checkConnection();
         if (!autoCommitFlag) {
-            try {
-            	try {
-            		ResultsFuture<?> future = this.dqp.rollback();
-            		future.get();
-        		} catch (Exception e) {
-        			throw TeiidSQLException.create(e);
-        		}
-                logger.fine(JDBCPlugin.Util.getString("MMConnection.Rollback_success")); //$NON-NLS-1$
+            try {
+            	if (this.inLocalTxn) {
+            		this.inLocalTxn = false;
+	            	try {
+	            		ResultsFuture<?> future = this.dqp.rollback();
+	            		future.get();
+	        		} catch (Exception e) {
+	        			throw TeiidSQLException.create(e);
+	        		}
+	                logger.fine(JDBCPlugin.Util.getString("MMConnection.Rollback_success")); //$NON-NLS-1$
+            	}
             } finally {
                 if (startTxn) {
-                    beginLocalTxn();
+                    this.inLocalTxn = false;
                 }
                 else {
                     this.autoCommitFlag = true;
@@ -705,9 +711,9 @@
 
         if (autoCommit) {
             directCommit();   
-        } else {
-            beginLocalTxn();
-        }        
+        } else {
+        	inLocalTxn = false;
+        }
     }
 
     /**

Modified: trunk/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -34,8 +34,8 @@
 import org.teiid.net.ServerConnection;
 
 
-final class EmbeddedProfile implements ConnectionProfile {
-    
+public class EmbeddedProfile implements ConnectionProfile {
+	
     /**
      * This method tries to make a connection to the given URL. This class
      * will return a null if this is not the right driver to connect to the given URL.
@@ -46,7 +46,7 @@
     public ConnectionImpl connect(String url, Properties info) 
         throws TeiidSQLException {
         try {
-        	ServerConnection sc = (ServerConnection)ReflectionHelper.create("org.teiid.transport.LocalServerConnection", Arrays.asList(info), Thread.currentThread().getContextClassLoader()); //$NON-NLS-1$
+        	ServerConnection sc = createServerConnection(info);
 			return new ConnectionImpl(sc, info, url);
 		} catch (TeiidRuntimeException e) {
 			throw TeiidSQLException.create(e);
@@ -58,5 +58,10 @@
 			throw TeiidSQLException.create(e);
 		}
     }
+
+	protected ServerConnection createServerConnection(Properties info)
+			throws TeiidException {
+		return (ServerConnection)ReflectionHelper.create("org.teiid.transport.LocalServerConnection", Arrays.asList(info, true), Thread.currentThread().getContextClassLoader()); //$NON-NLS-1$
+	}
     
 }

Modified: trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -23,6 +23,7 @@
 package org.teiid.jdbc;
 
 import java.io.Serializable;
+import java.lang.ref.WeakReference;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -64,14 +65,36 @@
 import org.teiid.core.types.SQLXMLImpl;
 import org.teiid.core.util.SqlUtil;
 import org.teiid.core.util.StringUtil;
+import org.teiid.jdbc.CancellationTimer.CancelTask;
 
 
 public class StatementImpl extends WrapperImpl implements TeiidStatement {
 	private static Logger logger = Logger.getLogger("org.teiid.jdbc"); //$NON-NLS-1$
+	
+	static CancellationTimer cancellationTimer = new CancellationTimer("Teiid Statement Timeout"); //$NON-NLS-1$
+	
+	private static final class QueryTimeoutCancelTask extends CancelTask {
+		private WeakReference<StatementImpl> ref;
+		private QueryTimeoutCancelTask(long delay, StatementImpl stmt) {
+			super(delay);
+			this.ref = new WeakReference<StatementImpl>(stmt);
+		}
 
-    // State constants
-    protected static final int TIMED_OUT = 4;
-    protected static final int CANCELLED = 3;
+		@Override
+		public void run() {
+			StatementImpl stmt = ref.get();
+			if (stmt != null) {
+				stmt.timeoutOccurred();
+			}
+		}
+	}
+
+	enum State {
+		RUNNING,
+		DONE,
+		TIMED_OUT,
+		CANCELLED
+	}
     protected static final int NO_TIMEOUT = 0;
 
     // integer indicating no maximum limit - used in some metadata-ish methods.
@@ -97,10 +120,10 @@
     private boolean isClosed = false;
 
     // Differentiate timeout from cancel in blocking asynch operation
-    protected int commandStatus = -1;
+    protected volatile State commandStatus = State.RUNNING;
 
     // number of seconds for the query to timeout.
-    protected int queryTimeout = NO_TIMEOUT;
+    protected long queryTimeoutMS = NO_TIMEOUT;
 
     //########## Per-execution state ########
 
@@ -136,12 +159,6 @@
     private int maxRows = NO_LIMIT;
     private int maxFieldSize = NO_LIMIT;
     
-    /** SPIN_TIMEOUT determines how responsive asynch operations will be to
-     *  statement cancellation, closure, or execution timeouts.  
-     *  1/2 second was chosen as default.
-     */
-    private static int SPIN_TIMEOUT = 500;
-    
     //Map<out/inout/return param index --> index in results>
     protected Map outParamIndexMap = new HashMap();
     
@@ -241,7 +258,7 @@
          * the statement.executeQuery() call throwing the server's exception instead of the
          * one generated by the conditionalWait() method.
          */
-        commandStatus = CANCELLED;
+        commandStatus = State.CANCELLED;
         cancelRequest();
     }
 
@@ -421,6 +438,7 @@
         		String command = match.group(1);
         		Boolean commit = null;
         		if (StringUtil.startsWithIgnoreCase(command, "start")) { //$NON-NLS-1$
+        			//TODO: this should force a start and throw an exception if we're already in a txn
         			this.getConnection().setAutoCommit(false);
         		} else if (command.equalsIgnoreCase("commit")) { //$NON-NLS-1$
         			commit = true;
@@ -507,20 +525,70 @@
         }
         
         final RequestMessage reqMessage = createRequestMessage(commands, isBatchedCommand, resultsMode);
-    	ResultsFuture<ResultsMessage> pendingResult = this.sendRequestMessage(reqMessage);
-    	
+    	ResultsFuture<Boolean> result = execute(reqMessage);
     	if (synch) {
-	    	ResultsMessage resultsMsg = getResults(reqMessage, pendingResult);
-	        postReceiveResults(reqMessage, resultsMsg);
-	        return booleanFuture(hasResultSet());
+    		try {
+    			if (queryTimeoutMS > 0) {
+    				result.get(queryTimeoutMS, TimeUnit.MILLISECONDS);
+    			} else {
+    				result.get();
+    			}
+    			return result;
+    		} catch (ExecutionException e) {
+    			throw TeiidSQLException.create(e);
+    		} catch (InterruptedException e) {
+    			timeoutOccurred();
+    		} catch (TimeoutException e) {
+    			timeoutOccurred();
+			}
+    		throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Timeout_before_complete")); //$NON-NLS-1$
     	}
-    	
+    	return result;
+    }
+
+	private ResultsFuture<Boolean> execute(final RequestMessage reqMsg) throws SQLException,
+			TeiidSQLException {
+		this.getConnection().beginLocalTxnIfNeeded();
+        this.currentRequestID = this.driverConnection.nextRequestID();
+        // Create a request message
+        reqMsg.setExecutionPayload(this.payload);        
+        reqMsg.setCursorType(this.resultSetType);
+        reqMsg.setFetchSize(this.fetchSize);
+        reqMsg.setRowLimit(this.maxRows);
+        reqMsg.setTransactionIsolation(this.driverConnection.getTransactionIsolation());
+
+        // Get connection properties and set them onto request message
+        copyPropertiesToRequest(reqMsg);
+
+        reqMsg.setExecutionId(this.currentRequestID);
+        
+        ResultsFuture.CompletionListener<ResultsMessage> compeletionListener = null;
+		if (queryTimeoutMS > 0) {
+			final CancelTask c = new QueryTimeoutCancelTask(queryTimeoutMS, this);
+			cancellationTimer.add(c);
+			compeletionListener = new ResultsFuture.CompletionListener<ResultsMessage>() {
+				@Override
+				public void onCompletion(ResultsFuture<ResultsMessage> future) {
+					cancellationTimer.remove(c);
+				}
+			};
+		} 
+        
+    	ResultsFuture<ResultsMessage> pendingResult = null;
+		try {
+			pendingResult = this.getDQP().executeRequest(this.currentRequestID, reqMsg);
+		} catch (TeiidException e) {
+			throw TeiidSQLException.create(e);
+		}
+		if (compeletionListener != null) {
+			pendingResult.addCompletionListener(compeletionListener);
+		}
     	final ResultsFuture<Boolean> result = new ResultsFuture<Boolean>();
     	pendingResult.addCompletionListener(new ResultsFuture.CompletionListener<ResultsMessage>() {
     		@Override
     		public void onCompletion(ResultsFuture<ResultsMessage> future) {
     			try {
-					postReceiveResults(reqMessage, future.get());
+					postReceiveResults(reqMsg, future.get());
 					result.getResultsReceiver().receiveResults(hasResultSet());
 				} catch (Throwable t) {
 					result.getResultsReceiver().exceptionOccurred(t);
@@ -535,47 +603,10 @@
 		rs.getResultsReceiver().receiveResults(isTrue);
 		return rs;
 	}
-
-	private ResultsMessage getResults(RequestMessage reqMessage,
-			ResultsFuture<ResultsMessage> pendingResult)
-			throws TeiidSQLException {
-		try {
-    		long timeoutMillis = queryTimeout * 1000;
-            long endTime = System.currentTimeMillis() + timeoutMillis;
-            ResultsMessage result = null;        
-            while (result == null) {
-
-            	if (timeoutMillis > 0 && endTime <= System.currentTimeMillis() && commandStatus != TIMED_OUT && commandStatus != CANCELLED) {
-    	            timeoutOccurred();
-            	}
-            	
-                checkStatement();
-    			try {
-    				result = pendingResult.get(SPIN_TIMEOUT, TimeUnit.MILLISECONDS);
-    			} catch (ExecutionException e) {
-    				throw TeiidSQLException.create(e);
-    			} catch (TimeoutException e) {
-    				continue;
-    			}
-            }
-            
-        	if (commandStatus == CANCELLED) {
-                throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Cancel_before_execute")); //$NON-NLS-1$
-            }
-        	 
-        	if (commandStatus == TIMED_OUT) {
-                throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Timeout_before_complete")); //$NON-NLS-1$
-            }    	
-            return result;
-        } catch ( Throwable ex ) {
-        	String msg = JDBCPlugin.Util.getString("MMStatement.Error_executing_stmt", reqMessage.getCommandString()); //$NON-NLS-1$ 
-            logger.log(ex instanceof SQLException?Level.WARNING:Level.SEVERE, msg, ex);
-            throw TeiidSQLException.create(ex, msg);
-        }
-	}
-
+	
 	private void postReceiveResults(RequestMessage reqMessage,
 			ResultsMessage resultsMsg) throws TeiidSQLException, SQLException {
+		commandStatus = State.DONE;
 		// warnings thrown
         List resultsWarning = resultsMsg.getWarnings();
 
@@ -726,7 +757,7 @@
     public int getQueryTimeout() throws SQLException {
         //Check to see the statement is closed and throw an exception
         checkStatement();
-        return this.queryTimeout;
+        return (int)this.queryTimeoutMS/1000;
     }
 
     /**
@@ -858,12 +889,16 @@
         //Check to see the statement is closed and throw an exception
         checkStatement();
         if (seconds >= 0) {
-            queryTimeout = seconds;
+            queryTimeoutMS = seconds*1000;
         }
         else {
             throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Bad_timeout_value")); //$NON-NLS-1$
         }
     }
+    
+    void setQueryTimeoutMS(int queryTimeoutMS) {
+		this.queryTimeoutMS = queryTimeoutMS;
+	}
 
     /**
      * Helper method for copy the connection properties to request message.
@@ -920,11 +955,14 @@
      * Ends the command and sets the status to TIMED_OUT.
      */
     protected void timeoutOccurred() {
+    	if (this.commandStatus != State.RUNNING) {
+    		return;
+    	}
         logger.warning(JDBCPlugin.Util.getString("MMStatement.Timeout_ocurred_in_Statement.")); //$NON-NLS-1$
         try {
         	cancel();        
-            commandStatus = TIMED_OUT;
-            queryTimeout = NO_TIMEOUT;
+            commandStatus = State.TIMED_OUT;
+            queryTimeoutMS = NO_TIMEOUT;
             currentRequestID = -1;
             if (this.resultSet != null) {
                 this.resultSet.close();
@@ -972,31 +1010,6 @@
         return this.execProps.getProperty(name);
     }
 
-    /**
-     * Send out request message with necessary states.
-     */
-    protected ResultsFuture<ResultsMessage> sendRequestMessage(RequestMessage reqMsg)
-        throws SQLException {
-        this.currentRequestID = this.driverConnection.nextRequestID();
-        // Create a request message
-        reqMsg.setExecutionPayload(this.payload);        
-        reqMsg.setCursorType(this.resultSetType);
-        reqMsg.setFetchSize(this.fetchSize);
-        reqMsg.setRowLimit(this.maxRows);
-        reqMsg.setTransactionIsolation(this.driverConnection.getTransactionIsolation());
-
-        // Get connection properties and set them onto request message
-        copyPropertiesToRequest(reqMsg);
-
-        reqMsg.setExecutionId(this.currentRequestID);
-    	
-		try {
-			return this.getDQP().executeRequest(this.currentRequestID, reqMsg);
-		} catch (TeiidException e) {
-			throw TeiidSQLException.create(e);
-		}
-    }
-
     long getCurrentRequestID() {
         return this.currentRequestID;
     }

Modified: trunk/client/src/main/resources/org/teiid/jdbc/i18n.properties
===================================================================
--- trunk/client/src/main/resources/org/teiid/jdbc/i18n.properties	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/client/src/main/resources/org/teiid/jdbc/i18n.properties	2011-03-28 04:17:50 UTC (rev 3042)
@@ -38,7 +38,6 @@
 MMResultSet.Cant_call_closed_resultset=Error trying to operate on a closed ResultSet object.
 MMResultSet.cannot_convert_to_binary_stream=Cannot convert to binary stream
 MMStatement.Error_executing_stmt=Error trying to execute a statement {0}.
-MMStatement.Cancel_before_execute=Request was canceled before it could finish executing.
 MMStatement.Invalid_fetch_size=Fetch size should always be a value of 0 <= fetch size <= max rows.
 MMStatement.Timeout_before_complete=Operation timed out before completion.
 MMResultsImpl.Col_doesnt_exist=Column name "{0}" does not exist.

Modified: trunk/client/src/test/java/org/teiid/jdbc/TestStatement.java
===================================================================
--- trunk/client/src/test/java/org/teiid/jdbc/TestStatement.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/client/src/test/java/org/teiid/jdbc/TestStatement.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -36,7 +36,7 @@
 import org.teiid.client.ResultsMessage;
 import org.teiid.client.util.ResultsFuture;
 
-
+@SuppressWarnings("nls")
 public class TestStatement {
 
 	@Test public void testBatchExecution() throws Exception {
@@ -106,4 +106,17 @@
 		Mockito.verify(conn).submitSetAutoCommitTrue(false);
 	}
 	
+	@Test public void testAsynchTimeout() throws Exception {
+		ConnectionImpl conn = Mockito.mock(ConnectionImpl.class);
+		StatementImpl statement = new StatementImpl(conn, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+		statement.setQueryTimeoutMS(1);
+		DQP dqp = Mockito.mock(DQP.class);
+		Mockito.stub(statement.getDQP()).toReturn(dqp);
+		ResultsFuture<ResultsMessage> future = new ResultsFuture<ResultsMessage>();
+		Mockito.stub(dqp.executeRequest(Mockito.anyLong(), (RequestMessage) Mockito.anyObject())).toReturn(future);
+		statement.submitExecute("select 'hello world'");
+		Thread.sleep(100);
+		Mockito.verify(dqp).cancelRequest(0);
+	}
+	
 }

Modified: trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
===================================================================
--- trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml	2011-03-28 04:17:50 UTC (rev 3042)
@@ -450,6 +450,7 @@
            there is a way to make connections that by-pass making a socket based JDBC connection.
            You can use slightly modified data source configuration to make a "local" connection, where the JDBC API will lookup a local Teiid runtime in the same VM.</para>
           <warning><para>Since DataSources start before Teiid VDBs are deployed, leave the min pool size of local connections as the default of 0.  Otherwise errors will occur on the startup of the Teiid DataSource.</para></warning>
+           <note><para>Local connections use their calling thread to perform processing operations rather than using an engine thread while the calling thread is blocked.</para></note>
            <example>
            <title>Local data source</title>
            <programlisting><![CDATA[<datasources>

Modified: trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/ssl.xml
===================================================================
--- trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/ssl.xml	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/documentation/client-developers-guide/src/main/docbook/en-US/content/ssl.xml	2011-03-28 04:17:50 UTC (rev 3042)
@@ -13,8 +13,7 @@
     <section id="default_security">
         <title>Default Security</title>
         
-        <para>If you are always using a 
-        <link linkend="local_connection">local connection</link>, then you do need to secure a channels.</para>
+        <para>If you are using a socket connection, then you may need to secure the channel more completely.</para>
     
         <para>By default all sensitive (non-data) messages between client and server 
         are encrypted using a <ulink url="http://en.wikipedia.org/wiki/Diffie-Hellman_key_exchange">Diffie-Hellman</ulink> 

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -26,6 +26,7 @@
 import javax.resource.spi.work.WorkEvent;
 import javax.resource.spi.work.WorkListener;
 
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.logging.LogManager;
 
 
@@ -42,14 +43,23 @@
     
     private ThreadState threadState = ThreadState.MORE_WORK;
     private volatile boolean isProcessing;
+    private boolean useCallingThread;
     
+    public AbstractWorkItem(boolean useCallingThread) {
+    	this.useCallingThread = useCallingThread;
+    }
+    
     public void run() {
-		startProcessing();
-		try {
-			process();
-		} finally {
-			endProcessing();
-		}
+    	do {
+			startProcessing();
+			try {
+				process();
+			} finally {
+				if (!endProcessing()) {
+					break;
+				}
+			}
+    	} while (!isDoneProcessing());
     }
     
     synchronized ThreadState getThreadState() {
@@ -69,7 +79,10 @@
     	this.threadState = ThreadState.WORKING;
 	}
     
-    private synchronized void endProcessing() {
+    /**
+     * @return true if processing should be continued
+     */
+    final private synchronized boolean endProcessing() {
     	isProcessing = false;
     	logTrace("end processing"); //$NON-NLS-1$
     	switch (this.threadState) {
@@ -79,20 +92,21 @@
 	        		this.threadState = ThreadState.DONE;
 	        	} else {
 		    		this.threadState = ThreadState.IDLE;
-		    		pauseProcessing();
+		    		return pauseProcessing();
 	        	}
 	    		break;
 	    	case MORE_WORK:
 	    		if (isDoneProcessing()) {
 	    			logTrace("done processing - ignoring more"); //$NON-NLS-1$
 	        		this.threadState = ThreadState.DONE;
-	        	} else {
-	        		resumeProcessing();
+	        	} else if (!this.useCallingThread) {
+        			resumeProcessing();
 	        	}
 	    		break;
     		default:
     			throw new IllegalStateException("Should not END on " + this.threadState); //$NON-NLS-1$
     	}
+    	return useCallingThread;
     }
     
     protected boolean isIdle() {
@@ -103,7 +117,7 @@
     	moreWork(true);
     }
     
-    protected synchronized void moreWork(boolean ignoreDone) {
+    final protected synchronized void moreWork(boolean ignoreDone) {
     	logTrace("more work"); //$NON-NLS-1$
     	switch (this.threadState) {
 	    	case WORKING:
@@ -113,7 +127,15 @@
 	    		break;
 	    	case IDLE:
 	    		this.threadState = ThreadState.MORE_WORK;
-	    		resumeProcessing();
+        		if (this.useCallingThread) {
+        			if (isProcessing) {
+        				this.notifyAll(); //notify the waiting caller
+        			} else {
+        				run(); //restart with the calling thread
+        			}
+        		} else {
+        			resumeProcessing();
+        		}
 	    		break;
 			default:
 				if (!ignoreDone) {
@@ -129,8 +151,33 @@
     
     protected abstract void process();
 
-	protected void pauseProcessing() {
+	protected boolean pauseProcessing() {
+		if (useCallingThread && !shouldPause()) {
+			return false;
+		}
+		while (useCallingThread && this.getThreadState() == ThreadState.IDLE) {
+			try {
+				this.wait(); //the lock should already be held
+			} catch (InterruptedException e) {
+				interrupted(e);
+			}
+		}
+		return useCallingThread;
 	}
+	
+	/**
+	 * only called for synch processing
+	 */
+	protected boolean shouldPause() {
+		return false;
+	}
+
+	/**
+	 * only called for synch processing
+	 */
+	protected void interrupted(InterruptedException e) {
+		throw new TeiidRuntimeException(e);
+	}
     
     protected abstract void resumeProcessing();
 	

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -358,8 +358,13 @@
 
 	private void startActivePlan(RequestWorkItem workItem) {
 		workItem.active = true;
-		this.addWork(workItem);
-		this.currentlyActivePlans++;
+		if (workItem.getDqpWorkContext().useCallingThread()) {
+			this.currentlyActivePlans++;
+			workItem.run();
+		} else {
+			this.addWork(workItem);
+			this.currentlyActivePlans++;
+		}
 	}
 	
     void finishProcessing(final RequestWorkItem workItem) {
@@ -791,7 +796,7 @@
 		return addWork(processor, 10);
 	}
 
-	<T> ResultsFuture<T> addWork(final Callable<T> processor, int priority) {
+	private <T> ResultsFuture<T> addWork(final Callable<T> processor, int priority) {
 		final ResultsFuture<T> result = new ResultsFuture<T>();
 		final ResultsReceiver<T> receiver = result.getResultsReceiver();
 		Runnable r = new Runnable() {
@@ -806,7 +811,11 @@
 			}
 		};
 		FutureWork<T> work = new FutureWork<T>(r, null, priority);
-		this.addWork(work);
+		if (DQPWorkContext.getWorkContext().useCallingThread()) {
+			work.run();
+		} else {
+			this.addWork(work);
+		}
 		return result;
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -71,10 +71,19 @@
     private String clientHostname;
     private SecurityHelper securityHelper;
     private HashMap<String, DataPolicy> policies;
+    private boolean useCallingThread;
     
     public DQPWorkContext() {
 	}
+
+    public boolean useCallingThread() {
+		return useCallingThread;
+	}
     
+    public void setUseCallingThread(boolean useCallingThread) {
+		this.useCallingThread = useCallingThread;
+	}
+    
     public SessionMetadata getSession() {
 		return session;
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -46,6 +46,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
 import org.teiid.dqp.internal.process.DQPCore.FutureWork;
@@ -160,6 +161,7 @@
     private long processingTimestamp = System.currentTimeMillis();
     
     public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
+    	super(workContext.useCallingThread());
         this.requestMsg = requestMsg;
         this.requestID = requestID;
         this.processorTimeslice = dqpCore.getProcessorTimeSlice();
@@ -203,6 +205,16 @@
 	}
 	
 	@Override
+	protected void interrupted(InterruptedException e) {
+		try {
+			this.requestCancel();
+		} catch (TeiidComponentException e1) {
+			throw new TeiidRuntimeException(e1);
+		}
+		super.interrupted(e);
+	}
+	
+	@Override
 	protected void process() {
         LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "with state", state); //$NON-NLS-1$ //$NON-NLS-2$
         try {
@@ -600,8 +612,11 @@
 	}
 
     private void sendError() {
+    	ResultsReceiver<ResultsMessage> receiver = null;
     	synchronized (this) {
-    		if (this.resultsReceiver == null) {
+    		receiver = this.resultsReceiver;
+    		this.resultsReceiver = null;
+    		if (receiver == null) {
     			LogManager.logDetail(LogConstants.CTX_DQP, processingException, "Unable to send error to client as results were already sent.", requestID); //$NON-NLS-1$
     			return;
     		}
@@ -610,8 +625,14 @@
         ResultsMessage response = new ResultsMessage(requestMsg);
         response.setException(processingException);
         setAnalysisRecords(response);
-        resultsReceiver.receiveResults(response);
+        receiver.receiveResults(response);
     }
+    
+    @Override
+    protected boolean shouldPause() {
+    	//if we are waiting on results it's ok to pause
+    	return this.resultsReceiver != null;
+    }
 
     private static List<ParameterInfo> getParameterInfo(StoredProcedure procedure) {
         List<ParameterInfo> paramInfos = new ArrayList<ParameterInfo>();
@@ -634,7 +655,11 @@
             }
 		}
     	workItem.setResultsReceiver(chunckReceiver);
-        dqpCore.addWork(workItem);
+    	if (this.dqpWorkContext.useCallingThread()) {
+    		workItem.run();
+    	} else {
+    		dqpCore.addWork(workItem);
+    	}
     }
     
     public void removeLobStream(int streamRequestId) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -22,11 +22,12 @@
 
 package org.teiid.dqp.internal.process;
 
-import org.teiid.dqp.internal.process.AbstractWorkItem;
+import static org.junit.Assert.*;
 
-import junit.framework.TestCase;
+import org.junit.Test;
 
-public class TestWorkItemState extends TestCase {
+
+public class TestWorkItemState {
 	
 	private class TestWorkItem extends AbstractWorkItem {
 
@@ -39,6 +40,7 @@
 		}
 		
 		private TestWorkItem(boolean done, boolean callMoreWork) {
+			super(false);
 			this.isDone = done;
 			this.callMoreWork = callMoreWork;
 		}
@@ -86,37 +88,33 @@
 	        checkState(ThreadState.DONE);
 	    }
 	}
-	
-    public TestWorkItemState(String name) {
-        super(name);
-    }
 
-    public void testInitialState() {
+    @Test public void testInitialState() {
         TestWorkItem item = new TestWorkItem();
         item.assertMoreWorkState();
     }
     
-    public void testGotoIdleState() {
+    @Test public void testGotoIdleState() {
         TestWorkItem item = new TestWorkItem();
         item.run();
         item.assertIdleState();
     }
     
-    public void testGotoMoreWorkState() {
+    @Test public void testGotoMoreWorkState() {
     	TestWorkItem item = new TestWorkItem();
         item.run();
         item.moreWork();
         item.assertMoreWorkState();
     }
     
-    public void testGotoWorkingState() {
+    @Test public void testGotoWorkingState() {
     	TestWorkItem item = new TestWorkItem();
     	item.run();
     	item.moreWork();
     	item.run();
     }
     
-    public void testResume() {
+    @Test public void testResume() {
     	TestWorkItem item = new TestWorkItem();
     	item.run();
     	assertFalse(item.resumed);
@@ -124,14 +122,14 @@
     	assertTrue(item.resumed);
     }
     
-    public void testResumeDuringWorking() {
+    @Test public void testResumeDuringWorking() {
     	TestWorkItem item = new TestWorkItem(false, true);
     	assertFalse(item.resumed);
     	item.run();
     	assertTrue(item.resumed);
     }
     
-    public void testRunAfterDone() {
+    @Test public void testRunAfterDone() {
     	TestWorkItem item = new TestWorkItem(true, false);
     	item.run();
     	item.assertDoneState();
@@ -143,7 +141,7 @@
     	}
     }
     
-    public void testRunDuringIdle() {
+    @Test public void testRunDuringIdle() {
     	TestWorkItem item = new TestWorkItem();
     	item.run();
     	item.assertIdleState();

Modified: trunk/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -58,10 +58,11 @@
     private Properties connectionProperties;
     private boolean passthrough;
 
-	public LocalServerConnection(Properties connectionProperties) throws CommunicationException, ConnectionException{
+	public LocalServerConnection(Properties connectionProperties, boolean useCallingThread) throws CommunicationException, ConnectionException{
 		this.connectionProperties = connectionProperties;
 		this.csr = getClientServiceRegistry();
 		workContext.setSecurityHelper(csr.getSecurityHelper());
+		workContext.setUseCallingThread(useCallingThread);
 		authenticate();
 		passthrough = Boolean.valueOf(connectionProperties.getProperty(TeiidURL.CONNECTION.PASSTHROUGH_AUTHENTICATION, "false")); //$NON-NLS-1$
 	}

Modified: trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -21,25 +21,39 @@
  */
 package org.teiid.transport;
 
+import java.util.Properties;
+
 import javax.net.ssl.SSLEngine;
 
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidException;
+import org.teiid.jdbc.EmbeddedProfile;
 import org.teiid.jdbc.TeiidDriver;
+import org.teiid.net.ServerConnection;
 import org.teiid.net.socket.ObjectChannel;
 import org.teiid.odbc.ODBCServerRemote;
 
 public class ODBCSocketListener extends SocketListener {
 	private ODBCServerRemote.AuthenticationType authType = ODBCServerRemote.AuthenticationType.CLEARTEXT;
 	private int maxLobSize;
-	private TeiidDriver driver = TeiidDriver.getInstance();
+	private TeiidDriver driver;
 	
 	public ODBCSocketListener(SocketConfiguration config, StorageManager storageManager, int portOffset, int maxLobSize) {
 		//the clientserviceregistry isn't actually used by ODBC 
 		super(config, new ClientServiceRegistryImpl(ClientServiceRegistry.Type.ODBC), storageManager, portOffset);
 		this.maxLobSize = maxLobSize;
+		this.driver = new TeiidDriver();
+		this.driver.setEmbeddedProfile(new EmbeddedProfile() {
+			@Override
+			protected ServerConnection createServerConnection(Properties info)
+					throws TeiidException {
+				//When using the non-blocking api, we don't want to use the calling thread
+				return new LocalServerConnection(info, false);
+			}
+		});
 	}
 	
 	public void setDriver(TeiidDriver driver) {

Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -68,6 +68,7 @@
 	DQPCore dqp = new DQPCore();
 	VDBRepository repo = new VDBRepository();
 	private ConnectorManagerRepository cmr;
+	private boolean useCallingThread = true;
 	
 	public FakeServer() {
 		this.logon = new LogonImpl(sessionService, null);
@@ -100,6 +101,10 @@
         registerClientService(DQP.class, dqp, null);
 	}
 	
+	public void setUseCallingThread(boolean useCallingThread) {
+		this.useCallingThread = useCallingThread;
+	}
+	
 	public void deployVDB(String vdbName, String vdbPath) throws Exception {
 		
 		IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new File(vdbPath).toURI().toURL());
@@ -171,7 +176,7 @@
 			throws TeiidSQLException {
 		LocalServerConnection conn;
 		try {
-			conn = new LocalServerConnection(info) {
+			conn = new LocalServerConnection(info, useCallingThread) {
 				@Override
 				protected ClientServiceRegistry getClientServiceRegistry() {
 					return FakeServer.this;

Modified: trunk/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-03-26 01:12:47 UTC (rev 3041)
+++ trunk/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-03-28 04:17:50 UTC (rev 3042)
@@ -58,6 +58,7 @@
 		odbcTransport = new ODBCSocketListener(config, BufferManagerFactory.getStandaloneBufferManager(), 0, 100000);
 		
 		FakeServer server = new FakeServer();
+		server.setUseCallingThread(false);
 		server.deployVDB("parts", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb");
 		
 		TeiidDriver driver = new TeiidDriver();



More information about the teiid-commits mailing list