Author: shawkins
Date: 2011-07-13 12:34:27 -0400 (Wed, 13 Jul 2011)
New Revision: 3308
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
Log:
TEIID-1614 refining fix for possible hangs in embedded execution
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-07-12
19:23:10 UTC (rev 3307)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-07-13
16:34:27 UTC (rev 3308)
@@ -163,6 +163,7 @@
private long processingTimestamp = System.currentTimeMillis();
protected boolean useCallingThread;
+ private volatile boolean hasThread;
public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request,
ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext
workContext) {
this.requestMsg = requestMsg;
@@ -200,26 +201,34 @@
@Override
public void run() {
- while (!isDoneProcessing()) {
- super.run();
- if (!useCallingThread) {
- break;
- }
- //should use the calling thread
- synchronized (this) {
- if (this.resultsReceiver == null) {
- break; //allow results to be processed by calling thread
+ hasThread = true;
+ try {
+ while (!isDoneProcessing()) {
+ super.run();
+ if (!useCallingThread) {
+ break;
}
- try {
- wait();
- } catch (InterruptedException e) {
+ //should use the calling thread
+ synchronized (this) {
+ if (this.resultsReceiver == null) {
+ break; //allow results to be processed by calling thread
+ }
+ if (this.getThreadState() == ThreadState.MORE_WORK) {
+ continue;
+ }
try {
- requestCancel();
- } catch (TeiidComponentException e1) {
- throw new TeiidRuntimeException(e1);
+ wait();
+ } catch (InterruptedException e) {
+ try {
+ requestCancel();
+ } catch (TeiidComponentException e1) {
+ throw new TeiidRuntimeException(e1);
+ }
}
}
}
+ } finally {
+ hasThread = false;
}
}
@@ -237,14 +246,15 @@
public void doMoreWork() {
boolean run = false;
synchronized (this) {
- run = this.getThreadState() == ThreadState.IDLE;
moreWork();
if (!useCallingThread || this.getThreadState() != ThreadState.MORE_WORK) {
return;
}
+ run = !hasThread;
}
if (run) {
//run outside of the lock
+ LogManager.logDetail(LogConstants.CTX_DQP, "Restarting processing using the
calling thread", requestID); //$NON-NLS-1$
run();
}
}
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java 2011-07-12
19:23:10 UTC (rev 3307)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java 2011-07-13
16:34:27 UTC (rev 3308)
@@ -26,17 +26,20 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import org.jboss.netty.handler.timeout.TimeoutException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -74,12 +77,14 @@
static Condition waiting = lock.newCondition();
static Condition wait = lock.newCondition();
+ static Semaphore sourceCounter = new Semaphore(0);
+
public static int blocking() throws InterruptedException {
lock.lock();
try {
waiting.signal();
if (!wait.await(2, TimeUnit.SECONDS)) {
- throw new TimeoutException();
+ throw new RuntimeException();
}
} finally {
lock.unlock();
@@ -88,7 +93,7 @@
}
static FakeServer server = new FakeServer();
-
+
@SuppressWarnings("serial")
@BeforeClass public static void oneTimeSetup() throws Exception {
server.setUseCallingThread(true);
@@ -106,13 +111,15 @@
throws TranslatorException {
return new ResultSetExecution() {
+ boolean returnedRow = false;
+
@Override
public void execute() throws TranslatorException {
lock.lock();
try {
- waiting.signal();
+ sourceCounter.release();
if (!wait.await(2, TimeUnit.SECONDS)) {
- throw new TimeoutException();
+ throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -133,8 +140,11 @@
@Override
public List<?> next() throws TranslatorException,
DataNotAvailableException {
- // TODO Auto-generated method stub
- return null;
+ if (returnedRow) {
+ return null;
+ }
+ returnedRow = true;
+ return new ArrayList<Object>(Collections.singleton(null));
}
};
}
@@ -255,23 +265,68 @@
SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
t.setUncaughtExceptionHandler(handler);
+ sourceCounter.acquire();
+
+ //t should now be waiting also
+
lock.lock();
try {
- assertTrue(waiting.await(2, TimeUnit.SECONDS));
+ wait.signal();
} finally {
lock.unlock();
- }
+ }
+
+ //t should finish
+ t.join();
+ if (handler.t != null) {
+ throw handler.t;
+ }
+ }
+
+ @Test public void testWaitMultiple() throws Throwable {
+ final Connection c = server.createConnection("jdbc:teiid:test");
+
+ Thread t = new Thread() {
+ public void run() {
+ Statement s;
+ try {
+ s = c.createStatement();
+ assertTrue(s.execute("select part_id from parts union all select part_id
from parts"));
+ ResultSet r = s.getResultSet();
+
+ //wake up the other source thread, should put the requestworkitem into the more
work state
+ lock.lock();
+ try {
+ wait.signal();
+ } finally {
+ lock.unlock();
+ }
+ Thread.sleep(1000); //TODO: need a better hook to determine that connector work
has finished
+ while (r.next()) {
+ //will hang unless this thread is allowed to resume processing
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+ t.setUncaughtExceptionHandler(handler);
+
+ sourceCounter.acquire(2);
+
//t should now be waiting also
+ //wake up 1 source thread
lock.lock();
try {
wait.signal();
} finally {
lock.unlock();
}
-
- //t should finish
+
t.join();
if (handler.t != null) {
Show replies by date