Author: shawkins
Date: 2011-06-29 14:45:44 -0400 (Wed, 29 Jun 2011)
New Revision: 3295
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
Log:
TEIID-1614 fix for possible hang in the more_work state
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-06-29
17:50:11 UTC (rev 3294)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-06-29
18:45:44 UTC (rev 3295)
@@ -124,15 +124,14 @@
this.threadState = ThreadState.MORE_WORK;
break;
case MORE_WORK:
+ if (this.callingThread != null && !this.isProcessing) {
+ useCallingThread();
+ }
break;
case IDLE:
this.threadState = ThreadState.MORE_WORK;
if (this.callingThread != null) {
- if (this.callingThread == Thread.currentThread()) {
- run(); //restart with the calling thread
- } else {
- this.notifyAll(); //notify the waiting caller
- }
+ useCallingThread();
} else {
resumeProcessing();
}
@@ -144,6 +143,14 @@
LogManager.logDetail(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this,
"ignoring more work, since the work item is done"}); //$NON-NLS-1$
}
}
+
+ private void useCallingThread() {
+ if (this.callingThread == Thread.currentThread()) {
+ run(); //restart with the calling thread
+ } else {
+ this.notifyAll(); //notify the waiting caller
+ }
+ }
private void logTrace(String msg) {
LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, msg,
this.threadState});
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-06-29
17:50:11 UTC (rev 3294)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-06-29
18:45:44 UTC (rev 3295)
@@ -93,9 +93,11 @@
private long creationTime = System.currentTimeMillis();
private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
private List<CompletionListener<T>> completionListeners = new
LinkedList<CompletionListener<T>>();
+ private String parentName;
public FutureWork(final Callable<T> processor, int priority) {
super(processor);
+ this.parentName = Thread.currentThread().getName();
this.priority = priority;
}
@@ -105,6 +107,12 @@
}
@Override
+ public void run() {
+ LogManager.logDetail("Running task for parent thread", parentName);
//$NON-NLS-1$
+ super.run();
+ }
+
+ @Override
public int getPriority() {
return priority;
}
@@ -721,6 +729,8 @@
processorDataManager.setMetadataRepository(metadataRepository);
dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager,
this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory);
dataTierMgr.setEventDistributor(eventDistributor);
+
+ LogManager.logDetail(LogConstants.CTX_DQP, "DQPCore started
maxThreads", this.config.getMaxThreads(), "maxActivePlans",
this.maxActivePlans, "source concurrency", this.userRequestSourceConcurrency);
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
public void setBufferService(BufferService service) {
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-06-29
17:50:11 UTC (rev 3294)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-06-29
18:45:44 UTC (rev 3295)
@@ -37,6 +37,8 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.language.SQLConstants;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
import org.teiid.query.QueryPlugin;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataID;
@@ -96,6 +98,7 @@
public synchronized MatState setState(MatState state, Boolean valid, Long timestamp) {
MatState oldState = this.state;
+ LogManager.logDetail(LogConstants.CTX_MATVIEWS, this, "setting matState to",
state, valid, timestamp, "old values", oldState, this.valid); //$NON-NLS-1$
//$NON-NLS-2$
if (valid != null) {
this.valid = valid;
}
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-06-29
17:50:11 UTC (rev 3294)
+++
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-06-29
18:45:44 UTC (rev 3295)
@@ -181,12 +181,37 @@
for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
Thread.sleep(100);
}
- if (item.getThreadState() != ThreadState.IDLE) {
- fail();
- }
+ assertEquals(ThreadState.IDLE, item.getThreadState());
item.moreWork();
//if we don't return from this call, that means that this thread has been hijacked
-
//we should instead use t.
}
-
+
+ @Test public void testUsingCallingThreadMoreWork() throws Exception {
+ final int[] processCount = new int[1];
+ final TestWorkItem item = new TestWorkItem(false, false, Thread.currentThread()) {
+ @Override
+ protected boolean shouldPause() {
+ return false;
+ }
+
+ @Override
+ protected void process() {
+ super.process();
+ processCount[0]++;
+ }
+ };
+ item.run();
+ assertEquals(ThreadState.IDLE, item.getThreadState());
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ item.moreWork();
+ }
+ };
+ t.start();
+ t.join();
+ item.moreWork();
+ assertEquals(2, processCount[0]);
+ }
}