[teiid-commits] teiid SVN: r3308 - in branches/7.4.x: test-integration/common/src/test/java/org/teiid/jdbc and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jul 13 12:34:27 EDT 2011


Author: shawkins
Date: 2011-07-13 12:34:27 -0400 (Wed, 13 Jul 2011)
New Revision: 3308

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
Log:
TEIID-1614 refining fix for possible hangs in embedded execution

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-07-12 19:23:10 UTC (rev 3307)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-07-13 16:34:27 UTC (rev 3308)
@@ -163,6 +163,7 @@
     private long processingTimestamp = System.currentTimeMillis();
     
     protected boolean useCallingThread;
+    private volatile boolean hasThread;
     
     public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
         this.requestMsg = requestMsg;
@@ -200,26 +201,34 @@
 		
 	@Override
 	public void run() {
-		while (!isDoneProcessing()) {
-			super.run();
-			if (!useCallingThread) {
-				break;
-			}
-			//should use the calling thread
-			synchronized (this) {
-				if (this.resultsReceiver == null) {
-					break; //allow results to be processed by calling thread
+		hasThread = true;
+		try {
+			while (!isDoneProcessing()) {
+				super.run();
+				if (!useCallingThread) {
+					break;
 				}
-				try {
-					wait();
-				} catch (InterruptedException e) {
+				//should use the calling thread
+				synchronized (this) {
+					if (this.resultsReceiver == null) {
+						break; //allow results to be processed by calling thread
+					}
+					if (this.getThreadState() == ThreadState.MORE_WORK) {
+						continue;
+					}
 					try {
-						requestCancel();
-					} catch (TeiidComponentException e1) {
-						throw new TeiidRuntimeException(e1);
+						wait();
+					} catch (InterruptedException e) {
+						try {
+							requestCancel();
+						} catch (TeiidComponentException e1) {
+							throw new TeiidRuntimeException(e1);
+						}
 					}
 				}
 			}
+		} finally {
+			hasThread = false;
 		}
 	}
 
@@ -237,14 +246,15 @@
 	public void doMoreWork() {
 		boolean run = false;
 		synchronized (this) {
-			run = this.getThreadState() == ThreadState.IDLE;
 			moreWork();
 			if (!useCallingThread || this.getThreadState() != ThreadState.MORE_WORK) {
 				return;
 			}
+			run = !hasThread;
 		}
 		if (run) {
 			//run outside of the lock
+			LogManager.logDetail(LogConstants.CTX_DQP, "Restarting processing using the calling thread", requestID); //$NON-NLS-1$
 			run();
 		}
 	}

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java	2011-07-12 19:23:10 UTC (rev 3307)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java	2011-07-13 16:34:27 UTC (rev 3308)
@@ -26,17 +26,20 @@
 
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.jboss.netty.handler.timeout.TimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,12 +77,14 @@
 	static Condition waiting = lock.newCondition();
 	static Condition wait = lock.newCondition();
 	
+	static Semaphore sourceCounter = new Semaphore(0);
+	
 	public static int blocking() throws InterruptedException {
 		lock.lock();
 		try {
 			waiting.signal();
 			if (!wait.await(2, TimeUnit.SECONDS)) {
-				throw new TimeoutException();
+				throw new RuntimeException();
 			}
 		} finally {
 			lock.unlock();
@@ -88,7 +93,7 @@
 	}
 
 	static FakeServer server = new FakeServer();
-    
+	
 	@SuppressWarnings("serial")
 	@BeforeClass public static void oneTimeSetup() throws Exception {
     	server.setUseCallingThread(true);
@@ -106,13 +111,15 @@
     								throws TranslatorException {
     						    return new ResultSetExecution() {
     						    	
+    						    	boolean returnedRow = false;
+    						    	
 									@Override
 									public void execute() throws TranslatorException {
 										lock.lock();
 										try {
-											waiting.signal();
+											sourceCounter.release();
 											if (!wait.await(2, TimeUnit.SECONDS)) {
-												throw new TimeoutException();
+												throw new RuntimeException();
 											}
 										} catch (InterruptedException e) {
 											throw new RuntimeException(e);
@@ -133,8 +140,11 @@
 									
 									@Override
 									public List<?> next() throws TranslatorException, DataNotAvailableException {
-										// TODO Auto-generated method stub
-										return null;
+										if (returnedRow) {
+											return null;
+										}
+										returnedRow = true;
+										return new ArrayList<Object>(Collections.singleton(null));
 									}
 								};
     						}
@@ -255,23 +265,68 @@
     	SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
     	t.setUncaughtExceptionHandler(handler);
     	
+    	sourceCounter.acquire();
+    	
+    	//t should now be waiting also
+    	
     	lock.lock();
     	try {
-    		assertTrue(waiting.await(2, TimeUnit.SECONDS));
+    		wait.signal();
     	} finally {
     		lock.unlock();
-    	}    	
+    	}
+
+    	//t should finish
+    	t.join();
     	
+    	if (handler.t != null) {
+    		throw handler.t;
+    	}
+	}
+	
+	@Test public void testWaitMultiple() throws Throwable {
+		final Connection c = server.createConnection("jdbc:teiid:test");
+    	
+		Thread t = new Thread() {
+			public void run() {
+		    	Statement s;
+				try {
+					s = c.createStatement();
+			    	assertTrue(s.execute("select part_id from parts union all select part_id from parts"));
+			    	ResultSet r = s.getResultSet();
+			    	
+			    	//wake up the other source thread, should put the requestworkitem into the more work state 
+			    	lock.lock();
+			    	try {
+			    		wait.signal();
+			    	} finally {
+			    		lock.unlock();
+			    	}
+			    	Thread.sleep(1000); //TODO: need a better hook to determine that connector work has finished
+			    	while (r.next()) {
+			    		//will hang unless this thread is allowed to resume processing
+			    	}
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
+		};
+    	t.start();
+    	SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+    	t.setUncaughtExceptionHandler(handler);
+    	
+    	sourceCounter.acquire(2);
+    	
     	//t should now be waiting also
     	
+    	//wake up 1 source thread
     	lock.lock();
     	try {
     		wait.signal();
     	} finally {
     		lock.unlock();
     	}
-
-    	//t should finish
+    	
     	t.join();
     	
     	if (handler.t != null) {



More information about the teiid-commits mailing list