[teiid-commits] teiid SVN: r2990 - in trunk: build/kits/jboss-container/deploy/teiid and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Mar 11 12:39:22 EST 2011


Author: shawkins
Date: 2011-03-11 12:39:21 -0500 (Fri, 11 Mar 2011)
New Revision: 2990

Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java
   trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
   trunk/engine/src/main/resources/org/teiid/query/i18n.properties
   trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-1474 TEIID-1505 adding a limit to the number of concurrent source requests and ensuring that maxactiveplans is read from the config.

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-03-11 17:39:21 UTC (rev 2990)
@@ -92,6 +92,11 @@
         <property name="maxThreads">64</property>
         <!-- Max active plans (default 20).  Increase this value on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts. -->
         <property name="maxActivePlans">20</property>
+	    <!-- Max source query concurrency per user request (default 0).  
+			 0 indicates use the default calculated value based on max active plans and max threads - approximately 2*(max threads)/(max active plans).  
+			 1 forces serial execution in the processing thread, just as is done for a transactional request.  
+			 Any number greater than 1 limits the maximum number of concurrently executing source requests accordingly. -->
+	    <property name="userRequestSourceConcurrency">0</property>
         <!-- Query processor time slice, in milliseconds. (default 2000) -->
         <property name="timeSliceInMilli">2000</property>
         <!-- Maximum allowed fetch size, set via JDBC. User requested value ignored above this value. (default 20480) -->

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-03-11 17:39:21 UTC (rev 2990)
@@ -37,7 +37,9 @@
 	    </UL>
 	<LI><B>ARRAYTABLE</B> - the ARRAYTABLE table function was added to simplify array value extraction into a tabular format.
 	<LI><B>Ingres</B> - Ingres database translator is now available to use as supported source under Teiid.
+	<LI><B>Optional Join Enhancements</B> - the optional join hint no longer requires the use of ANSI joins and will not remove optional bridging tables that are used by two other tables that are required.
 	<LI><B>InterSystems Cache</B> - InterSystems Cache database translator is now available to use as supported source under Teiid.
+	<LI><B>userRequestSourceConcurrency</B> - was added to control the number of concurrent source queries allowed for each user request.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>
@@ -103,6 +105,11 @@
 
 See the <a href="teiid-docs/teiid_admin_guide.pdf">Admin Guide</a> for more on configuration and installation.
 
+<h4>from 7.3</h4>
+<ul>
+  <LI>SocketConfiguration.maxSocketThreads will interpret a setting of 0 to mean use the system default of max available processors.  
+</ul>
+
 <h4>from 7.2</h4>
 <ul>
   <LI>Temporary tables can now be restricted by data roles.  Use the data-role attribute allow-create-temporary-tables to explicitly enable or disable the usage of temporary tables.  

Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml	2011-03-11 17:39:21 UTC (rev 2990)
@@ -62,13 +62,21 @@
 		They handle NIO non-blocking IO operations as well as directly servicing any operation that can run without blocking.</para>
 		<para>For longer running operations, the socket threads queue with work the query engine.  
 		The query engine has two settings that determine its thread utilization.  
+		
 		<code>maxThreads</code> sets the total number of threads available for query engine work (processing plans, transaction control operations, processing source queries, etc.). 
 		You should consider increasing the maximum threads on systems with a large number of available processors and/or when it's common to issue non-transactional queries with that 
 		issue a large number of concurrent source requests.
+		
 		<code>maxActivePlans</code>, which should always be smaller than maxThreads, sets the number of the maxThreads 
 		that should be used for user query processing.  Increasing the maxActivePlans should be considered for workloads with a high number of long 
 		running queries and/or systems with a large number of available processors.  If memory issues arise from increasing the max threads and the
-		max active plans, then consider decreasing the processor/connector batch sizes to limit the base number of memory rows consumed by each plan.</para>
+		max active plans, then consider decreasing the processor/connector batch sizes to limit the base number of memory rows consumed by each plan.
+		
+		<code>userRequestSourceConcurrency</code>, which should always be smaller than maxThreads, sets the number of concurrently executing source queries per user request.
+		Setting this value to 1 forces serial execution of all source queries by the processing thread.  The default value is computed based upon 2*maxThreads/maxActivePlans. 
+		Using the respective default values, this means that each user request would be allowed 6 concurrently executing source queries.  If the default calculated value is
+		not applicable to your workload, for example if you have queries that generate more concurrent long running source queries, you should adjust this value.   
+		</para>
 	</section>
 	<section>
 		<title>Cache Tuning</title>

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -38,6 +38,7 @@
     public static final int DEFAULT_MAX_PROCESS_WORKERS = 64;
 	public static final int DEFAULT_MAX_SOURCE_ROWS = -1;
 	public static final int DEFAULT_MAX_ACTIVE_PLANS = 20;
+	public static final int DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY = 0;
     
 	private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
 	private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
@@ -52,6 +53,7 @@
 	private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
 	private CacheConfiguration resultsetCacheConfig;
 	private int maxODBCLobSizeAllowed = 5*1024*1024; // 5 MB
+    private int userRequestSourceConcurrency = DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY;
 
 	@ManagementProperty(description="Max active plans (default 20).  Increase this value, and max threads, on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts.")
 	public int getMaxActivePlans() {
@@ -62,6 +64,18 @@
 		this.maxActivePlans = maxActivePlans;
 	}
 	
+	@ManagementProperty(description="Max source query concurrency per user request (default 0).  " +
+			"0 indicates use the default calculated value based on max active plans and max threads - approximately 2*(max threads)/(max active plans). " +
+			"1 forces serial execution in the processing thread, just as is done for a transactional request. " +
+			"Any number greater than 1 limits the maximum number of concurrently executing source requests accordingly.")
+	public int getUserRequestSourceConcurrency() {
+		return userRequestSourceConcurrency;
+	}
+	
+	public void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+		this.userRequestSourceConcurrency = userRequestSourceConcurrency;
+	}
+	
 	@ManagementProperty(description="Process pool maximum thread count. (default 64)")
 	public int getMaxThreads() {
 		return maxThreads;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -30,11 +30,10 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
 import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkListener;
 import javax.transaction.xa.Xid;
 
 import org.teiid.adminapi.Admin;
@@ -83,72 +82,58 @@
  */
 public class DQPCore implements DQP {
 	
-	//TODO: replace with FutureTask
-	public final static class FutureWork<T> implements Work, WorkListener, PrioritizedRunnable {
-		private final Callable<T> toCall;
-		private ResultsFuture<T> result = new ResultsFuture<T>();
-		private ResultsReceiver<T> receiver = result.getResultsReceiver();
+	public interface CompletionListener<T> {
+		void onCompletion(FutureWork<T> future);
+	}
+	
+	public final static class FutureWork<T> extends FutureTask<T> implements PrioritizedRunnable, Work {
 		private int priority;
 		private long creationTime = System.currentTimeMillis();
 		private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+		private List<CompletionListener<T>> completionListeners = new LinkedList<CompletionListener<T>>();
 
-		public FutureWork(Callable<T> processor, int priority) {
-			this.toCall = processor;
+		public FutureWork(final Callable<T> processor, int priority) {
+			super(processor);
 			this.priority = priority;
 		}
 		
-		public ResultsFuture<T> getResult() {
-			return result;
+		public FutureWork(final Runnable processor, T result, int priority) {
+			super(processor, result);
+			this.priority = priority;
 		}
 		
 		@Override
-		public void run() {
-			try {
-				receiver.receiveResults(toCall.call());
-			} catch (Throwable t) {
-				receiver.exceptionOccurred(t);
-			}
+		public int getPriority() {
+			return priority;
 		}
-
+		
 		@Override
-		public void release() {
-			
+		public long getCreationTime() {
+			return creationTime;
 		}
 		
 		@Override
-		public void workAccepted(WorkEvent arg0) {
-			
+		public DQPWorkContext getDqpWorkContext() {
+			return workContext;
 		}
 		
 		@Override
-		public void workCompleted(WorkEvent arg0) {
+		public void release() {
 			
 		}
 		
-		@Override
-		public void workRejected(WorkEvent arg0) {
-			receiver.exceptionOccurred(arg0.getException());
+		void addCompletionListener(CompletionListener<T> completionListener) {
+			this.completionListeners.add(completionListener);
 		}
 		
 		@Override
-		public void workStarted(WorkEvent arg0) {
-			
+		protected void done() {
+			for (CompletionListener<T> listener : this.completionListeners) {
+				listener.onCompletion(this);
+			}
+			completionListeners.clear();
 		}
-
-		@Override
-		public int getPriority() {
-			return priority;
-		}
 		
-		@Override
-		public long getCreationTime() {
-			return creationTime;
-		}
-		
-		@Override
-		public DQPWorkContext getDqpWorkContext() {
-			return workContext;
-		}
 	}	
 	
 	static class ClientState {
@@ -200,6 +185,7 @@
     
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS;
     private int currentlyActivePlans;
+    private int userRequestSourceConcurrency;
     private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
     private CacheFactory cacheFactory;
 
@@ -690,9 +676,19 @@
         prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, SessionAwareCache.Type.PREPAREDPLAN,  new CacheConfiguration(Policy.LRU, 60*60*8, config.getPreparedPlanCacheMaxCount(), "PreparedCache")); //$NON-NLS-1$
         prepPlanCache.setBufferManager(this.bufferManager);
 		
-        
         this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
+        this.maxActivePlans = config.getMaxActivePlans();
         
+        if (this.maxActivePlans > config.getMaxThreads()) {
+        	LogManager.logWarning(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.invalid_max_active_plan", this.maxActivePlans, config.getMaxThreads())); //$NON-NLS-1$
+        	this.maxActivePlans = config.getMaxThreads();
+        }
+        
+        this.userRequestSourceConcurrency = config.getUserRequestSourceConcurrency();
+        if (this.userRequestSourceConcurrency < 1) {
+        	this.userRequestSourceConcurrency = Math.min(config.getMaxThreads(), 2*config.getMaxThreads()/this.maxActivePlans);
+        }
+        
         if (cacheFactory.isReplicated()) {
         	matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterilizationTables")); //$NON-NLS-1$
         	matTables.setBufferManager(this.bufferManager);
@@ -786,10 +782,23 @@
 		return addWork(processor, 10);
 	}
 
-	<T> ResultsFuture<T> addWork(Callable<T> processor, int priority) {
-		FutureWork<T> work = new FutureWork<T>(processor, priority);
+	<T> ResultsFuture<T> addWork(final Callable<T> processor, int priority) {
+		final ResultsFuture<T> result = new ResultsFuture<T>();
+		final ResultsReceiver<T> receiver = result.getResultsReceiver();
+		Runnable r = new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					receiver.receiveResults(processor.call());
+				} catch (Throwable t) {
+					receiver.exceptionOccurred(t);
+				}
+			}
+		};
+		FutureWork<T> work = new FutureWork<T>(r, null, priority);
 		this.addWork(work);
-		return work.getResult();
+		return result;
 	}
 	
 	// global txn
@@ -852,4 +861,16 @@
 		this.cacheFactory = factory;
 	}
 	
+	public int getUserRequestSourceConcurrency() {
+		return userRequestSourceConcurrency;
+	}
+	
+	void setUserRequestSourceConcurrency(int userRequestSourceConcurrency) {
+		this.userRequestSourceConcurrency = userRequestSourceConcurrency;
+	}
+	
+	public int getMaxActivePlans() {
+		return maxActivePlans;
+	}
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -30,13 +30,11 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 import org.teiid.adminapi.impl.ModelMetaData;
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.api.exception.query.QueryMetadataException;
 import org.teiid.client.RequestMessage;
-import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleSource;
@@ -360,6 +358,7 @@
         aqr.setMaxResultRows(requestMgr.getMaxSourceRows());
         aqr.setExceptionOnMaxRows(requestMgr.isExceptionOnMaxSourceRows());
         aqr.setPartialResults(request.supportsPartialResults());
+        aqr.setSerial(requestMgr.getUserRequestSourceConcurrency() == 1);
         if (nodeID >= 0) {
         	aqr.setTransactionContext(workItem.getTransactionContext());
         }
@@ -389,14 +388,6 @@
     	throw new UnsupportedOperationException();
     }
 
-    <T> ResultsFuture<T> addWork(Callable<T> callable, int priority) {
-    	return requestMgr.addWork(callable, priority);
-    }
-    
-    void scheduleWork(Runnable r, int priority, long delay) {
-    	requestMgr.scheduleWork(r, priority, delay);
-    }
-    
     BufferManager getBufferManager() {
 		return bufferService.getBufferManager();
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -27,12 +27,12 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.activation.DataSource;
 import javax.xml.transform.Source;
 
 import org.teiid.client.SourceWarning;
-import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.TupleSource;
@@ -51,6 +51,7 @@
 import org.teiid.core.util.Assertion;
 import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
 import org.teiid.query.processor.xml.XMLUtil;
@@ -86,13 +87,15 @@
     private int index;
     private int rowsProcessed;
     private AtomicResultsMessage arm;
-    private boolean closed;
+    private AtomicBoolean closed = new AtomicBoolean();
+    private volatile boolean canAsynchClose;
     private volatile boolean canceled;
+    private volatile boolean cancelAsynch;
     private boolean executed;
     private volatile boolean done;
     private boolean explicitClose;
     
-    private volatile ResultsFuture<AtomicResultsMessage> futureResult;
+    private volatile FutureWork<AtomicResultsMessage> futureResult;
     private volatile boolean running;
     
     public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
@@ -114,23 +117,19 @@
         
     	Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
         workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
-        if (!aqr.isTransactional()) {
+        if (!aqr.isSerial()) {
         	addWork();
         }
     }
 
 	private void addWork() {
-		futureResult = dtm.addWork(new Callable<AtomicResultsMessage>() {
+		this.canAsynchClose = true;
+		futureResult = workItem.addWork(new Callable<AtomicResultsMessage>() {
 			@Override
 			public AtomicResultsMessage call() throws Exception {
 				return getResults();
 			}
 		}, 100);
-		futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
-			public void onCompletion(ResultsFuture<AtomicResultsMessage> future) {
-				workItem.moreWork();
-			}
-		});
 	}
 
 	private List correctTypes(List row) throws TransformationException {
@@ -208,7 +207,7 @@
     		if (arm == null) {
     			AtomicResultsMessage results = null;
     			try {
-	    			if (futureResult != null || !aqr.isTransactional()) {
+	    			if (futureResult != null || !aqr.isSerial()) {
 	    				results = asynchGet();
 	    			} else {
 	    				results = getResults();
@@ -216,7 +215,7 @@
     			} catch (TranslatorException e) {
     				results = exceptionOccurred(e, true);
     			} catch (DataNotAvailableException e) {
-    				dtm.scheduleWork(new Runnable() {
+    				workItem.scheduleWork(new Runnable() {
     					@Override
     					public void run() {
 							workItem.moreWork();
@@ -250,7 +249,7 @@
 		if (!futureResult.isDone()) {
 			throw BlockedException.INSTANCE;
 		}
-		ResultsFuture<AtomicResultsMessage> currentResults = futureResult;
+		FutureWork<AtomicResultsMessage> currentResults = futureResult;
 		futureResult = null;
 		AtomicResultsMessage results = null;
 		try {
@@ -284,6 +283,9 @@
 			TranslatorException {
 		AtomicResultsMessage results = null;
 		try {
+			if (cancelAsynch) {
+				return null;
+			}
 			running = true;
 			if (!executed) {
 				results = cwi.execute();
@@ -292,13 +294,20 @@
 				results = cwi.more();
 			}
 		} finally {
+			if (!cancelAsynch) {
+				workItem.moreWork();
+			}
+			canAsynchClose = false;
+			if (closed.get()) {
+				cwi.close();
+			}
 			running = false;
 		}
 		return results;
 	}
     
     public boolean isQueued() {
-    	ResultsFuture<AtomicResultsMessage> future = futureResult;
+    	FutureWork<AtomicResultsMessage> future = futureResult;
     	return !running && future != null && !future.isDone();
     }
 
@@ -311,32 +320,20 @@
 	}
     
     public void fullyCloseSource() {
-    	if (!closed) {
-    		if (cwi != null) {
-		    	workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
-		    	if (!aqr.isTransactional()) {
-		    		if (futureResult != null && !futureResult.isDone()) {
-		    			futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
-		    				@Override
-		    				public void onCompletion(
-		    						ResultsFuture<AtomicResultsMessage> future) {
-		    					cwi.close(); // there is a small chance that this will be done in the processing thread
-		    				}
-						});
-		    		} else {
-		    			dtm.addWork(new Callable<Void>() {
-		    				@Override
-		    				public Void call() throws Exception {
-		    					cwi.close();
-		    					return null;
-		    				}
-		    			}, 0);
-		    		}
-		    	} else {
-		    		this.cwi.close();
-		    	}
-    		}
-			closed = true;
+		cancelAsynch = true;
+    	if (closed.compareAndSet(false, true)) {
+	    	workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
+	    	if (aqr.isSerial()) {
+	    		this.cwi.close();
+	    	} else if (!canAsynchClose) {
+	    		workItem.addHighPriorityWork(new Callable<Void>() {
+    				@Override
+    				public Void call() throws Exception {
+    					cwi.close();
+    					return null;
+    				}
+    			});
+	    	}
     	}
     }
     
@@ -346,15 +343,14 @@
     
     public void cancelRequest() {
     	this.canceled = true;
-    	if (this.cwi != null) {
-    		this.cwi.cancel();
-    	}
+		this.cwi.cancel();
     }
 
     /**
      * @see TupleSource#closeSource()
      */
     public void closeSource() {
+    	cancelAsynch = true;
     	if (!explicitClose) {
         	fullyCloseSource();
     	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -28,6 +28,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.teiid.client.RequestMessage;
@@ -46,6 +47,7 @@
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
 import org.teiid.dqp.message.AtomicRequestID;
@@ -71,12 +73,45 @@
 
 public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
 	
+	private final class WorkWrapper<T> implements
+			DQPCore.CompletionListener<T> {
+		
+		boolean submitted;
+		FutureWork<T> work;
+		
+		public WorkWrapper(FutureWork<T> work) {
+			this.work = work;
+		}
+
+		@Override
+		public void onCompletion(FutureWork<T> future) {
+			WorkWrapper<?> nextWork = null;
+			synchronized (queue) {
+				if (!submitted) {
+					return;
+				}
+				nextWork = queue.pollFirst();
+				if (nextWork == null) {
+					totalThreads--;
+				} else {
+					nextWork.submitted = true;
+				}
+			}
+			if (nextWork != null) {
+				dqpCore.addWork(nextWork.work);
+			}    		
+		}
+	}
+
 	private enum ProcessingState {NEW, PROCESSING, CLOSE}
 	private ProcessingState state = ProcessingState.NEW;
     
 	private enum TransactionState {NONE, ACTIVE, DONE}
 	private TransactionState transactionState = TransactionState.NONE;
 	
+	private int totalThreads;
+	private LinkedList<WorkWrapper<?>> queue = new LinkedList<WorkWrapper<?>>();
+	
 	/*
 	 * Obtained at construction time 
 	 */
@@ -764,6 +799,33 @@
 	@Override
 	public long getCreationTime() {
 		return processingTimestamp;
+	}	
+	
+	<T> FutureWork<T> addHighPriorityWork(Callable<T> callable) {
+		FutureWork<T> work = new FutureWork<T>(callable, PrioritizedRunnable.NO_WAIT_PRIORITY);
+		dqpCore.addWork(work);
+		return work;
 	}
+	
+    <T> FutureWork<T> addWork(Callable<T> callable, int priority) {
+    	FutureWork<T> work = new FutureWork<T>(callable, priority);
+    	WorkWrapper<T> wl = new WorkWrapper<T>(work);
+    	work.addCompletionListener(wl);
+    	synchronized (queue) {
+        	if (totalThreads < dqpCore.getUserRequestSourceConcurrency()) {
+        		dqpCore.addWork(work);
+        		totalThreads++;
+        		wl.submitted = true;
+        	} else {
+    	    	queue.add(wl);
+    	    	LogManager.logDetail(LogConstants.CTX_DQP, this.requestID, " reached max source concurrency of ", dqpCore.getUserRequestSourceConcurrency()); //$NON-NLS-1$
+        	}
+    	}
+    	return work;
+    }
+    
+    void scheduleWork(Runnable r, int priority, long delay) {
+    	dqpCore.scheduleWork(r, priority, delay);
+    }
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -74,6 +74,8 @@
 	
 	public interface PrioritizedRunnable extends Runnable {
 		
+		final static int NO_WAIT_PRIORITY = 0;
+		
 		int getPriority();
 		
 		long getCreationTime();

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TransactionServerImpl.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -248,7 +248,7 @@
 							}
 						}, 0);
 						workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
-						tc.setTransaction(work.getResult().get());
+						tc.setTransaction(work.get());
 					}
 				} catch (NotSupportedException e) {
 					throw new XATransactionException(e, XAException.XAER_INVAL);

Modified: trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/dqp/message/AtomicRequestMessage.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -81,6 +81,8 @@
     private boolean exceptionOnMaxRows;
     private int maxRows;
     
+    private boolean serial;
+    
     private DQPWorkContext workContext;
     
     public AtomicRequestMessage() {
@@ -137,6 +139,14 @@
     public void setTransactionContext(TransactionContext context) {
         txnContext = context;
     }
+    
+    public boolean isSerial() {
+    	return serial || isTransactional();
+    }
+    
+    public void setSerial(boolean serial) {
+		this.serial = serial;
+	}
 
     public boolean isTransactional(){
         return this.txnContext != null && this.txnContext.getTransactionType() != Scope.NONE;

Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -569,17 +569,7 @@
                 }
 
             } else { // value is null
-
-                switch(criteria.getPredicateQuantifier()) {
-                    case SubqueryCompareCriteria.ALL:
-                        // null counts as unknown; one unknown means the whole thing is unknown
-                        return null;
-                    case SubqueryCompareCriteria.SOME:
-                        result = null;
-                        break;
-                    default:
-                        throw new ExpressionEvaluationException("ERR.015.006.0057", QueryPlugin.Util.getString("ERR.015.006.0057", criteria.getPredicateQuantifier())); //$NON-NLS-1$ //$NON-NLS-2$
-                }
+                result = null;
             }
 
 

Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2011-03-11 17:39:21 UTC (rev 2990)
@@ -839,6 +839,7 @@
 DQPCore.The_request_has_been_closed.=The request {0} has been closed.
 DQPCore.The_atomic_request_has_been_cancelled=The atomic request {0} has been canceled.
 DQPCore.failed_to_cancel=Failed to Cancel request, as request already finished processing
+DQPCore.invalid_max_active_plan=The maxActivePlan {0} setting should never be greater than the max processing threads {1}.
 
 ProcessWorker.failed_rollback=Failed to properly rollback autowrap transaction properly
 ProcessWorker.error=Unexpected exception for request {0}

Modified: trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -215,10 +215,10 @@
     	synchronized (pool) {
         	pool.notifyAll();
 		}
-    	work1.getResult().get();
-    	work2.getResult().get();
-    	work3.getResult().get();
-    	work4.getResult().get();
+    	work1.get();
+    	work2.get();
+    	work3.get();
+    	work4.get();
     	assertEquals(Integer.valueOf(3), order.remove());
     	assertEquals(Integer.valueOf(2), order.remove());
     	assertEquals(Integer.valueOf(4), order.remove());

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -49,7 +49,7 @@
 import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
 import org.teiid.query.unittest.FakeMetadataFactory;
 
-
+@SuppressWarnings("nls")
 public class TestDQPCore {
 
     private DQPCore core;
@@ -70,7 +70,10 @@
         core.setCacheFactory(new DefaultCacheFactory());
         core.setTransactionService(new FakeTransactionService());
         
-        core.start(new DQPConfiguration());
+        DQPConfiguration config = new DQPConfiguration();
+        config.setMaxActivePlans(1);
+        config.setUserRequestSourceConcurrency(2);
+        core.start(config);
     }
     
     @After public void tearDown() throws Exception {
@@ -86,6 +89,11 @@
         msg.setExecutionId(100);
         return msg;
     }
+    
+    @Test public void testConfigurationSets() {
+    	assertEquals(1, core.getMaxActivePlans());
+    	assertEquals(2, core.getUserRequestSourceConcurrency());
+    }
 
     @Test public void testRequest1() throws Exception {
     	helpExecute("SELECT IntKey FROM BQT1.SmallA", "a"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -272,6 +280,35 @@
         assertEquals(100, item.resultsBuffer.getRowCount());
     }
     
+    @Test public void testSourceConcurrency() throws Exception {
+    	//setup default of 2
+    	agds.setSleep(100);
+    	StringBuffer sql = new StringBuffer();
+    	int branches = 20;
+    	for (int i = 0; i < branches; i++) {
+    		if (i > 0) {
+    			sql.append(" union all ");
+    		}
+    		sql.append("select intkey || " + i + " from bqt1.smalla");
+    	}
+    	sql.append(" limit 2");
+    	helpExecute(sql.toString(), "a");
+    	//there isn't a hard guarantee that only two requests will get started
+    	assertTrue(agds.getExecuteCount().get() <= 6);
+    	
+    	//20 concurrent
+    	core.setUserRequestSourceConcurrency(20);
+    	agds.getExecuteCount().set(0);
+    	helpExecute(sql.toString(), "a");
+    	assertEquals(20, agds.getExecuteCount().get());
+    	
+    	//serial
+    	core.setUserRequestSourceConcurrency(1);
+    	agds.getExecuteCount().set(0);
+    	helpExecute(sql.toString(), "a");
+    	assertEquals(1, agds.getExecuteCount().get());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
@@ -295,7 +332,7 @@
 
         Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
         assertNotNull(core.getClientState(String.valueOf(sessionid), false));
-        ResultsMessage results = message.get(5000, TimeUnit.MILLISECONDS);
+        ResultsMessage results = message.get(500000, TimeUnit.MILLISECONDS);
         core.terminateSession(String.valueOf(sessionid));
         assertNull(core.getClientState(String.valueOf(sessionid), false));
         if (results.getException() != null) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-03-11 05:26:32 UTC (rev 2989)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-03-11 17:39:21 UTC (rev 2990)
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.types.DataTypeManager;
@@ -53,12 +54,20 @@
     private SourceCapabilities caps;
 	public boolean throwExceptionOnExecute;
 	public int dataNotAvailable = -1;
+	public int sleep;
+    private final AtomicInteger executeCount = new AtomicInteger();
+    private final AtomicInteger closeCount = new AtomicInteger();
+
     
     public AutoGenDataService() {
     	super("FakeConnector","FakeConnector"); //$NON-NLS-1$ //$NON-NLS-2$
         caps = TestOptimizer.getTypicalCapabilities();
     }
     
+    public void setSleep(int sleep) {
+		this.sleep = sleep;
+	}
+    
     public void setCaps(SourceCapabilities caps) {
 		this.caps = caps;
 	}
@@ -88,6 +97,14 @@
 			
 			@Override
 			public AtomicResultsMessage execute() throws TranslatorException {
+				executeCount.incrementAndGet();
+				if (sleep > 0) {
+					try {
+						Thread.sleep(sleep);
+					} catch (InterruptedException e) {
+						throw new RuntimeException(e);
+					}
+				}
 				if (throwExceptionOnExecute) {
 		    		throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
 		    	}
@@ -101,7 +118,7 @@
 			
 			@Override
 			public void close() {
-				
+				closeCount.incrementAndGet();
 			}
 			
 			@Override
@@ -112,6 +129,14 @@
 		};
     }
     
+    public AtomicInteger getExecuteCount() {
+		return executeCount;
+	}
+    
+    public AtomicInteger getCloseCount() {
+		return closeCount;
+	}
+    
     private List[] createResults(List symbols) {
         List[] rows = new List[this.rows];
 



More information about the teiid-commits mailing list