[teiid-commits] teiid SVN: r2992 - trunk/engine/src/main/java/org/teiid/dqp/internal/process.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Mar 15 21:43:50 EDT 2011


Author: shawkins
Date: 2011-03-15 21:43:50 -0400 (Tue, 15 Mar 2011)
New Revision: 2992

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
Log:
TEIID-1474 refining the implementation so that moreWork() is not called until after the work item is done; otherwise there is a race condition which could result in hung requests.

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-03-16 01:26:46 UTC (rev 2991)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-03-16 01:43:50 UTC (rev 2992)
@@ -51,6 +51,7 @@
 import org.teiid.core.util.Assertion;
 import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
 import org.teiid.dqp.internal.process.DQPCore.FutureWork;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
@@ -69,7 +70,7 @@
  * In the multi-threaded case we'd like to not even
  * notify the parent plan and just schedule the next poll. 
  */
-public class DataTierTupleSource implements TupleSource {
+public class DataTierTupleSource implements TupleSource, CompletionListener<AtomicResultsMessage> {
 	
     // Construction state
     private final AtomicRequestMessage aqr;
@@ -129,7 +130,7 @@
 			public AtomicResultsMessage call() throws Exception {
 				return getResults();
 			}
-		}, 100);
+		}, this, 100);
 	}
 
 	private List correctTypes(List row) throws TransformationException {
@@ -282,27 +283,16 @@
 			throws BlockedException, TeiidComponentException,
 			TranslatorException {
 		AtomicResultsMessage results = null;
-		try {
-			if (cancelAsynch) {
-				return null;
-			}
-			running = true;
-			if (!executed) {
-				results = cwi.execute();
-				executed = true;
-			} else {
-				results = cwi.more();
-			}
-		} finally {
-			if (!cancelAsynch) {
-				workItem.moreWork();
-			}
-			canAsynchClose = false;
-			if (closed.get()) {
-				cwi.close();
-			}
-			running = false;
+		if (cancelAsynch) {
+			return null;
 		}
+		running = true;
+		if (!executed) {
+			results = cwi.execute();
+			executed = true;
+		} else {
+			results = cwi.more();
+		}
 		return results;
 	}
     
@@ -402,5 +392,17 @@
 	public boolean isTransactional() {
 		return this.aqr.isTransactional();
 	}
+
+	@Override
+	public void onCompletion(FutureWork<AtomicResultsMessage> future) {
+		if (!cancelAsynch) {
+			workItem.moreWork();
+		}
+		canAsynchClose = false;
+		if (closed.get()) {
+			cwi.close();
+		}
+		running = false;		
+	}
 	
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-16 01:26:46 UTC (rev 2991)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-03-16 01:43:50 UTC (rev 2992)
@@ -47,6 +47,7 @@
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
 import org.teiid.dqp.internal.process.DQPCore.FutureWork;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
@@ -807,10 +808,11 @@
 		return work;
 	}
 	
-    <T> FutureWork<T> addWork(Callable<T> callable, int priority) {
+    <T> FutureWork<T> addWork(Callable<T> callable, CompletionListener<T> listener, int priority) {
     	FutureWork<T> work = new FutureWork<T>(callable, priority);
     	WorkWrapper<T> wl = new WorkWrapper<T>(work);
     	work.addCompletionListener(wl);
+    	work.addCompletionListener(listener);
     	synchronized (queue) {
         	if (totalThreads < dqpCore.getUserRequestSourceConcurrency()) {
         		dqpCore.addWork(work);



More information about the teiid-commits mailing list