[teiid-commits] teiid SVN: r3295 - in branches/7.4.x/engine/src: main/java/org/teiid/query/tempdata and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jun 29 14:45:44 EDT 2011


Author: shawkins
Date: 2011-06-29 14:45:44 -0400 (Wed, 29 Jun 2011)
New Revision: 3295

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
Log:
TEIID-1614 fix for possible hang in the more_work state

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-06-29 17:50:11 UTC (rev 3294)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-06-29 18:45:44 UTC (rev 3295)
@@ -124,15 +124,14 @@
 	    		this.threadState = ThreadState.MORE_WORK;
 	    		break;
 	    	case MORE_WORK:
+	    		if (this.callingThread != null && !this.isProcessing) {
+	    			useCallingThread();
+	    		}
 	    		break;
 	    	case IDLE:
 	    		this.threadState = ThreadState.MORE_WORK;
         		if (this.callingThread != null) {
-        			if (this.callingThread == Thread.currentThread()) {
-        				run(); //restart with the calling thread
-        			} else {
-        				this.notifyAll(); //notify the waiting caller
-        			}
+        			useCallingThread();
         		} else {
         			resumeProcessing();
         		}
@@ -144,6 +143,14 @@
 				LogManager.logDetail(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, "ignoring more work, since the work item is done"}); //$NON-NLS-1$
     	}
     }
+
+	private void useCallingThread() {
+		if (this.callingThread == Thread.currentThread()) {
+			run(); //restart with the calling thread
+		} else {
+			this.notifyAll(); //notify the waiting caller
+		}
+	}
     
 	private void logTrace(String msg) {
 		LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, msg, this.threadState}); 

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-06-29 17:50:11 UTC (rev 3294)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-06-29 18:45:44 UTC (rev 3295)
@@ -93,9 +93,11 @@
 		private long creationTime = System.currentTimeMillis();
 		private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
 		private List<CompletionListener<T>> completionListeners = new LinkedList<CompletionListener<T>>();
+		private String parentName;
 
 		public FutureWork(final Callable<T> processor, int priority) {
 			super(processor);
+			this.parentName = Thread.currentThread().getName();
 			this.priority = priority;
 		}
 		
@@ -105,6 +107,12 @@
 		}
 		
 		@Override
+		public void run() {
+			LogManager.logDetail("Running task for parent thread", parentName); //$NON-NLS-1$
+			super.run();
+		}
+		
+		@Override
 		public int getPriority() {
 			return priority;
 		}
@@ -721,6 +729,8 @@
         processorDataManager.setMetadataRepository(metadataRepository);
 		dataTierMgr = new TempTableDataManager(processorDataManager, this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory);
         dataTierMgr.setEventDistributor(eventDistributor);
+        
+        LogManager.logDetail(LogConstants.CTX_DQP, "DQPCore started maxThreads", this.config.getMaxThreads(), "maxActivePlans", this.maxActivePlans, "source concurrency", this.userRequestSourceConcurrency); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 	}
 	
 	public void setBufferService(BufferService service) {

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-06-29 17:50:11 UTC (rev 3294)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2011-06-29 18:45:44 UTC (rev 3295)
@@ -37,6 +37,8 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.language.SQLConstants;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.metadata.TempMetadataID;
@@ -96,6 +98,7 @@
 		
 		public synchronized MatState setState(MatState state, Boolean valid, Long timestamp) {
 			MatState oldState = this.state;
+			LogManager.logDetail(LogConstants.CTX_MATVIEWS, this, "setting matState to", state, valid, timestamp, "old values", oldState, this.valid); //$NON-NLS-1$ //$NON-NLS-2$
 			if (valid != null) {
 				this.valid = valid;
 			}

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-06-29 17:50:11 UTC (rev 3294)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-06-29 18:45:44 UTC (rev 3295)
@@ -181,12 +181,37 @@
 		for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
 			Thread.sleep(100);
 		}
-		if (item.getThreadState() != ThreadState.IDLE) {
-			fail();
-		}
+		assertEquals(ThreadState.IDLE, item.getThreadState());
 		item.moreWork();
 		//if we don't return from this call, that means that this thread has been hijacked -
 		//we should instead use t.
     }
-        
+    
+    @Test public void testUsingCallingThreadMoreWork() throws Exception {
+    	final int[] processCount = new int[1];
+    	final TestWorkItem item = new TestWorkItem(false, false, Thread.currentThread()) {
+    		@Override
+    		protected boolean shouldPause() {
+    			return false;
+    		}
+    		
+    		@Override
+    		protected void process() {
+    			super.process();
+    			processCount[0]++;
+    		}
+    	};
+    	item.run();
+		assertEquals(ThreadState.IDLE, item.getThreadState());
+		Thread t = new Thread() {
+			@Override
+			public void run() {
+				item.moreWork();
+			}
+		};
+		t.start();
+		t.join();
+		item.moreWork();
+		assertEquals(2, processCount[0]);
+    }        
 }



More information about the teiid-commits mailing list