[teiid-commits] teiid SVN: r2992 - trunk/engine/src/main/java/org/teiid/dqp/internal/process.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Mar 15 21:43:50 EDT 2011
Author: shawkins
Date: 2011-03-15 21:43:50 -0400 (Tue, 15 Mar 2011)
New Revision: 2992
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
Log:
TEIID-1474 refining the implementation so that morework is not called until after the workitem is done. otherwise there is a race condition which could result in hung requests
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-03-16 01:26:46 UTC (rev 2991)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-03-16 01:43:50 UTC (rev 2992)
@@ -51,6 +51,7 @@
import org.teiid.core.util.Assertion;
import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -69,7 +70,7 @@
* In the multi-threaded case we'd like to not even
* notify the parent plan and just schedule the next poll.
*/
-public class DataTierTupleSource implements TupleSource {
+public class DataTierTupleSource implements TupleSource, CompletionListener<AtomicResultsMessage> {
// Construction state
private final AtomicRequestMessage aqr;
@@ -129,7 +130,7 @@
public AtomicResultsMessage call() throws Exception {
return getResults();
}
- }, 100);
+ }, this, 100);
}
private List correctTypes(List row) throws TransformationException {
@@ -282,27 +283,16 @@
throws BlockedException, TeiidComponentException,
TranslatorException {
AtomicResultsMessage results = null;
- try {
- if (cancelAsynch) {
- return null;
- }
- running = true;
- if (!executed) {
- results = cwi.execute();
- executed = true;
- } else {
- results = cwi.more();
- }
- } finally {
- if (!cancelAsynch) {
- workItem.moreWork();
- }
- canAsynchClose = false;
- if (closed.get()) {
- cwi.close();
- }
- running = false;
+ if (cancelAsynch) {
+ return null;
}
+ running = true;
+ if (!executed) {
+ results = cwi.execute();
+ executed = true;
+ } else {
+ results = cwi.more();
+ }
return results;
}
@@ -402,5 +392,17 @@
public boolean isTransactional() {
return this.aqr.isTransactional();
}
+
+ @Override
+ public void onCompletion(FutureWork<AtomicResultsMessage> future) {
+ if (!cancelAsynch) {
+ workItem.moreWork();
+ }
+ canAsynchClose = false;
+ if (closed.get()) {
+ cwi.close();
+ }
+ running = false;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-03-16 01:26:46 UTC (rev 2991)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-03-16 01:43:50 UTC (rev 2992)
@@ -47,6 +47,7 @@
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
@@ -807,10 +808,11 @@
return work;
}
- <T> FutureWork<T> addWork(Callable<T> callable, int priority) {
+ <T> FutureWork<T> addWork(Callable<T> callable, CompletionListener<T> listener, int priority) {
FutureWork<T> work = new FutureWork<T>(callable, priority);
WorkWrapper<T> wl = new WorkWrapper<T>(work);
work.addCompletionListener(wl);
+ work.addCompletionListener(listener);
synchronized (queue) {
if (totalThreads < dqpCore.getUserRequestSourceConcurrency()) {
dqpCore.addWork(work);
More information about the teiid-commits
mailing list