[teiid-commits] teiid SVN: r2333 - in trunk: build/kits/jboss-container/deployers/teiid.deployer and 14 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Jul 12 16:59:03 EDT 2010


Author: shawkins
Date: 2010-07-12 16:59:02 -0400 (Mon, 12 Jul 2010)
New Revision: 2333

Added:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
   trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java
Removed:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
   trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
   trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
   trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   trunk/engine/src/main/resources/org/teiid/query/i18n.properties
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
   trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1151 re-adding support for multi-threaded source queries.  also fixing partial results and issameconnector check

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2010-07-12 20:59:02 UTC (rev 2333)
@@ -75,8 +75,10 @@
         <property name="securityHelper"><inject bean="SecurityHelper"/></property>
         <property name="VDBRepository"><inject bean="VDBRepository"/></property>
         
-        <!-- Process pool maximum thread count. (default 32) Increase this value if the system's available processors is larger than 8. -->
-        <property name="maxThreads">32</property>
+        <!-- Process pool maximum thread count. (default 64) -->
+        <property name="maxThreads">64</property>
+        <!-- Max active plans (default 20).  Increase this value on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts. -->
+        <property name="maxActivePlans">20</property>
         <!-- Query processor time slice, in milliseconds. (default 2000) -->
         <property name="timeSliceInMilli">2000</property>
         <!-- Maximum allowed fetch size, set via JDBC. User requested value ignored above this value. (default 20480) -->

Modified: trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/build/kits/jboss-container/deployers/teiid.deployer/teiid-deployer-jboss-beans.xml	2010-07-12 20:59:02 UTC (rev 2333)
@@ -57,7 +57,6 @@
     </bean>      
              
     <bean name="ConnectionFactoryDeployer" class="org.teiid.jboss.deployers.ConnectionFactoryDeployer">
-        <property name="connectorManagerRepository"><inject bean="ConnectorManagerRepository"/></property>
         <property name="VDBStatusChecker"><inject bean="VDBStatusChecker"/></property>
     </bean>
         

Modified: trunk/common-core/src/main/resources/org/teiid/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/org/teiid/core/i18n.properties	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/common-core/src/main/resources/org/teiid/core/i18n.properties	2010-07-12 20:59:02 UTC (rev 2333)
@@ -328,9 +328,6 @@
 Streamable.isNUll=Streamable object argument can not be null
 Streamable.InvalidReference=Streamable contents are not available, use the Streaming interface to get the contents.
 
-WorkerPool.New_thread=Created worker thread "{0}".
-WorkerPool.uncaughtException=Uncaught exception processing work
-
 FloatToBooleanTransform.Failed_transform=Failed to transform Float to Boolean.  Expected 0 or 1 for {0}
 NullToAnyTransform.Invalid_value=Invalid value for type {0}: {1} of type {2}
 ObjectToAnyTransform.Invalid_value=Invalid conversion from type {0} with value ''{2}'' to type {1}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -23,12 +23,11 @@
 package org.teiid.common.buffer;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.core.TeiidComponentException;
@@ -295,7 +294,7 @@
             		lob.setReferenceStreamId(id);
             	}
             	if (this.lobReferences == null) {
-            		this.lobReferences = Collections.synchronizedMap(new HashMap<String, Streamable<?>>());
+            		this.lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
             	}
             	this.lobReferences.put(id, lob);
                 if (lob.getReference() == null) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/CapabilitiesConverter.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -44,7 +44,7 @@
         return convertCapabilities(srcCaps, null);
     }
     
-    public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps, String connectorID) {
+    public static BasicSourceCapabilities convertCapabilities(ExecutionFactory srcCaps, Object connectorID) {
         BasicSourceCapabilities tgtCaps = new BasicSourceCapabilities();
         
         tgtCaps.setCapabilitySupport(Capability.QUERY_SELECT_EXPRESSION, srcCaps.supportsSelectExpression());

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManager.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -26,21 +26,18 @@
  */
 package org.teiid.dqp.internal.datamgr.impl;
 
-import java.util.LinkedList;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
-import org.teiid.common.buffer.BlockedException;
+import org.teiid.core.TeiidComponentException;
 import org.teiid.core.util.Assertion;
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.internal.cache.DQPContextCache;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem.PermitMode;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.service.BufferService;
@@ -69,13 +66,8 @@
 	
 	private static final String JAVA_CONTEXT = "java:"; //$NON-NLS-1$
 
-	public static final int DEFAULT_MAX_THREADS = 20;
-	
-	private static AtomicInteger ID_SEQUENCE = new AtomicInteger();
-	
 	private String translatorName;
 	private String connectionName;
-	private String connectorId = String.valueOf(ID_SEQUENCE.getAndIncrement());
 	
     //services acquired in start
     private BufferService bufferService;
@@ -85,10 +77,6 @@
 	
 	private SourceCapabilities cachedCapabilities;
 	
-	private int currentConnections;
-	private int maxConnections = DEFAULT_MAX_THREADS;
-	private LinkedList<ConnectorWorkItem> queuedRequests = new LinkedList<ConnectorWorkItem>();
-	
 	private volatile boolean stopped;
 	private ExecutionFactory<Object, Object> executionFactory;
 	
@@ -115,24 +103,8 @@
     	return sb.toString();
     }
     
-    public synchronized void acquireConnectionLock(ConnectorWorkItem item) throws BlockedException {
-    	switch (item.getPermitMode()) {
-    	case NOT_ACQUIRED: 
-    		if (currentConnections < maxConnections) {
-	    		currentConnections++;
-	    		item.setPermitMode(PermitMode.ACQUIRED);
-	    		return;
-	    	}
-	    	queuedRequests.add(item);
-	    	item.setPermitMode(PermitMode.BLOCKED);	
-    	case BLOCKED:
-    		throw BlockedException.INSTANCE;
-    	}
-    }
-    
     public MetadataStore getMetadata(String modelName, Map<String, Datatype> datatypes, Properties importProperties) throws TranslatorException {
 		MetadataFactory factory = new MetadataFactory(modelName, datatypes, importProperties);
-		ExecutionFactory<Object, Object> executionFactory = getExecutionFactory();
 		Object connectionFactory = getConnectionFactory();
 		Object connection = executionFactory.getConnection(connectionFactory);
 		try {
@@ -143,26 +115,26 @@
 		return factory.getMetadataStore();
 	}    
     
-    public SourceCapabilities getCapabilities() throws TranslatorException {
+    public SourceCapabilities getCapabilities() throws TeiidComponentException {
     	if (cachedCapabilities != null) {
     		return cachedCapabilities;
     	}
 
 		checkStatus();
 		ExecutionFactory<Object, Object> translator = getExecutionFactory();
-		BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, this.connectorId);
+		BasicSourceCapabilities resultCaps = CapabilitiesConverter.convertCapabilities(translator, Arrays.asList(translatorName, connectionName));
 		resultCaps.setScope(Scope.SCOPE_GLOBAL);
 		cachedCapabilities = resultCaps;
 		return resultCaps;
     }
     
-    public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi) throws TranslatorException {
+    public ConnectorWork registerRequest(AtomicRequestMessage message) throws TeiidComponentException {
         // Set the connector ID to be used; if not already set. 
     	checkStatus();
     	AtomicRequestID atomicRequestId = message.getAtomicRequestID();
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {atomicRequestId, "Create State"}); //$NON-NLS-1$
 
-    	ConnectorWorkItem item = new ConnectorWorkItem(message, awi, this);
+    	ConnectorWorkItem item = new ConnectorWorkItem(message, this);
         Assertion.isNull(requestStates.put(atomicRequestId, item), "State already existed"); //$NON-NLS-1$
         return item;
     }
@@ -178,17 +150,6 @@
     void removeState(AtomicRequestID id) {
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {id, "Remove State"}); //$NON-NLS-1$
         ConnectorWorkItem cwi = requestStates.remove(id);
-        if (cwi != null && cwi.getPermitMode() == PermitMode.ACQUIRED) {
-        	synchronized (this) {
-	        	ConnectorWorkItem next = queuedRequests.pollFirst();
-	        	if (next == null) {
-	        		currentConnections--;
-	        		return;
-	        	}
-	        	next.setPermitMode(PermitMode.ACQUIRED);
-	        	next.getParent().moreWork();
-        	}
-        }
     }
 
     int size() {
@@ -257,13 +218,6 @@
      * @return the <code>ExecutionFactory</code>.
      */
 	protected ExecutionFactory<Object, Object> getExecutionFactory() {
-    	if (this.executionFactory == null) {
-	    	try {
-				InitialContext ic = new InitialContext();
-				return (ExecutionFactory<Object, Object>)ic.lookup(this.translatorName);
-			} catch (NamingException e) {
-			}
-    	}
 		return this.executionFactory;
     }
     
@@ -306,16 +260,12 @@
     	return null;
     }
     
-    private void checkStatus() throws TranslatorException {
+    private void checkStatus() throws TeiidComponentException {
     	if (stopped) {
-    		throw new TranslatorException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state", this.translatorName)); //$NON-NLS-1$
+    		throw new TeiidComponentException(DQPPlugin.Util.getString("ConnectorManager.not_in_valid_state", this.translatorName)); //$NON-NLS-1$
     	}
     }
     
-    public void setMaxConnections(int value) {
-    	this.maxConnections = value;
-    }
-    
     public String getTranslatorName() {
     	return this.translatorName;
     }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorManagerRepository.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -43,7 +43,7 @@
 	}
 	
 	public List<ConnectorManager> getConnectorManagers() {
-		return new ArrayList(this.repo.values());
+		return new ArrayList<ConnectorManager>(this.repo.values());
 	}
 	
 	

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWork.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -34,12 +34,10 @@
 
 	void cancel();
 
-	AtomicResultsMessage more() throws TranslatorException;
+	AtomicResultsMessage more() throws TranslatorException, BlockedException;
 
 	void close();
 
 	AtomicResultsMessage execute() throws TranslatorException, BlockedException;
 	
-	boolean isQueued();
-	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -37,7 +37,6 @@
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
 import org.teiid.dqp.internal.datamgr.metadata.RuntimeMetadataImpl;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
@@ -64,18 +63,12 @@
 
 public class ConnectorWorkItem implements ConnectorWork {
 	
-	enum PermitMode {
-		BLOCKED, ACQUIRED, NOT_ACQUIRED
-	}
-	
 	/* Permanent state members */
 	private AtomicRequestID id;
     private ConnectorManager manager;
     private AtomicRequestMessage requestMsg;
     private ExecutionFactory connector;
     private QueryMetadataInterface queryMetadata;
-    private PermitMode permitMode = PermitMode.NOT_ACQUIRED;
-    private AbstractWorkItem awi;
     
     /* Created on new request */
     private Object connection;
@@ -95,7 +88,7 @@
     
     private AtomicBoolean isCancelled = new AtomicBoolean();
     
-    ConnectorWorkItem(AtomicRequestMessage message, AbstractWorkItem awi, ConnectorManager manager) {
+    ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
         this.id = message.getAtomicRequestID();
         this.requestMsg = message;
         this.manager = manager;
@@ -118,30 +111,12 @@
     	this.queryMetadata = vdb.getAttachment(QueryMetadataInterface.class);
         this.queryMetadata = new TempMetadataAdapter(this.queryMetadata, new TempMetadataStore());
 		this.securityContext.setTransactional(requestMsg.isTransactional());
-        this.awi = awi;
     }
     
     public AtomicRequestID getId() {
 		return id;
 	}
     
-    public PermitMode getPermitMode() {
-		return permitMode;
-	}
-    
-    public void setPermitMode(PermitMode permitMode) {
-		this.permitMode = permitMode;
-	}
-    
-    public AbstractWorkItem getParent() {
-		return awi;
-	}
-    
-    @Override
-    public boolean isQueued() {
-    	return this.permitMode == PermitMode.BLOCKED;
-    }
-
     public void cancel() {
     	try {
             LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Processing CANCEL request"}); //$NON-NLS-1$
@@ -189,8 +164,6 @@
 		    LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
         } 
     }
-
-
     
     private TranslatorException handleError(Throwable t) {
     	if (t instanceof DataNotAvailableException) {
@@ -224,10 +197,6 @@
     		throw new TranslatorException("Request canceled"); //$NON-NLS-1$
     	}
         
-        if (!this.securityContext.isTransactional()) {
-        	this.manager.acquireConnectionLock(this);
-        }
-
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.requestMsg.getAtomicRequestID(), "Processing NEW request:", this.requestMsg.getCommand()}); //$NON-NLS-1$                                     
     	try {
 	    	this.connectionFactory = this.manager.getConnectionFactory();

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -37,8 +37,9 @@
     static final int DEFAULT_MAX_RESULTSET_CACHE_ENTRIES = 1024;
     static final int DEFAULT_QUERY_THRESHOLD = 600;
     static final String PROCESS_PLAN_QUEUE_NAME = "QueryProcessorQueue"; //$NON-NLS-1$
-    public static final int DEFAULT_MAX_PROCESS_WORKERS = 32;
+    public static final int DEFAULT_MAX_PROCESS_WORKERS = 64;
 	public static final int DEFAULT_MAX_SOURCE_ROWS = -1;
+	public static final int DEFAULT_MAX_ACTIVE_PLANS = 20;
     
 	private int maxThreads = DEFAULT_MAX_PROCESS_WORKERS;
 	private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
@@ -55,8 +56,18 @@
 	private int queryThresholdInSecs = DEFAULT_QUERY_THRESHOLD;
 	private boolean exceptionOnMaxSourceRows = true;
 	private int maxSourceRows = -1;
+	private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
+
+	@ManagementProperty(description="Max active plans (default 20).  Increase this value, and max threads, on highly concurrent systems - but ensure that the underlying pools can handle the increased load without timeouts.")
+	public int getMaxActivePlans() {
+		return maxActivePlans;
+	}
 	
-	@ManagementProperty(description="Process pool maximum thread count. (default 32) Increase this value if the system's available processors is larger than 8")
+	public void setMaxActivePlans(int maxActivePlans) {
+		this.maxActivePlans = maxActivePlans;
+	}
+	
+	@ManagementProperty(description="Process pool maximum thread count. (default 64)")
 	public int getMaxThreads() {
 		return maxThreads;
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -35,9 +35,7 @@
 
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
 import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.Xid;
 
 import org.teiid.adminapi.Admin;
@@ -58,12 +56,11 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.types.Streamable;
-import org.teiid.core.util.Assertion;
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.internal.cache.DQPContextCache;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.BufferService;
@@ -84,15 +81,17 @@
  */
 public class DQPCore implements DQP {
 	
-	public final static class FutureWork<T> implements Work, WorkListener {
+	//TODO: replace with FutureTask
+	public final static class FutureWork<T> implements Work, WorkListener, PrioritizedRunnable {
 		private final Callable<T> toCall;
-		private DQPWorkContext workContext;
 		private ResultsFuture<T> result = new ResultsFuture<T>();
 		private ResultsReceiver<T> receiver = result.getResultsReceiver();
+		private int priority;
+		private long creationTime = System.currentTimeMillis();
 
-		public FutureWork(Callable<T> processor) {
-			this.workContext = DQPWorkContext.getWorkContext();
+		public FutureWork(Callable<T> processor, int priority) {
 			this.toCall = processor;
+			this.priority = priority;
 		}
 		
 		public ResultsFuture<T> getResult() {
@@ -102,7 +101,7 @@
 		@Override
 		public void run() {
 			try {
-				receiver.receiveResults(workContext.runInContext(toCall));
+				receiver.receiveResults(toCall.call());
 			} catch (Throwable t) {
 				receiver.exceptionOccurred(t);
 			}
@@ -132,6 +131,16 @@
 		public void workStarted(WorkEvent arg0) {
 			
 		}
+
+		@Override
+		public int getPriority() {
+			return priority;
+		}
+		
+		@Override
+		public long getCreationTime() {
+			return creationTime;
+		}
 	}	
 	
 	static class ClientState {
@@ -164,8 +173,7 @@
 		
 	}
 	
-    private WorkManager workManager;
-    private StatsCapturingWorkManager processWorkerPool;
+	private ThreadReuseExecutor processWorkerPool;
     
     // System properties for Code Table
     private int maxCodeTableRecords = DQPConfiguration.DEFAULT_MAX_CODE_TABLE_RECORDS;
@@ -197,6 +205,11 @@
 	private Map<String, ClientState> clientState = Collections.synchronizedMap(new HashMap<String, ClientState>());
 	private DQPContextCache contextCache;
     private boolean useEntitlements = false;
+    
+    private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS;
+    private int currentlyActivePlans;
+    private LinkedList<RequestWorkItem> waitingPlans = new LinkedList<RequestWorkItem>();
+    
     /**
      * perform a full shutdown and wait for 10 seconds for all threads to finish
      */
@@ -330,8 +343,13 @@
         RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request, resultsFuture.getResultsReceiver(), requestID, workContext);
     	logMMCommand(workItem, Event.NEW, null); 
         addRequest(requestID, workItem, state);
-        
-        this.addWork(workItem);      
+        synchronized (waitingPlans) {
+			if (currentlyActivePlans < maxActivePlans) {
+				startActivePlan(workItem);
+			} else {
+				waitingPlans.add(workItem);
+			}
+		}
         return resultsFuture;
     }
 	
@@ -351,8 +369,28 @@
 		this.requests.put(requestID, workItem);
 		state.addRequest(requestID);
 	}
+
+	private void startActivePlan(RequestWorkItem workItem) {
+		workItem.active = true;
+		this.addWork(workItem);
+		this.currentlyActivePlans++;
+	}
+	
+    void finishProcessing(final RequestWorkItem workItem) {
+    	synchronized (waitingPlans) {
+    		if (!workItem.active) {
+        		return;
+        	}
+        	workItem.active = false;
+    		currentlyActivePlans--;
+			if (!waitingPlans.isEmpty()) {
+				startActivePlan(waitingPlans.remove());
+			}
+		}
+    }
     
     void removeRequest(final RequestWorkItem workItem) {
+    	finishProcessing(workItem);
     	this.requests.remove(workItem.requestID);
     	ClientState state = getClientState(workItem.getDqpWorkContext().getSessionId(), false);
     	if (state != null) {
@@ -361,38 +399,20 @@
     	contextCache.removeRequestScopedCache(workItem.requestID.toString());
     }
     
-    void addWork(Work work) {
-    	try {
-			this.processWorkerPool.scheduleWork(work);
-		} catch (WorkException e) {
-			//TODO: cancel? close?
-			throw new TeiidRuntimeException(e);
-		}
+    void addWork(Runnable work) {
+		this.processWorkerPool.execute(work);
     }
     
-    void scheduleWork(final RequestWorkItem work, long delay) {
-    	try {
-			this.processWorkerPool.scheduleWork(new Work() {
-				
-				@Override
-				public void run() {
-					work.moreWork();
-				}
-				
-				@Override
-				public void release() {
-					
-				}
-			}, null, delay);
-		} catch (WorkException e) {
-			throw new TeiidRuntimeException(e);
-		}
+    void scheduleWork(final Runnable r, int priority, long delay) {
+		this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				r.run();
+				return null;
+			}
+		}, priority), delay, TimeUnit.MILLISECONDS);
     }
     
-    public void setWorkManager(WorkManager mgr) {
-    	this.workManager = mgr;
-    }
-    
 	public ResultsFuture<?> closeLobChunkStream(int lobRequestId,
 			long requestId, String streamId)
 			throws TeiidProcessingException {
@@ -446,13 +466,8 @@
     	return this.requests.get(processorID);
 	}
 	
-    /**
-     * Returns a list of QueueStats objects that represent the queues in
-     * this service.
-     * If there are no queues, an empty Collection is returned.
-     */
     public WorkerPoolStatisticsMetadata getWorkManagerStatistics() {
-        return processWorkerPool.getStats();
+    	return this.processWorkerPool.getStats();
     }
            
     public void terminateSession(String sessionId) {
@@ -627,9 +642,6 @@
 	}
 	
 	public void start(DQPConfiguration config) {
-		
-		Assertion.isNotNull(this.workManager);
-
 		this.processorTimeslice = config.getTimeSliceInMilli();
         this.maxFetchSize = config.getMaxRowsFetchSize();
         this.processorDebugAllowed = config.isProcessDebugAllowed();
@@ -658,7 +670,7 @@
         this.bufferManager = bufferService.getBufferManager();
         this.contextCache = bufferService.getContextCache();
 
-        this.processWorkerPool = new StatsCapturingWorkManager(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads(), this.workManager);
+        this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
         
         dataTierMgr = new DataTierManagerImpl(this,
                                             this.connectorManagerRepository,
@@ -706,7 +718,7 @@
 				return null;
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 0);
 	}
 	
 	// local txn
@@ -719,7 +731,7 @@
 				return null;
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 0);
 	}
 
 	// global txn
@@ -732,7 +744,7 @@
 				return null;
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 0);
 	}
 	// global txn
 	public ResultsFuture<?> end(XidImpl xid, int flags) throws XATransactionException {
@@ -756,16 +768,12 @@
 				return getTransactionService().prepare(workContext.getSessionId(),xid, workContext.getSession().isEmbedded());
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 10);
 	}
 
-	private <T> ResultsFuture<T> addWork(Callable<T> processor) {
-		FutureWork<T> work = new FutureWork<T>(processor);
-		try {
-			this.workManager.scheduleWork(work);
-		} catch (WorkException e) {
-			throw new TeiidRuntimeException(e);
-		}
+	<T> ResultsFuture<T> addWork(Callable<T> processor, int priority) {
+		FutureWork<T> work = new FutureWork<T>(processor, priority);
+		this.addWork(work);
 		return work.getResult();
 	}
 	
@@ -786,7 +794,7 @@
 				return null;
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 0);
 	}
 	// global txn
 	public ResultsFuture<?> start(final XidImpl xid, final int flags, final int timeout)
@@ -799,7 +807,7 @@
 				return null;
 			}
 		};
-		return addWork(processor);
+		return addWork(processor, 100);
 	}
 
 	public MetadataResult getMetadata(long requestID)

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -32,6 +32,8 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 
 import javax.security.auth.Subject;
 
@@ -166,14 +168,24 @@
 		return session.getVdb();
 	}
 	
-	public <V> V runInContext(Callable<V> callable) throws Exception {
+	public <V> V runInContext(Callable<V> callable) throws Throwable {
+		FutureTask<V> task = new FutureTask<V>(callable);
+		runInContext(task);
+		try {
+			return task.get();
+		} catch (ExecutionException e) {
+			throw e.getCause();
+		}
+	}
+	
+	public void runInContext(final Runnable runnable) {
 		DQPWorkContext.setWorkContext(this);
 		boolean associated = false;
 		if (securityHelper != null && this.getSubject() != null) {
 			associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());			
 		}
 		try {
-			return callable.call();
+			runnable.run();
 		} finally {
 			if (associated) {
 				securityHelper.clearSecurityContext(this.getSecurityDomain());			
@@ -181,19 +193,6 @@
 			DQPWorkContext.releaseWorkContext();
 		}
 	}
-	
-	public void runInContext(final Runnable runnable) {
-		try {
-			runInContext(new Callable<Void>() {
-				@Override
-				public Void call() {
-					runnable.run();
-					return null;
-				}
-			});
-		} catch (Exception e) {
-		}
-	}
 
 	public HashMap<String, DataPolicy> getAllowedDataPolicies() {
 		if (this.policies == null) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -30,10 +30,12 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.teiid.adminapi.impl.ModelMetaData;
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.client.RequestMessage;
+import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleSource;
@@ -77,9 +79,11 @@
 import org.teiid.query.sql.symbol.Constant;
 import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.query.util.CommandContext;
-import org.teiid.translator.TranslatorException;
 
-
+/**
+ * Full {@link ProcessorDataManager} implementation that 
+ * controls access to {@link ConnectorManager}s and handles system queries.
+ */
 public class DataTierManagerImpl implements ProcessorDataManager {
 	
 	private enum SystemTables {
@@ -130,7 +134,8 @@
 		}
 		
 		AtomicRequestMessage aqr = createRequest(processorId, command, modelName, connectorBindingId, nodeID);
-        return new DataTierTupleSource(aqr.getCommand().getProjectedSymbols(), aqr, this, aqr.getConnectorName(), workItem);
+		ConnectorWork work = getCM(aqr.getConnectorName()).registerRequest(aqr);
+        return new DataTierTupleSource(aqr, workItem, work, this);
 	}
 
 	/**
@@ -348,10 +353,6 @@
 		return aqr;
 	}
 	
-	ConnectorWork executeRequest(AtomicRequestMessage aqr, AbstractWorkItem awi, String connectorName) throws TranslatorException {
-		return getCM(connectorName).executeRequest(aqr, awi);
-	}
-
     /** 
      * Notify each waiting request that the code table data is now available.
      * @param requests
@@ -421,4 +422,12 @@
         this.codeTableCache.clearAll();
     }
     
+    <T> ResultsFuture<T> addWork(Callable<T> callable, int priority) {
+    	return requestMgr.addWork(callable, priority);
+    }
+    
+    void scheduleWork(Runnable r, int priority, long delay) {
+    	requestMgr.scheduleWork(r, priority, delay);
+    }
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -24,105 +24,178 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 import org.teiid.client.SourceWarning;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.util.Assertion;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
 
 
 /**
  * This tuple source impl can only be used once; once it is closed, it 
  * cannot be reopened and reused.
+ * 
+ * TODO: the handling of DataNotAvailable is awkward.
+ * In the multi-threaded case we'd like to not even
+ * notify the parent plan and just schedule the next poll. 
  */
 public class DataTierTupleSource implements TupleSource {
 
     // Construction state
-    private final List schema;
     private final AtomicRequestMessage aqr;
-    private final DataTierManagerImpl dataMgr;
-    private final String connectorName;
     private final RequestWorkItem workItem;
+    private final ConnectorWork cwi;
+    private final DataTierManagerImpl dtm;
     
     // Data state
-    private ConnectorWork cwi;
     private int index;
     private int rowsProcessed;
     private volatile AtomicResultsMessage arm;
     private boolean closed;
     private volatile boolean canceled;
+    private boolean executed;
     
+    private volatile ResultsFuture<AtomicResultsMessage> futureResult;
     private volatile boolean running;
     
-    /**
-     * Constructor for DataTierTupleSource.
-     */
-    public DataTierTupleSource(List schema, AtomicRequestMessage aqr, DataTierManagerImpl dataMgr, String connectorName, RequestWorkItem workItem) {
-        this.schema = schema;       
+    public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm) {
         this.aqr = aqr;
-        this.dataMgr = dataMgr;
-        this.connectorName = connectorName;
         this.workItem = workItem;
+        this.cwi = cwi;
+        this.dtm = dtm;
+    	Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
+        workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
+        if (!aqr.isTransactional()) {
+        	addWork();
+        }
     }
 
-    /**
-     * @see TupleSource#getSchema()
-     */
+	private void addWork() {
+		futureResult = dtm.addWork(new Callable<AtomicResultsMessage>() {
+			@Override
+			public AtomicResultsMessage call() throws Exception {
+				return getResults();
+			}
+		}, 100);
+		futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+			public void onCompletion(ResultsFuture<AtomicResultsMessage> future) {
+				workItem.moreWork();
+			}
+		});
+	}
+
     public List getSchema() {
-        return this.schema;
+        return this.aqr.getCommand().getProjectedSymbols();
     }
 
-    public List nextTuple() throws TeiidComponentException, TeiidProcessingException {
-    	if (this.arm == null) {
-    		open();
-    	}
+    public List<?> nextTuple() throws TeiidComponentException, TeiidProcessingException {
     	while (true) {
+    		if (arm == null) {
+    			AtomicResultsMessage results = null;
+    			try {
+	    			if (futureResult != null || !aqr.isTransactional()) {
+	    				results = asynchGet();
+	    			} else {
+	    				results = getResults();
+	    			}
+    			} catch (TranslatorException e) {
+    				exceptionOccurred(e, true);
+    			} catch (DataNotAvailableException e) {
+    				dtm.scheduleWork(new Runnable() {
+    					@Override
+    					public void run() {
+							workItem.moreWork();
+    					}
+    				}, 10, e.getRetryDelay());
+    				throw BlockedException.INSTANCE;
+    			} 
+    			receiveResults(results);
+    		}
 	    	if (index < arm.getResults().length) {
 	            return this.arm.getResults()[index++];
 	        }
+	    	arm = null;
 	    	if (isDone()) {
 	    		return null;
 	    	}
-	    	try {
-	    		running = true;
-				receiveResults(this.cwi.more());
-			} catch (TranslatorException e) {
-	        	exceptionOccurred(e, true);
-			} finally {
-				running = false;
-			}
     	}
     }
+
+	private AtomicResultsMessage asynchGet()
+			throws BlockedException, TeiidProcessingException,
+			TeiidComponentException, TranslatorException {
+		if (futureResult == null) {
+			addWork();
+		}
+		if (!futureResult.isDone()) {
+			throw BlockedException.INSTANCE;
+		}
+		ResultsFuture<AtomicResultsMessage> currentResults = futureResult;
+		futureResult = null;
+		AtomicResultsMessage results = null;
+		try {
+			results = currentResults.get();
+			addWork();
+		} catch (InterruptedException e) {
+			throw new TeiidRuntimeException(e);
+		} catch (ExecutionException e) {
+			if (e.getCause() instanceof TeiidProcessingException) {
+				throw (TeiidProcessingException)e.getCause();
+			}
+			if (e.getCause() instanceof TeiidComponentException) {
+				throw (TeiidComponentException)e.getCause();
+			}
+			if (e.getCause() instanceof TranslatorException) {
+				throw (TranslatorException)e.getCause();
+			}
+			if (e.getCause() instanceof RuntimeException) {
+				throw (RuntimeException)e.getCause();
+			}
+			//shouldn't happen
+			throw new RuntimeException(e);
+		}
+		return results;
+	}
+
+	private AtomicResultsMessage getResults()
+			throws BlockedException, TeiidComponentException,
+			TranslatorException {
+		AtomicResultsMessage results = null;
+		try {
+			running = true;
+			if (!executed) {
+				results = cwi.execute();
+				executed = true;
+			} else {
+				results = cwi.more();
+			}
+		} finally {
+			running = false;
+		}
+		return results;
+	}
     
     public boolean isQueued() {
-    	return this.cwi != null && this.cwi.isQueued();
+    	ResultsFuture<AtomicResultsMessage> future = futureResult;
+    	return !running && future != null && !future.isDone();
     }
 
 	public boolean isDone() {
-		return this.arm != null && this.arm.getFinalRow() >= 0;
+		AtomicResultsMessage results = this.arm;
+		return results != null && results.getFinalRow() >= 0;
 	}
     
-    void open() throws TeiidComponentException, TeiidProcessingException {
-        try {
-	        if (this.cwi == null) {
-	        	this.cwi = this.dataMgr.executeRequest(aqr, this.workItem, this.connectorName);
-	        	Assertion.isNull(workItem.getConnectorRequest(aqr.getAtomicRequestID()));
-	            workItem.addConnectorRequest(aqr.getAtomicRequestID(), this);
-	        }
-	        running = true;
-	        receiveResults(this.cwi.execute());
-        } catch (TranslatorException e) {
-        	exceptionOccurred(e, true);
-        } finally {
-        	running = false;
-        }
-    }
-    
     public boolean isRunning() {
 		return running;
 	}
@@ -131,7 +204,27 @@
     	if (!closed) {
     		if (cwi != null) {
 		    	workItem.closeAtomicRequest(this.aqr.getAtomicRequestID());
-				this.cwi.close();
+		    	if (!aqr.isTransactional()) {
+		    		if (futureResult != null && !futureResult.isDone()) {
+		    			futureResult.addCompletionListener(new ResultsFuture.CompletionListener<AtomicResultsMessage>() {
+		    				@Override
+		    				public void onCompletion(
+		    						ResultsFuture<AtomicResultsMessage> future) {
+		    					cwi.close(); // there is a small chance that this will be done in the processing thread
+		    				}
+						});
+		    		} else {
+		    			dtm.addWork(new Callable<Void>() {
+		    				@Override
+		    				public Void call() throws Exception {
+		    					cwi.close();
+		    					return null;
+		    				}
+		    			}, 0);
+		    		}
+		    	} else {
+		    		this.cwi.close();
+		    	}
     		}
 			closed = true;
     	}
@@ -165,7 +258,7 @@
 			AtomicResultsMessage emptyResults = new AtomicResultsMessage(new List[0], null);
 			emptyResults.setWarnings(Arrays.asList((Exception)exception));
 			emptyResults.setFinalRow(this.rowsProcessed);
-			receiveResults(arm);
+			receiveResults(emptyResults);
 		} else {
     		if (exception.getCause() instanceof TeiidComponentException) {
     			throw (TeiidComponentException)exception.getCause();
@@ -194,14 +287,11 @@
 	}
 
 	public String getConnectorName() {
-		return this.connectorName;
+		return this.aqr.getConnectorName();
 	}
 	
 	public boolean isTransactional() {
-		if (this.arm == null) {
-			return false;
-		}
-		return this.arm.isTransactional();
+		return this.aqr.isTransactional();
 	}
 	
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -47,6 +47,7 @@
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.TransactionContext;
@@ -64,10 +65,8 @@
 import org.teiid.query.sql.lang.SPParameter;
 import org.teiid.query.sql.lang.StoredProcedure;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
-import org.teiid.translator.DataNotAvailableException;
 
-
-public class RequestWorkItem extends AbstractWorkItem {
+public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
 	
 	private enum ProcessingState {NEW, PROCESSING, CLOSE}
 	private ProcessingState state = ProcessingState.NEW;
@@ -86,6 +85,7 @@
 	private CacheID cid;
 	private final TransactionService transactionService;
 	private final DQPWorkContext dqpWorkContext;
+	boolean active;
 	
     /*
      * obtained during new
@@ -105,16 +105,17 @@
     private Map<AtomicRequestID, DataTierTupleSource> connectorInfo = new ConcurrentHashMap<AtomicRequestID, DataTierTupleSource>(4);
     // This exception contains details of all the atomic requests that failed when query is run in partial results mode.
     private List<TeiidException> warnings = new LinkedList<TeiidException>();
-    private boolean doneProducingBatches;
+    private volatile boolean doneProducingBatches;
     private volatile boolean isClosed;
     private volatile boolean isCanceled;
     private volatile boolean closeRequested;
+
 	//results request
 	private ResultsReceiver<ResultsMessage> resultsReceiver;
 	private int begin;
 	private int end;
     private TupleBatch savedBatch;
-    private Map<Integer, LobWorkItem> lobStreams = new ConcurrentHashMap<Integer, LobWorkItem>(4);
+    private Map<Integer, LobWorkItem> lobStreams = new ConcurrentHashMap<Integer, LobWorkItem>(4);    
     
     /**The time when command begins processing on the server.*/
     private long processingTimestamp = System.currentTimeMillis();
@@ -155,7 +156,11 @@
 
 	@Override
 	protected void resumeProcessing() {
-		dqpCore.addWork(this);
+		if (doneProducingBatches && !closeRequested && !isCanceled) {
+			this.run(); // just run in the IO thread
+		} else {
+			dqpCore.addWork(this);
+		}
 	}
 	
 	@Override
@@ -184,9 +189,6 @@
         } catch (QueryProcessor.ExpiredTimeSliceException e) {
             LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
             this.moreWork();
-        } catch (DataNotAvailableException e) { 
-        	LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- data not available"); //$NON-NLS-1$ //$NON-NLS-2$
-            this.dqpCore.scheduleWork(this, e.getRetryDelay());
         } catch (Throwable e) {
         	LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
             
@@ -227,7 +229,6 @@
 
 	private void resume() throws XATransactionException {
 		if (this.transactionState == TransactionState.ACTIVE && this.transactionContext.getTransaction() != null) {
-			//there's no need to do this for xa transactions, as that is done by the workmanager
 			this.transactionService.resume(this.transactionContext);
 		}
 	}
@@ -339,7 +340,7 @@
 				this.resultsBuffer = cr.getResults();
 				this.analysisRecord = cr.getAnalysisRecord();
 				this.originalCommand = cr.getCommand();
-				this.doneProducingBatches = true;
+				this.doneProducingBatches();
 				return;
 			}
 		}
@@ -357,7 +358,9 @@
 					super.flushBatchDirect(batch, add);
 					added = true;
 				}
-				doneProducingBatches = batch.getTerminationFlag();
+				if (batch.getTerminationFlag()) {
+					doneProducingBatches();
+				}
 				if (doneProducingBatches && cid != null) {
 			    	boolean sessionScope = processor.getContext().isSessionFunctionEvaluated();
 	            	CachedResults cr = new CachedResults();
@@ -381,7 +384,7 @@
 			this.transactionState = TransactionState.ACTIVE;
 		}
 		if (requestMsg.isNoExec()) {
-		    doneProducingBatches = true;
+		    doneProducingBatches();
             resultsBuffer.close();
             this.cid = null;
 		}
@@ -608,7 +611,9 @@
         	}
 		}
     	this.closeRequested = true;
-    	this.requestCancel(); //pending work should be canceled for fastest clean up
+    	if (!this.doneProducingBatches) {
+    		this.requestCancel(); //pending work should be canceled for fastest clean up
+    	}
     	this.moreWork();
     }
     
@@ -692,5 +697,20 @@
 			LogManager.logWarning(LogConstants.CTX_DQP, e, "Failed to cancel " + requestID); //$NON-NLS-1$
 		}
 	}
+
+	private void doneProducingBatches() {
+		this.doneProducingBatches = true;
+		dqpCore.finishProcessing(this);
+	}
 	
+	@Override
+	public int getPriority() {
+		return (closeRequested || isCanceled) ? 0 : 1000;
+	}
+	
+	@Override
+	public long getCreationTime() {
+		return processingTimestamp;
+	}
+
 }
\ No newline at end of file

Deleted: trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/StatsCapturingWorkManager.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -1,294 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.dqp.internal.process;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.core.util.NamedThreadFactory;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
-import org.teiid.query.QueryPlugin;
-
-
-/**
- * StatsCapturingWorkManager acts as a wrapper to the passed in {@link WorkManager} to 
- * capture statistics and implement an unbounded queue of work.
- */
-public class StatsCapturingWorkManager {
-		
-	private static class WorkContext {
-		ExecutionContext context;
-		long startTimeout;
-		long submitted = System.currentTimeMillis();
-		
-		public WorkContext(ExecutionContext context, long startTimeout) {
-			this.context = context;
-			this.startTimeout = startTimeout;
-		}
-		
-		long getStartTimeout() {
-			if (startTimeout == 0) {
-				return 0;
-			}
-			return Math.max(1, startTimeout + submitted - System.currentTimeMillis());
-		}
-		
-	}
-	
-	private final class WorkWrapper implements Work {
-		private final Work work;
-		private final WorkContext workContext;
-		private final DQPWorkContext dqpWorkContext;
-
-		private WorkWrapper(Work work, WorkContext workContext, DQPWorkContext dqpWorkContext) {
-			this.work = work;
-			this.workContext = workContext;
-			this.dqpWorkContext = dqpWorkContext;
-		}
-
-		@Override
-		public void run() {
-			Thread t = Thread.currentThread();
-			synchronized (poolLock) {
-				threads.add(t);
-			}
-			String name = t.getName();
-			t.setName(name + "_" + poolName + threadCounter.getAndIncrement()); //$NON-NLS-1$
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$ 
-			}
-			boolean success = false;
-			try {
-				dqpWorkContext.runInContext(work);
-				success = true;
-			} finally {
-				synchronized (poolLock) {
-					WorkWrapper next = null;
-					if (success) {
-						completedCount++;
-						next = queue.poll();		
-					}
-					threads.remove(t);
-					if (next == null) {
-						activeCount--;
-						if (activeCount == 0 && terminated) {
-							poolLock.notifyAll();
-						}		
-					} else {
-						try {
-							if (next.workContext == null) {
-								delegate.scheduleWork(next);
-							} else {
-								delegate.scheduleWork(next, next.workContext.getStartTimeout(), next.workContext.context, next.work instanceof WorkListener?(WorkListener)next.work:null);
-							}
-						} catch (WorkException e) {
-							handleException(next.work, e);
-						}
-					}
-				}
-				t.setName(name);
-			}
-		}
-
-		@Override
-		public void release() {
-			this.work.release();
-		}
-	}
-	
-	private static void handleException(Work work, WorkException e) {
-		if (work instanceof WorkListener) {
-			((WorkListener)work).workRejected(new WorkEvent(work, WorkEvent.WORK_REJECTED, work, new WorkRejectedException(e)));
-		} else if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_RUNTIME, e, "Exception adding work to the WorkManager"); //$NON-NLS-1$ 
-		}
-	}
-		
-	private static ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Scheduler")); //$NON-NLS-1$
-
-	private volatile int activeCount;
-	private volatile int highestActiveCount;
-	private volatile int highestQueueSize;
-	private volatile boolean terminated;
-	private volatile int submittedCount;
-	private volatile int completedCount;
-	
-	private Object poolLock = new Object();
-	private AtomicInteger threadCounter = new AtomicInteger();
-	
-	private String poolName;
-	private int maximumPoolSize;
-	private WorkManager delegate;
-	
-	private Queue<WorkWrapper> queue = new LinkedList<WorkWrapper>();
-	private Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>()));
-	private Map<Integer, ScheduledFuture<?>> futures = new HashMap<Integer, ScheduledFuture<?>>();
-	private int idCounter;
-	
-	public StatsCapturingWorkManager(String name, int maximumPoolSize, WorkManager delegate) {
-		this.maximumPoolSize = maximumPoolSize;
-		this.poolName = name;
-		this.delegate = delegate;
-	}
-	
-	public void scheduleWork(final Work arg0) throws WorkException {
-		scheduleWork(arg0, (WorkContext)null, DQPWorkContext.getWorkContext());
-	}
-
-	private void scheduleWork(final Work work, WorkContext workContext, DQPWorkContext dqpWorkContext) 
-	throws WorkRejectedException, WorkException {
-		boolean atMaxThreads = false;
-		boolean newMaxQueueSize = false;
-		synchronized (poolLock) {
-			checkForTermination();
-			submittedCount++;
-			atMaxThreads = activeCount == maximumPoolSize;
-			if (atMaxThreads) {
-				queue.add(new WorkWrapper(work, workContext, dqpWorkContext));
-				int queueSize = queue.size();
-				if (queueSize > highestQueueSize) {
-					newMaxQueueSize = true;
-					highestQueueSize = queueSize;
-				}
-			} else {
-				activeCount++;
-				highestActiveCount = Math.max(activeCount, highestActiveCount);
-			}
-		}
-		if (atMaxThreads) {
-			if (newMaxQueueSize && maximumPoolSize > 1) {
-				LogManager.logWarning(LogConstants.CTX_RUNTIME, QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName, highestQueueSize)); //$NON-NLS-1$
-			}
-			return;
-		}
-		if (workContext == null) {
-			delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext));
-		} else {
-			delegate.scheduleWork(new WorkWrapper(work, null, dqpWorkContext), workContext.getStartTimeout(), workContext.context, work instanceof WorkListener?(WorkListener)work:null);
-		}
-	}
-
-	public void scheduleWork(final Work arg0, final ExecutionContext arg2, long delay) throws WorkException {
-		if (delay < 1) {
-			scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE), DQPWorkContext.getWorkContext());
-		} else {
-			synchronized (futures) {
-				final int id = idCounter++;
-				final DQPWorkContext dqpWorkContext = DQPWorkContext.getWorkContext();
-				ScheduledFuture<?> sf = stpe.schedule(new Runnable() {
-					
-					@Override
-					public void run() {
-						try {
-							futures.remove(id);
-							scheduleWork(arg0, new WorkContext(arg2, WorkManager.INDEFINITE), dqpWorkContext);
-						} catch (WorkException e) {
-							handleException(arg0, e);
-						}
-					}
-				}, delay, TimeUnit.MILLISECONDS);
-				this.futures.put(id, sf);
-			}			
-		}
-	}
-	
-	private void checkForTermination() throws WorkRejectedException {
-		if (terminated) {
-			throw new WorkRejectedException("Queue has been terminated"); //$NON-NLS-1$
-		}
-	}
-		
-	public WorkerPoolStatisticsMetadata getStats() {
-		WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
-		stats.setName(poolName);
-		stats.setQueued(queue.size());
-		stats.setHighestQueued(highestQueueSize);
-		stats.setActiveThreads(this.activeCount);
-		stats.setMaxThreads(this.maximumPoolSize);
-		stats.setTotalSubmitted(this.submittedCount);
-		stats.setHighestActiveThreads(this.highestActiveCount);
-		stats.setTotalCompleted(this.completedCount);
-		return stats;		
-	}
-	
-	public void shutdown() {
-		this.terminated = true;
-	}
-	
-	public void shutdownNow() {
-		this.shutdown();
-		synchronized (poolLock) {
-			for (Thread t : threads) {
-				t.interrupt();
-			}
-			queue.clear();
-		}
-		synchronized (futures) {
-			for (ScheduledFuture<?> future : futures.values()) {
-				future.cancel(true);
-			}
-			futures.clear();
-		}
-	}
-	
-	public boolean isTerminated() {
-		return terminated;
-	}
-	
-	public boolean awaitTermination(long timeout, TimeUnit unit)
-			throws InterruptedException {
-		long timeoutMillis = unit.toMillis(timeout);
-		long finalMillis = System.currentTimeMillis() + timeoutMillis;
-		synchronized (poolLock) {
-			while (this.activeCount > 0 || !terminated) {
-				if (timeoutMillis < 1) {
-					return false;
-				}
-				poolLock.wait(timeoutMillis);
-				timeoutMillis = finalMillis - System.currentTimeMillis();
-			}
-		}
-		return true;
-	}
-
-}

Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.core.util.NamedThreadFactory;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
+
+/**
+ * An Executor that:
+ * <ol>
+ * <li>minimizes thread creation</li>
+ * <li>allows for proper timeout of idle threads</li>
+ * <li>allows for queuing</li>
+ * </ol>
+ * <br/>
+ * A non-fifo (lifo) {@link SynchronousQueue} based {@link ThreadPoolExecutor} satisfies 1 and 2, but not 3.
+ * A bounded or unbound queue based {@link ThreadPoolExecutor} allows for 3, but will tend to create 
+ * up to the maximum number of threads and makes no guarantee on thread scheduling.
+ * <br/>
+ * So the approach here is to use a virtual thread pool off of a {@link SynchronousQueue}
+ * backed {@link ThreadPoolExecutor}.
+ * <br/>
+ * There is also only a single master scheduling thread with actual executions deferred.
+ * 
+ * TODO: there is a race condition between retiring threads and adding work, which may create extra threads.  
+ * That is a flaw with attempting to reuse, rather than create threads.  
+ * TODO: bounded queuing - we never bothered bounding in the past with our worker pools, but reasonable
+ * defaults would be a good idea.
+ */
+public class ThreadReuseExecutor implements Executor {
+	
+	public interface PrioritizedRunnable extends Runnable {
+		
+		int getPriority();
+		
+		long getCreationTime();
+		
+	}
+	
+	static class RunnableWrapper implements PrioritizedRunnable {
+		Runnable r;
+		DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+		long creationTime;
+		int priority;
+		
+		public RunnableWrapper(Runnable r) {
+			if (r instanceof PrioritizedRunnable) {
+				PrioritizedRunnable pr = (PrioritizedRunnable)r;
+				creationTime = pr.getCreationTime();
+				priority = pr.getPriority();
+			} else {
+				creationTime = System.currentTimeMillis();
+				priority = Integer.MAX_VALUE;
+			}
+			this.r = r;
+		}
+		
+		@Override
+		public long getCreationTime() {
+			return creationTime;
+		}
+
+		@Override
+		public int getPriority() {
+			return priority;
+		}
+
+		@Override
+		public void run() {
+			workContext.runInContext(r);
+		}
+		
+	}
+	
+	private final ThreadPoolExecutor tpe; 
+	
+	private ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Scheduler")); //$NON-NLS-1$
+	
+	class ScheduledFutureTask extends FutureTask<Void> implements ScheduledFuture<Void>, PrioritizedRunnable {
+		private ScheduledFuture<?> scheduledFuture;
+		private boolean periodic;
+		private volatile boolean running;
+		private PrioritizedRunnable runnable;
+		
+		public ScheduledFutureTask(PrioritizedRunnable runnable, boolean periodic) {
+			super(runnable, null);
+			this.periodic = periodic;
+			this.runnable = runnable;
+		}
+		
+		public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
+			scheduledTasks.add(this);
+			this.scheduledFuture = scheduledFuture;
+		}
+		
+		@Override
+		public long getDelay(TimeUnit unit) {
+			return this.scheduledFuture.getDelay(unit);
+		}
+
+		@Override
+		public int compareTo(Delayed o) {
+			return this.scheduledFuture.compareTo(o);
+		}
+		
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			this.scheduledFuture.cancel(false);
+			scheduledTasks.remove(this);
+			return super.cancel(mayInterruptIfRunning);
+		}
+		
+		public Runnable getParent() {
+			return new Runnable() {
+				@Override
+				public void run() {
+					if (running || terminated) {
+						return;
+					}
+					running = periodic;
+					executeDirect(ScheduledFutureTask.this);
+				}
+			};
+		}
+		
+		@Override
+		public void run() {
+			if (periodic) {
+				if (!this.runAndReset()) {
+					this.scheduledFuture.cancel(false);
+					scheduledTasks.remove(this);
+				}
+				running = false;
+			} else {
+				scheduledTasks.remove(this);
+				super.run();
+			}
+		}
+
+		@Override
+		public long getCreationTime() {
+			return runnable.getCreationTime();
+		}
+
+		@Override
+		public int getPriority() {
+			return runnable.getPriority();
+		}
+	}
+	
+	private volatile int activeCount;
+	private volatile int highestActiveCount;
+	private volatile int highestQueueSize;
+	private volatile boolean terminated;
+	private volatile int submittedCount;
+	private volatile int completedCount;
+	private Object poolLock = new Object();
+	private AtomicInteger threadCounter = new AtomicInteger();
+	private Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>()));
+	private Set<ScheduledFutureTask> scheduledTasks = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<ScheduledFutureTask, Boolean>()));
+	
+	private String poolName;
+	private int maximumPoolSize;
+	private Queue<PrioritizedRunnable> queue = new PriorityQueue<PrioritizedRunnable>(11, new Comparator<PrioritizedRunnable>() {
+		@Override
+		public int compare(PrioritizedRunnable pr1, PrioritizedRunnable pr2) {
+			int result = pr1.getPriority() - pr2.getPriority();
+			if (result == 0) {
+				return Long.signum(pr1.getCreationTime() - pr2.getCreationTime());
+			}
+			return result;
+		}
+	});
+	
+	public ThreadReuseExecutor(String name, int maximumPoolSize) {
+		this.maximumPoolSize = maximumPoolSize;
+		this.poolName = name;
+		
+		tpe = new ThreadPoolExecutor(0,
+				maximumPoolSize, 2, TimeUnit.MINUTES,
+				new SynchronousQueue<Runnable>(), new NamedThreadFactory("Worker")) { //$NON-NLS-1$ 
+			@Override
+			protected void afterExecute(Runnable r, Throwable t) {
+				if (t != null) {
+					LogManager.logError(LogConstants.CTX_RUNTIME, t, QueryPlugin.Util.getString("WorkerPool.uncaughtException")); //$NON-NLS-1$
+				}
+			}
+			
+		};
+	}
+	
+	public void execute(final Runnable command) {
+		executeDirect(new RunnableWrapper(command));
+	}
+
+	private void executeDirect(final PrioritizedRunnable command) {
+		boolean atMaxThreads = false;
+		boolean newMaxQueueSize = false;
+		synchronized (poolLock) {
+			checkForTermination();
+			submittedCount++;
+			atMaxThreads = activeCount == maximumPoolSize;
+			if (atMaxThreads) {
+				queue.add(command);
+				int queueSize = queue.size();
+				if (queueSize > highestQueueSize) {
+					newMaxQueueSize = true;
+					highestQueueSize = queueSize;
+				}
+			} else {
+				activeCount++;
+				highestActiveCount = Math.max(activeCount, highestActiveCount);
+			}
+		}
+		if (atMaxThreads) {
+			if (newMaxQueueSize && maximumPoolSize > 1) {
+				LogManager.logWarning(LogConstants.CTX_RUNTIME, QueryPlugin.Util.getString("WorkerPool.Max_thread", maximumPoolSize, poolName, highestQueueSize)); //$NON-NLS-1$
+			}
+			return;
+		}
+		tpe.execute(new Runnable() {
+			@Override
+			public void run() {
+				Thread t = Thread.currentThread();
+				threads.add(t);
+				String name = t.getName();
+				t.setName(name + "_" + poolName + threadCounter.getAndIncrement()); //$NON-NLS-1$
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_RUNTIME, MessageLevel.TRACE)) {
+					LogManager.logTrace(LogConstants.CTX_RUNTIME, "Beginning work with virtual worker", t.getName()); //$NON-NLS-1$ 
+				}
+				Runnable r = command;
+				while (r != null) {
+					boolean success = false;
+					try {
+						r.run();
+						success = true;
+					} finally {
+						synchronized (poolLock) {
+							if (success) {
+								completedCount++;
+								r = queue.poll();		
+							}
+							if (!success || r == null) {
+								threads.remove(t);
+								activeCount--;
+								if (activeCount == 0 && terminated) {
+									poolLock.notifyAll();
+								}		
+							}
+						}
+						t.setName(name);
+					}
+				}
+			};
+		});
+	}
+
+	private void checkForTermination() {
+		if (terminated) {
+			throw new RejectedExecutionException();
+		}
+	}
+	
+	public int getActiveCount() {
+		return activeCount;
+	}
+	
+	public int getSubmittedCount() {
+		return submittedCount;
+	}
+	
+	public int getCompletedCount() {
+		return completedCount;
+	}
+	
+	public int getPoolSize() {
+		return activeCount;
+	}
+	
+	public boolean isTerminated() {
+		return terminated;
+	}
+	
+	public void shutdown() {
+		this.terminated = true;
+		synchronized (scheduledTasks) {
+			for (ScheduledFuture<?> future : new ArrayList<ScheduledFuture<?>>(scheduledTasks)) {
+				future.cancel(false);
+			}
+			scheduledTasks.clear();
+		}
+	}
+	
+	public int getLargestPoolSize() {
+		return this.highestActiveCount;
+	}
+	
+	public WorkerPoolStatisticsMetadata getStats() {
+		WorkerPoolStatisticsMetadata stats = new WorkerPoolStatisticsMetadata();
+		stats.setName(poolName);
+		stats.setQueued(queue.size());
+		stats.setHighestQueued(highestQueueSize);
+		stats.setActiveThreads(getActiveCount());
+		stats.setMaxThreads(this.maximumPoolSize);
+		stats.setTotalSubmitted(getSubmittedCount());
+		stats.setHighestActiveThreads(getLargestPoolSize());
+		stats.setTotalCompleted(getCompletedCount());
+		return stats;
+	}
+	
+	public boolean hasWork() {
+		synchronized (poolLock) {
+			return this.getSubmittedCount() - this.getCompletedCount() > 0 && !this.isTerminated();
+		}
+	}
+
+	public List<Runnable> shutdownNow() {
+		this.shutdown();
+		synchronized (poolLock) {
+			synchronized (threads) {
+				for (Thread t : threads) {
+					t.interrupt();
+				}
+			}
+			List<Runnable> result = new ArrayList<Runnable>(queue);
+			queue.clear();
+			return result;
+		}
+	}
+	
+	public boolean awaitTermination(long timeout, TimeUnit unit)
+			throws InterruptedException {
+		long timeoutMillis = unit.toMillis(timeout);
+		long finalMillis = System.currentTimeMillis() + timeoutMillis;
+		synchronized (poolLock) {
+			while (this.activeCount > 0 || !terminated) {
+				if (timeoutMillis < 1) {
+					return false;
+				}
+				poolLock.wait(timeoutMillis);
+				timeoutMillis = finalMillis - System.currentTimeMillis();
+			}
+		}
+		return true;
+	}
+
+	public ScheduledFuture<?> schedule(final Runnable command, long delay,
+			TimeUnit unit) {
+		checkForTermination();
+		ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command), false);
+		synchronized (scheduledTasks) {
+			ScheduledFuture<?> future = stpe.schedule(sft.getParent(), delay, unit);
+			sft.setScheduledFuture(future);
+			return sft;
+		}
+	}
+
+	public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
+			long initialDelay, long period, TimeUnit unit) {
+		checkForTermination();
+		ScheduledFutureTask sft = new ScheduledFutureTask(new RunnableWrapper(command), true);
+		synchronized (scheduledTasks) {
+			ScheduledFuture<?> future = stpe.scheduleAtFixedRate(sft.getParent(), initialDelay, period, unit);
+			sft.setScheduledFuture(future);
+			return sft;
+		}		
+	}
+			
+}


Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/ThreadReuseExecutor.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/transaction/TransactionServerImpl.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -246,7 +246,7 @@
 							public Transaction call() throws Exception {
 								return transactionManager.getTransaction();
 							}
-						});
+						}, 0);
 						workManager.doWork(work, WorkManager.INDEFINITE, tc, null);
 						tc.setTransaction(work.getResult().get());
 					}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -31,8 +31,8 @@
 import org.teiid.common.buffer.BufferManager.BufferReserveMode;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.util.Assertion;
 import org.teiid.logging.LogConstants;
@@ -41,9 +41,10 @@
 import org.teiid.query.execution.QueryExecPlugin;
 import org.teiid.query.processor.BatchCollector.BatchProducer;
 import org.teiid.query.util.CommandContext;
-import org.teiid.translator.DataNotAvailableException;
 
-
+/**
+ * Driver for plan processing.
+ */
 public class QueryProcessor implements BatchProducer {
 	
 	public static class ExpiredTimeSliceException extends TeiidRuntimeException {
@@ -110,11 +111,6 @@
 	    			throw e;
 	    		}
 	    		continue;
-	    	} catch (DataNotAvailableException e) {
-	    		if (!nonBlocking) {
-	    			throw e;
-	    		}
-	    		wait = e.getRetryDelay();
 	    	} catch (BlockedException e) {
 	    		if (!nonBlocking) {
 	    			throw e;
@@ -147,6 +143,9 @@
 	
 			long currentTime = System.currentTimeMillis();
 			Assertion.assertTrue(!processorClosed);
+			
+			//TODO: see if there is pending work before preempting
+			
 	        while(currentTime < context.getTimeSliceEnd()) {
 	        	if (requestCanceled) {
 	                throw new TeiidProcessingException(QueryExecPlugin.Util.getString("QueryProcessor.request_cancelled", getProcessID())); //$NON-NLS-1$

Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties	2010-07-12 20:59:02 UTC (rev 2333)
@@ -941,4 +941,7 @@
 NewCalculateCostUtil.badCost=Unexpected format encountered for max or min value
 
 WorkerPool.Max_thread=Reached maximum thread count "{0}" for worker pool "{1}" with a queue size of "{2}".
+WorkerPool.New_thread=Created worker thread "{0}".
+WorkerPool.uncaughtException=Uncaught exception processing work
+
 XMLSystemFunctions.invalid_namespaces=Invalid namespaces supplied for XPath expression - ''{0}''
\ No newline at end of file

Deleted: trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -1,179 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.queue;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
-import javax.resource.spi.work.WorkRejectedException;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import org.teiid.dqp.internal.process.StatsCapturingWorkManager;
-
-/**
- */
-public class TestStatsCapturingWorkManager {
-	
-	private WorkManager manager = new FakeWorkManager();
-
-    @Test public void testQueuing() throws Exception {
-        final long SINGLE_WAIT = 50;
-        final int WORK_ITEMS = 10;
-        final int MAX_THREADS = 5;
-
-        final StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", MAX_THREADS, manager); //$NON-NLS-1$
-        
-        for(int i=0; i<WORK_ITEMS; i++) {
-            pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
-        }
-        
-        pool.shutdown();        
-        pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
-        assertTrue(pool.isTerminated());
-        WorkerPoolStatisticsMetadata stats = pool.getStats();
-        assertEquals(10, stats.getTotalCompleted());
-        assertEquals("Expected threads to be maxed out", MAX_THREADS, stats.getHighestActiveThreads()); //$NON-NLS-1$
-    }
-
-    @Ignore
-    @Test public void testThreadReuse() throws Exception {
-        final long SINGLE_WAIT = 50;
-        final long NUM_THREADS = 5;
-
-        StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
-        
-        for(int i=0; i<NUM_THREADS; i++) {            
-        	pool.scheduleWork(new FakeWorkItem(SINGLE_WAIT));
-            
-            try {
-                Thread.sleep(SINGLE_WAIT*2);
-            } catch(InterruptedException e) {                
-            }
-        }
-        
-        pool.shutdown();                
-        
-        WorkerPoolStatisticsMetadata stats = pool.getStats();
-        assertEquals("Expected 1 thread for serial execution", 1, stats.getHighestActiveThreads()); //$NON-NLS-1$
-        
-        pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
-    }
-    
-    @Test(expected=WorkRejectedException.class) public void testShutdown() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
-        pool.shutdown();
-    	pool.scheduleWork(new FakeWorkItem(1));
-    }
-    
-    /*@Test public void testScheduleCancel() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
-    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
-    		@Override
-    		public void run() {
-    		}
-    	}, 0, 5, TimeUnit.MILLISECONDS);
-    	future.cancel(true);
-    	assertFalse(future.cancel(true));    	
-    }*/
-    
-    @Test public void testSchedule() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
-        final ArrayList<String> result = new ArrayList<String>(); 
-    	pool.scheduleWork(new Work() {
-			
-			@Override
-			public void run() {
-    			result.add("hello"); //$NON-NLS-1$
-			}
-			
-			@Override
-			public void release() {
-				
-			}
-		}, null, 5);
-    	Thread.sleep(100);
-    	pool.shutdown();
-    	pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
-    	assertEquals(1, result.size());
-    }
-    
-    /*@Test(expected=ExecutionException.class) public void testScheduleException() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
-    	ScheduledFuture<?> future = pool.schedule(new Runnable() {
-    		@Override
-    		public void run() {
-    			throw new RuntimeException();
-    		}
-    	}, 0, TimeUnit.MILLISECONDS);
-    	future.get();
-    }*/
-    
-    /**
-     * Here each execution exceeds the period
-     */
-    /*@Test public void testScheduleRepeated() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5); //$NON-NLS-1$
-    	final ArrayList<String> result = new ArrayList<String>();
-    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
-    		@Override
-    		public void run() {
-    			result.add("hello"); //$NON-NLS-1$
-    			try {
-					Thread.sleep(75);
-				} catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-    		}
-    	}, 0, 10, TimeUnit.MILLISECONDS);
-    	Thread.sleep(100);
-    	future.cancel(true);
-    	assertEquals(2, result.size());
-    }*/
-    
-    @Test public void testFailingWork() throws Exception {
-    	StatsCapturingWorkManager pool = new StatsCapturingWorkManager("test", 5, manager); //$NON-NLS-1$
-    	final AtomicInteger count = new AtomicInteger();
-    	pool.scheduleWork(new Work() {
-    		@Override
-    		public void run() {
-    			count.getAndIncrement();
-    			throw new RuntimeException();
-    		}
-    		
-    		@Override
-    		public void release() {
-    			
-    		}
-    	});
-    	Thread.sleep(100);
-    	assertEquals(1, count.get());
-    }
-        
-}

Copied: trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java (from rev 2323, trunk/engine/src/test/java/org/teiid/common/queue/TestStatsCapturingWorkManager.java)
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/queue/TestThreadReuseExecutor.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.queue;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.resource.spi.work.Work;
+
+import org.junit.Test;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+import org.teiid.dqp.internal.process.ThreadReuseExecutor;
+import org.teiid.dqp.internal.process.DQPCore.FutureWork;
+
+/**
+ */
+public class TestThreadReuseExecutor {
+	
+    @Test public void testQueuing() throws Exception {
+        final long SINGLE_WAIT = 50;
+        final int WORK_ITEMS = 10;
+        final int MAX_THREADS = 5;
+
+        final ThreadReuseExecutor pool = new ThreadReuseExecutor("test", MAX_THREADS); //$NON-NLS-1$
+        
+        for(int i=0; i<WORK_ITEMS; i++) {
+            pool.execute(new FakeWorkItem(SINGLE_WAIT));
+        }
+        
+        pool.shutdown();        
+        pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        assertTrue(pool.isTerminated());
+        WorkerPoolStatisticsMetadata stats = pool.getStats();
+        assertEquals(10, stats.getTotalCompleted());
+        assertEquals("Expected threads to be maxed out", MAX_THREADS, stats.getHighestActiveThreads()); //$NON-NLS-1$
+    }
+
+    @Test public void testThreadReuse() throws Exception {
+        final long SINGLE_WAIT = 50;
+        final long NUM_THREADS = 5;
+
+        ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+        
+        for(int i=0; i<NUM_THREADS; i++) {            
+        	pool.execute(new FakeWorkItem(SINGLE_WAIT));
+            
+            try {
+                Thread.sleep(SINGLE_WAIT*2);
+            } catch(InterruptedException e) {                
+            }
+        }
+        
+        pool.shutdown();                
+        
+        WorkerPoolStatisticsMetadata stats = pool.getStats();
+        assertEquals("Expected 1 thread for serial execution", 1, stats.getHighestActiveThreads()); //$NON-NLS-1$
+        
+        pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    }
+    
+    @Test(expected=RejectedExecutionException.class) public void testShutdown() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+        pool.shutdown();
+    	pool.execute(new FakeWorkItem(1));
+    }
+    
+    @Test public void testScheduleCancel() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+    		@Override
+    		public void run() {
+    		}
+    	}, 0, 5, TimeUnit.MILLISECONDS);
+    	future.cancel(true);
+    	assertFalse(future.cancel(true));    	
+    }
+    
+    @Test public void testSchedule() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+        final ArrayList<String> result = new ArrayList<String>(); 
+    	pool.schedule(new Work() {
+			
+			@Override
+			public void run() {
+    			result.add("hello"); //$NON-NLS-1$
+			}
+			
+			@Override
+			public void release() {
+				
+			}
+		}, 5, TimeUnit.MILLISECONDS);
+    	Thread.sleep(100);
+    	pool.shutdown();
+    	pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    	assertEquals(1, result.size());
+    }
+    
+    @Test(expected=ExecutionException.class) public void testScheduleException() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+    	ScheduledFuture<?> future = pool.schedule(new Runnable() {
+    		@Override
+    		public void run() {
+    			throw new RuntimeException();
+    		}
+    	}, 0, TimeUnit.MILLISECONDS);
+    	future.get();
+    }
+    
+    /**
+     * Here each execution exceeds the period
+     */
+    @Test public void testScheduleRepeated() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+    	final ArrayList<String> result = new ArrayList<String>();
+    	ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
+    		@Override
+    		public void run() {
+    			result.add("hello"); //$NON-NLS-1$
+    			try {
+					Thread.sleep(75);
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+    		}
+    	}, 0, 30, TimeUnit.MILLISECONDS);
+    	Thread.sleep(140);
+    	future.cancel(true);
+    	assertEquals(2, result.size());
+    }
+    
+    @Test public void testFailingWork() throws Exception {
+    	ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 5); //$NON-NLS-1$
+    	final AtomicInteger count = new AtomicInteger();
+    	pool.execute(new Work() {
+    		@Override
+    		public void run() {
+    			count.getAndIncrement();
+    			throw new RuntimeException();
+    		}
+    		
+    		@Override
+    		public void release() {
+    			
+    		}
+    	});
+    	Thread.sleep(100);
+    	assertEquals(1, count.get());
+    }
+    
+    @Test public void testPriorities() throws Exception {
+    	final ThreadReuseExecutor pool = new ThreadReuseExecutor("test", 1); //$NON-NLS-1$
+    	FutureWork<Boolean> work1 = new FutureWork<Boolean>(new Callable<Boolean>() {
+    		public Boolean call() throws Exception {
+    			synchronized (pool) {
+    				while (pool.getSubmittedCount() < 4) {
+    					pool.wait();
+    				}
+				}
+    			return true;
+    		}
+		}, 0);
+    	final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
+    	FutureWork<Boolean> work2 = new FutureWork<Boolean>(new Callable<Boolean>() {
+    		public Boolean call() throws Exception {
+    			order.add(2);
+    			return true;
+    		}
+		}, 2);
+    	FutureWork<Boolean> work3 = new FutureWork<Boolean>(new Callable<Boolean>() {
+    		public Boolean call() throws Exception {
+    			order.add(3);
+    			return false;
+    		}
+		}, 1);
+    	FutureWork<Boolean> work4 = new FutureWork<Boolean>(new Callable<Boolean>() {
+    		public Boolean call() throws Exception {
+    			order.add(4);
+    			return false;
+    		}
+		}, 2);
+    	pool.execute(work1);
+    	pool.execute(work2);
+    	pool.execute(work3);
+    	pool.execute(work4);
+    	synchronized (pool) {
+        	pool.notifyAll();
+		}
+    	work1.getResult().get();
+    	work2.getResult().get();
+    	work3.getResult().get();
+    	work4.getResult().get();
+    	assertEquals(Integer.valueOf(3), order.remove());
+    	assertEquals(Integer.valueOf(2), order.remove());
+    	assertEquals(Integer.valueOf(4), order.remove());
+    }
+        
+}

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorManager.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -28,9 +28,6 @@
 
 import junit.framework.TestCase;
 
-import org.mockito.Mockito;
-import org.teiid.common.buffer.BlockedException;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.RequestID;
@@ -54,7 +51,6 @@
 				return c.getConnection();
 			}
 		};
-		cm.setMaxConnections(1);
 		cm.start();
 		return cm;
 	}
@@ -74,7 +70,7 @@
     }
 
     void helpAssureOneState() throws Exception {
-    	csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
+    	csm.registerRequest(request);
     	ConnectorWork state = csm.getState(request.getAtomicRequestID());
     	assertEquals(state, csm.getState(request.getAtomicRequestID()));
     }
@@ -106,34 +102,5 @@
 
         assertEquals("Expected size of 1", 1, csm.size()); //$NON-NLS-1$
     }
-    
-    public void testQueuing() throws Exception {
-    	ConnectorWork workItem = csm.executeRequest(request, Mockito.mock(AbstractWorkItem.class));
-    	workItem.execute();
-    	
-    	AbstractWorkItem awi1 = Mockito.mock(AbstractWorkItem.class);
-    	ConnectorWork workItem1 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(2, 1), awi1);
-    	
-    	AbstractWorkItem awi2 = Mockito.mock(AbstractWorkItem.class);
-    	ConnectorWork workItem2 = csm.executeRequest(TestConnectorWorkItem.createNewAtomicRequestMessage(3, 1), awi2);
-
-    	try {
-    		workItem1.execute();
-    		fail("expected exception"); //$NON-NLS-1$
-    	} catch (BlockedException e) {
-    		
-    	}
-    	workItem.close();
-    	
-    	try {
-    		workItem2.execute(); //ensure that another item cannot jump in the queue
-    		fail("expected exception"); //$NON-NLS-1$
-    	} catch (BlockedException e) {
-    		
-    	}
-
-    	Mockito.verify(awi1).moreWork();
-    	workItem1.execute();
-    }
-        
+           
 }
\ No newline at end of file

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/impl/TestConnectorWorkItem.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,8 +22,7 @@
 
 package org.teiid.dqp.internal.datamgr.impl;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
 
 import java.util.Arrays;
 import java.util.List;
@@ -35,7 +34,6 @@
 import org.mockito.Mockito;
 import org.teiid.client.RequestMessage;
 import org.teiid.dqp.internal.datamgr.language.LanguageBridgeFactory;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
@@ -49,8 +47,8 @@
 import org.teiid.query.sql.lang.StoredProcedure;
 import org.teiid.query.sql.symbol.Constant;
 import org.teiid.query.unittest.FakeMetadataFactory;
-import org.teiid.translator.TranslatorException;
 import org.teiid.translator.ProcedureExecution;
+import org.teiid.translator.TranslatorException;
 
 
 public class TestConnectorWorkItem {
@@ -117,8 +115,7 @@
 		Command command = helpGetCommand("update bqt1.smalla set stringkey = 1 where stringkey = 2", EXAMPLE_BQT); //$NON-NLS-1$
 		AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
 		arm.setCommand(command);
-		ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, Mockito.mock(AbstractWorkItem.class), 
-				TestConnectorManager.getConnectorManager());
+		ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, TestConnectorManager.getConnectorManager());
 		return synchConnectorWorkItem.execute();
 	}
 	
@@ -152,7 +149,7 @@
 				return Mockito.mock(Xid.class);
 			}} );
 		
-		new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+		new ConnectorWorkItem(requestMsg, cm);
     }
     
 	@Ignore
@@ -180,7 +177,7 @@
 				return Mockito.mock(Xid.class);
 			}} );
 		
-		new ConnectorWorkItem(requestMsg, Mockito.mock(AbstractWorkItem.class), cm);
+		new ConnectorWorkItem(requestMsg, cm);
     }
 
 }

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -56,7 +56,6 @@
         Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(new AutoGenDataService());
         
         core = new DQPCore();
-        core.setWorkManager(new FakeWorkManager());
         core.setBufferService(new FakeBufferService());
         core.setConnectorManagerRepository(repo);
         core.setTransactionService(new FakeTransactionService());

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCoreRequestHandling.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -23,11 +23,13 @@
 package org.teiid.dqp.internal.process;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
 import junit.framework.TestCase;
 
+import org.mockito.Mockito;
 import org.teiid.adminapi.impl.RequestMetadata;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.SourceWarning;
@@ -50,12 +52,12 @@
     }
 
     private void compareReqInfos(Collection<RequestID> reqs1, Collection<RequestMetadata> reqs2) {
-        Set reqIDs2 = new HashSet();
+        Set<RequestID> reqIDs2 = new HashSet<RequestID>();
         for (RequestMetadata requestInfo : reqs2) {
             reqIDs2.add(new RequestID(requestInfo.getSessionId(), requestInfo.getExecutionId()));
         }
         
-        assertEquals("Collections of request infos are not the same: ", new HashSet(reqs1), reqIDs2); //$NON-NLS-1$
+        assertEquals("Collections of request infos are not the same: ", new HashSet<RequestID>(reqs1), reqIDs2); //$NON-NLS-1$
     }
 
     /**
@@ -63,8 +65,8 @@
      */
     public void testGetRequestsSessionToken1() {
         DQPCore rm = new DQPCore();
-        Set reqs = new HashSet();                
-        Collection actualReqs = rm.getRequestsForSession(SESSION_STRING); 
+        Set<RequestID> reqs = Collections.emptySet();                
+        Collection<RequestMetadata> actualReqs = rm.getRequestsForSession(SESSION_STRING); 
         compareReqInfos(reqs, actualReqs);
     }
 
@@ -74,7 +76,7 @@
     public void testGetRequestsSessionToken2() {
     	DQPCore rm = new DQPCore();
     	rm.setTransactionService(new FakeTransactionService());
-    	Set reqs = new HashSet();
+    	Set<RequestID> reqs = new HashSet<RequestID>();
         RequestID id = addRequest(rm, SESSION_STRING, 1);
         reqs.add(id);
 
@@ -95,13 +97,13 @@
     public void testGetRequestsSessionToken3() {
         DQPCore rm = new DQPCore();
         rm.setTransactionService(new FakeTransactionService());
-        Set reqs = new HashSet();
+        Set<RequestID> reqs = new HashSet<RequestID>();
          
         reqs.add(addRequest(rm, SESSION_STRING, 0));
         reqs.add(addRequest(rm, SESSION_STRING, 1));
         reqs.add(addRequest(rm, SESSION_STRING, 2));
                 
-        Collection actualReqs = rm.getRequestsForSession(SESSION_STRING); 
+        Collection<RequestMetadata> actualReqs = rm.getRequestsForSession(SESSION_STRING); 
         compareReqInfos(reqs, actualReqs);
     }
     
@@ -158,7 +160,7 @@
         RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
         AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg, workItem.getDqpWorkContext(), 1);
 
-        DataTierTupleSource info = new DataTierTupleSource(null, atomicReq, null, "connID", workItem); //$NON-NLS-1$
+        DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
         workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
         
         DataTierTupleSource arInfo = workItem.getConnectorRequest(atomicReq.getAtomicRequestID());
@@ -173,7 +175,7 @@
         RequestWorkItem workItem = addRequest(rm, r0, requestID, null, null);
         AtomicRequestMessage atomicReq = new AtomicRequestMessage(workItem.requestMsg, workItem.getDqpWorkContext(), 1);
 
-        DataTierTupleSource info = new DataTierTupleSource(null, atomicReq, null,"connID", workItem); //$NON-NLS-1$
+        DataTierTupleSource info = Mockito.mock(DataTierTupleSource.class);
         workItem.addConnectorRequest(atomicReq.getAtomicRequestID(), info);
         
         workItem.closeAtomicRequest(atomicReq.getAtomicRequestID());

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,8 +22,9 @@
 
 package org.teiid.dqp.internal.process;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
 
+import org.junit.Test;
 import org.mockito.Mockito;
 import org.teiid.client.RequestMessage;
 import org.teiid.core.TeiidException;
@@ -33,6 +34,8 @@
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.AutoGenDataService;
 import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
 import org.teiid.query.parser.QueryParser;
@@ -40,10 +43,9 @@
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.unittest.FakeMetadataFactory;
 import org.teiid.query.util.CommandContext;
+import org.teiid.translator.TranslatorException;
 
-
-
-public class TestDataTierManager extends TestCase {
+public class TestDataTierManager {
     
     private DQPCore rm;
     private DataTierManagerImpl dtm;
@@ -54,10 +56,6 @@
     private AutoGenDataService connectorManager;
     private RequestWorkItem workItem;
     
-    public TestDataTierManager(String name) {
-        super(name);
-    }
-    
     private static Command helpGetCommand(String sql, QueryMetadataInterface metadata) throws Exception {
         Command command = QueryParser.getQueryParser().parseCommand(sql);
         QueryResolver.resolveCommand(command, metadata);
@@ -75,7 +73,8 @@
         connectorManager = new AutoGenDataService();
         rm = new DQPCore();
         rm.setTransactionService(new FakeTransactionService());
-        
+        rm.setBufferService(new FakeBufferService());
+        rm.start(new DQPConfiguration());
         FakeBufferService bs = new FakeBufferService();
 
         ConnectorManagerRepository repo = Mockito.mock(ConnectorManagerRepository.class);
@@ -92,7 +91,7 @@
         
         RequestMessage original = new RequestMessage();
         original.setExecutionId(1);
-        
+        original.setPartialResults(true);
         RequestID requestID = workContext.getRequestID(original.getExecutionId());
         
         context = new CommandContext();
@@ -105,11 +104,13 @@
         request = new AtomicRequestMessage(original, workContext, nodeId);
         request.setCommand(command);
         request.setConnectorName("FakeConnectorID"); //$NON-NLS-1$
-
-        info = new DataTierTupleSource(command.getProjectedSymbols(), request, dtm, request.getConnectorName(), workItem);
+        TransactionContext tc = new TransactionContext();
+        tc.setTransactionType(Scope.GLOBAL);
+        request.setTransactionContext(tc);
+        info = new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm);
     }
     
-    public void testDataTierTupleSource() throws Exception {
+    @Test public void testDataTierTupleSource() throws Exception {
     	helpSetup(1);
     	info.nextTuple();
         assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
@@ -117,7 +118,13 @@
         assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
     }
     
-    public void testCodeTableResponseException() throws Exception {
+    @Test public void testPartialResults() throws Exception {
+    	helpSetup(1);
+    	info.exceptionOccurred(new TranslatorException(), true);
+    	assertNull(info.nextTuple());
+    }
+    
+    @Test public void testCodeTableResponseException() throws Exception {
     	helpSetup(3);
     	this.connectorManager.throwExceptionOnExecute = true;
         
@@ -129,13 +136,13 @@
         }
     }
     
-    public void testNoRowsException() throws Exception {
+    @Test public void testNoRowsException() throws Exception {
     	helpSetup(3);
     	this.connectorManager.setRows(0);
     	assertNull(info.nextTuple());
     }
     
-    public void testCodeTableResponseDataNotAvailable() throws Exception {
+    @Test public void testCodeTableResponseDataNotAvailable() throws Exception {
     	helpSetup(3);
     	this.connectorManager.dataNotAvailable = 5;
         

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -28,11 +28,11 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.teiid.core.TeiidComponentException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWork;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorWorkItem;
-import org.teiid.dqp.internal.process.AbstractWorkItem;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
 import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
@@ -68,10 +68,10 @@
     }
 
     @Override
-    public ConnectorWork executeRequest(AtomicRequestMessage message, AbstractWorkItem awi)
-    		throws TranslatorException {
+    public ConnectorWork registerRequest(AtomicRequestMessage message)
+    		throws TeiidComponentException {
     	if (throwExceptionOnExecute) {
-    		throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
+    		throw new TeiidComponentException("Connector Exception"); //$NON-NLS-1$
     	}
         List projectedSymbols = (message.getCommand()).getProjectedSymbols();               
         List[] results = createResults(projectedSymbols);
@@ -105,10 +105,6 @@
 				
 			}
 			
-			@Override
-			public boolean isQueued() {
-				return false;
-			}
 		};
     }
     

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestUnionAllNode.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -22,46 +22,38 @@
 
 package org.teiid.query.processor.relational;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.junit.Test;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.UnionAllNode;
+import org.teiid.query.processor.FakeDataManager;
 import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.util.CommandContext;
 
-import junit.framework.TestCase;
 
-
 /**
  */
-public class TestUnionAllNode extends TestCase {
+public class TestUnionAllNode {
 
-    /**
-     * Constructor for TestUnionAllNode.
-     * @param arg0
-     */
-    public TestUnionAllNode(String arg0) {
-        super(arg0);
-    }
-    
     public void helpTestUnion(RelationalNode[] children, RelationalNode union, List[] expected) throws TeiidComponentException, TeiidProcessingException {
         BufferManager mgr = NodeTestUtil.getTestBufferManager(1, 2);
         CommandContext context = new CommandContext("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
-        
+        FakeDataManager fdm = new FakeDataManager();
         for(int i=0; i<children.length; i++) {
             union.addChild(children[i]);
-            children[i].initialize(context, mgr, null);
+            children[i].initialize(context, mgr, fdm);
         }
         
-        union.initialize(context, mgr, null);
+        union.initialize(context, mgr, fdm);
         
         union.open();
         
@@ -90,7 +82,7 @@
         assertEquals("Didn't match expected counts", expected.length, currentRow-1); //$NON-NLS-1$
     }
     
-    public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testNoRows() throws TeiidComponentException, TeiidProcessingException {
         ElementSymbol es1 = new ElementSymbol("e1"); //$NON-NLS-1$
         es1.setType(DataTypeManager.DefaultDataClasses.INTEGER);
 
@@ -151,7 +143,7 @@
         helpTestUnion(nodes, union, expected);           
     }
     
-    public void testBasicUnion() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testBasicUnion() throws TeiidComponentException, TeiidProcessingException {
         List expected[] = new List[] {
             Arrays.asList(new Object[] { new Integer(0) }),    
             Arrays.asList(new Object[] { new Integer(0) }),    
@@ -164,7 +156,7 @@
         
     }
 
-    public void testBasicUnionMultipleSources() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testBasicUnionMultipleSources() throws TeiidComponentException, TeiidProcessingException {
         List expected[] = new List[] {
             Arrays.asList(new Object[] { new Integer(0) }),    
             Arrays.asList(new Object[] { new Integer(0) }),    
@@ -181,7 +173,7 @@
         helpTestUnionConfigs(5, -1, 2, 50, expected);
     }
 
-    public void testMultipleSourcesHalfBlockingNodes() throws TeiidComponentException, TeiidProcessingException  {
+    @Test public void testMultipleSourcesHalfBlockingNodes() throws TeiidComponentException, TeiidProcessingException  {
         List expected[] = new List[] {
             Arrays.asList(new Object[] { new Integer(1) }),    
             Arrays.asList(new Object[] { new Integer(0) }),    
@@ -193,7 +185,7 @@
         helpTestUnionConfigs(5, 2, 1, 50, expected);
     }
     
-    public void testMultipleSourcesAllBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testMultipleSourcesAllBlockingNodes() throws TeiidComponentException, TeiidProcessingException {
         List expected[] = new List[] {
             Arrays.asList(new Object[] { new Integer(0) }),    
             Arrays.asList(new Object[] { new Integer(1) }),    
@@ -205,7 +197,7 @@
         helpTestUnionConfigs(5, 1, 1, 50, expected);       
     }    
     
-    public void testMultipleSourceMultiBatchAllBlocking() throws TeiidComponentException, TeiidProcessingException {
+    @Test public void testMultipleSourceMultiBatchAllBlocking() throws TeiidComponentException, TeiidProcessingException {
         List expected[] = new List[] {
             Arrays.asList(new Object[] { new Integer(0) }),    
             Arrays.asList(new Object[] { new Integer(1) }),    

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/ConnectionFactoryDeployer.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -29,8 +29,6 @@
 import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentGroup;
 import org.jboss.resource.metadata.mcf.ManagedConnectionFactoryDeploymentMetaData;
 import org.teiid.deployers.VDBStatusChecker;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
-import org.teiid.dqp.internal.datamgr.impl.ConnectorManagerRepository;
 
 /**
  * This deployer listens to the data source load and unload events and manages the connectionManager status based 
@@ -38,7 +36,6 @@
  */
 public class ConnectionFactoryDeployer extends AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
 	
-	private ConnectorManagerRepository connectorManagerRepository;
 	private VDBStatusChecker vdbChecker;
 	
 	public ConnectionFactoryDeployer() {
@@ -52,13 +49,6 @@
 		
 		for (ManagedConnectionFactoryDeploymentMetaData data : deployments) {
             this.vdbChecker.dataSourceAdded(data.getJndiName());   
-            
-            // set the number of available connections on the cm
-            for (ConnectorManager cm:this.connectorManagerRepository.getConnectorManagers()) {
-            	if (cm.getConnectionName().equals(data.getJndiName())) {
-            		cm.setMaxConnections(data.getMaxSize());
-            	}
-            }
 		}
 	}
     
@@ -72,10 +62,6 @@
 		}
 	}
 	
-	public void setConnectorManagerRepository(ConnectorManagerRepository repo) {
-		this.connectorManagerRepository = repo;
-	}	
-	
 	public void setVDBStatusChecker(VDBStatusChecker checker) {
 		this.vdbChecker = checker;
 	}

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -210,7 +210,6 @@
     }
     
     public void setWorkManager(WorkManager mgr) {
-    	this.dqpCore.setWorkManager(mgr);
     	this.transactionServerImpl.setWorkManager(mgr);
     }
 	

Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2010-07-08 15:38:23 UTC (rev 2332)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2010-07-12 20:59:02 UTC (rev 2333)
@@ -32,7 +32,6 @@
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.client.DQP;
 import org.teiid.client.security.ILogon;
-import org.teiid.common.queue.FakeWorkManager;
 import org.teiid.deployers.MetadataStoreGroup;
 import org.teiid.deployers.VDBRepository;
 import org.teiid.dqp.internal.datamgr.impl.ConnectorManager;
@@ -48,7 +47,6 @@
 import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
 import org.teiid.query.optimizer.capabilities.SourceCapabilities;
 import org.teiid.services.SessionServiceImpl;
-import org.teiid.translator.TranslatorException;
 import org.teiid.transport.ClientServiceRegistry;
 import org.teiid.transport.ClientServiceRegistryImpl;
 import org.teiid.transport.LocalServerConnection;
@@ -70,15 +68,13 @@
 		this.repo.setSystemStore(systemStore);
 		
         this.sessionService.setVDBRepository(repo);
-        this.dqp.setWorkManager(new FakeWorkManager());
         this.dqp.setBufferService(new FakeBufferService());
         this.dqp.setTransactionService(new FakeTransactionService());
         
         ConnectorManagerRepository cmr = Mockito.mock(ConnectorManagerRepository.class);
         Mockito.stub(cmr.getConnectorManager("source")).toReturn(new ConnectorManager("x", "x") {
         	@Override
-        	public SourceCapabilities getCapabilities()
-        			throws TranslatorException {
+        	public SourceCapabilities getCapabilities() {
         		return new BasicSourceCapabilities();
         	}
         });



More information about the teiid-commits mailing list