[teiid-commits] teiid SVN: r3307 - in branches/7.4.x: engine/src/main/java/org/teiid/dqp/internal/process and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Jul 12 15:23:10 EDT 2011


Author: shawkins
Date: 2011-07-12 15:23:10 -0400 (Tue, 12 Jul 2011)
New Revision: 3307

Modified:
   branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
Log:
TEIID-1614 more appropriate fix for possible hangs in embedded execution

Modified: branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -40,8 +40,6 @@
 import java.util.Properties;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -538,11 +536,7 @@
 		});
     	if (synch) {
     		try {
-    			if (queryTimeoutMS > 0) {
-    				pendingResult.get(queryTimeoutMS, TimeUnit.MILLISECONDS);
-    			} else {
-    				pendingResult.get();
-    			}
+				pendingResult.get();
     			result.get(); //throw an exception if needed
     			return result;
     		} catch (ExecutionException e) {
@@ -552,8 +546,6 @@
     			throw TeiidSQLException.create(e);
     		} catch (InterruptedException e) {
     			timeoutOccurred();
-    		} catch (TimeoutException e) {
-    			timeoutOccurred();
 			}
     		throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Timeout_before_complete")); //$NON-NLS-1$
     	}
@@ -578,7 +570,7 @@
         reqMsg.setExecutionId(this.currentRequestID);
         
         ResultsFuture.CompletionListener<ResultsMessage> compeletionListener = null;
-		if (queryTimeoutMS > 0 && !synch) {
+		if (queryTimeoutMS > 0) {
 			final CancelTask c = new QueryTimeoutCancelTask(queryTimeoutMS, this);
 			cancellationTimer.add(c);
 			compeletionListener = new ResultsFuture.CompletionListener<ResultsMessage>() {

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -26,7 +26,6 @@
 import javax.resource.spi.work.WorkEvent;
 import javax.resource.spi.work.WorkListener;
 
-import org.teiid.core.TeiidRuntimeException;
 import org.teiid.logging.LogManager;
 
 
@@ -43,23 +42,14 @@
     
     private ThreadState threadState = ThreadState.MORE_WORK;
     private volatile boolean isProcessing;
-    private Thread callingThread;
     
-    public AbstractWorkItem(Thread callingThread) {
-    	this.callingThread = callingThread;
-    }
-    
     public void run() {
-    	do {
-			startProcessing();
-			try {
-				process();
-			} finally {
-				if (!endProcessing()) {
-					break;
-				}
-			}
-    	} while (!isDoneProcessing());
+		startProcessing();
+		try {
+			process();
+		} finally {
+			endProcessing();
+		}
     }
     
     synchronized ThreadState getThreadState() {
@@ -79,10 +69,7 @@
     	this.threadState = ThreadState.WORKING;
 	}
     
-    /**
-     * @return true if processing should be continued
-     */
-    final private synchronized boolean endProcessing() {
+    private synchronized void endProcessing() {
     	isProcessing = false;
     	logTrace("end processing"); //$NON-NLS-1$
     	switch (this.threadState) {
@@ -92,21 +79,20 @@
 	        		this.threadState = ThreadState.DONE;
 	        	} else {
 		    		this.threadState = ThreadState.IDLE;
-		    		return pauseProcessing();
+		    		pauseProcessing();
 	        	}
 	    		break;
 	    	case MORE_WORK:
 	    		if (isDoneProcessing()) {
 	    			logTrace("done processing - ignoring more"); //$NON-NLS-1$
 	        		this.threadState = ThreadState.DONE;
-	        	} else if (this.callingThread == null) {
-        			resumeProcessing();
+	        	} else {
+	        		resumeProcessing();
 	        	}
 	    		break;
     		default:
     			throw new IllegalStateException("Should not END on " + this.threadState); //$NON-NLS-1$
     	}
-    	return this.callingThread != null;
     }
     
     protected boolean isIdle() {
@@ -117,24 +103,18 @@
     	moreWork(true);
     }
     
-    final protected synchronized void moreWork(boolean ignoreDone) {
+    protected synchronized void moreWork(boolean ignoreDone) {
     	logTrace("more work"); //$NON-NLS-1$
+    	this.notifyAll();
     	switch (this.threadState) {
 	    	case WORKING:
 	    		this.threadState = ThreadState.MORE_WORK;
 	    		break;
 	    	case MORE_WORK:
-	    		if (this.callingThread != null && !this.isProcessing) {
-	    			useCallingThread();
-	    		}
 	    		break;
 	    	case IDLE:
 	    		this.threadState = ThreadState.MORE_WORK;
-        		if (this.callingThread != null) {
-        			useCallingThread();
-        		} else {
-        			resumeProcessing();
-        		}
+	    		resumeProcessing();
 	    		break;
 			default:
 				if (!ignoreDone) {
@@ -143,14 +123,6 @@
 				LogManager.logDetail(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, "ignoring more work, since the work item is done"}); //$NON-NLS-1$
     	}
     }
-
-	private void useCallingThread() {
-		if (this.callingThread == Thread.currentThread()) {
-			run(); //restart with the calling thread
-		} else {
-			this.notifyAll(); //notify the waiting caller
-		}
-	}
     
 	private void logTrace(String msg) {
 		LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, msg, this.threadState}); 
@@ -158,33 +130,8 @@
     
     protected abstract void process();
 
-	protected boolean pauseProcessing() {
-		if (this.callingThread != null && !shouldPause()) {
-			return false;
-		}
-		while (this.callingThread != null && this.getThreadState() == ThreadState.IDLE) {
-			try {
-				this.wait(); //the lock should already be held
-			} catch (InterruptedException e) {
-				interrupted(e);
-			}
-		}
-		return this.callingThread != null;
+	protected void pauseProcessing() {
 	}
-	
-	/**
-	 * only called for synch processing
-	 */
-	protected boolean shouldPause() {
-		return false;
-	}
-
-	/**
-	 * only called for synch processing
-	 */
-	protected void interrupted(InterruptedException e) {
-		throw new TeiidRuntimeException(e);
-	}
     
     protected abstract void resumeProcessing();
 	

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -108,7 +108,7 @@
 		
 		@Override
 		public void run() {
-			LogManager.logDetail("Running task for parent thread", parentName); //$NON-NLS-1$
+			LogManager.logDetail(LogConstants.CTX_DQP, "Running task for parent thread", parentName); //$NON-NLS-1$
 			super.run();
 		}
 		
@@ -352,6 +352,7 @@
 			}
 		}
         if (runInThread) {
+        	workItem.useCallingThread = true;
         	workItem.run();
         }
         return resultsFuture;

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -188,11 +188,8 @@
 	}
 	
 	public void runInContext(final Runnable runnable) {
-		DQPWorkContext.setWorkContext(this);
-		boolean associated = false;
-		if (securityHelper != null && this.getSubject() != null) {
-			associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());			
-		}
+		DQPWorkContext previous = DQPWorkContext.getWorkContext();
+		boolean associated = attachDQPWorkContext();
 		try {
 			runnable.run();
 		} finally {
@@ -200,9 +197,21 @@
 				securityHelper.clearSecurityContext(this.getSecurityDomain());			
 			}
 			DQPWorkContext.releaseWorkContext();
+			if (previous != null) {
+				previous.attachDQPWorkContext();
+			}
 		}
 	}
 
+	private boolean attachDQPWorkContext() {
+		DQPWorkContext.setWorkContext(this);
+		boolean associated = false;
+		if (securityHelper != null && this.getSubject() != null) {
+			associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());			
+		}
+		return associated;
+	}
+
 	public HashMap<String, DataPolicy> getAllowedDataPolicies() {
 		if (this.policies == null) {
 	    	this.policies = new HashMap<String, DataPolicy>();

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -162,8 +162,9 @@
     /**The time when command begins processing on the server.*/
     private long processingTimestamp = System.currentTimeMillis();
     
+    protected boolean useCallingThread;
+    
     public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
-    	super(workContext.useCallingThread() || requestMsg.isSync() ? Thread.currentThread() : null);
         this.requestMsg = requestMsg;
         this.requestID = requestID;
         this.processorTimeslice = dqpCore.getProcessorTimeSlice();
@@ -196,24 +197,56 @@
 	protected boolean isDoneProcessing() {
 		return isClosed;
 	}
+		
+	@Override
+	public void run() {
+		while (!isDoneProcessing()) {
+			super.run();
+			if (!useCallingThread) {
+				break;
+			}
+			//should use the calling thread
+			synchronized (this) {
+				if (this.resultsReceiver == null) {
+					break; //allow results to be processed by calling thread
+				}
+				try {
+					wait();
+				} catch (InterruptedException e) {
+					try {
+						requestCancel();
+					} catch (TeiidComponentException e1) {
+						throw new TeiidRuntimeException(e1);
+					}
+				}
+			}
+		}
+	}
 
 	@Override
 	protected void resumeProcessing() {
-		if (doneProducingBatches && !closeRequested && !isCanceled) {
-			this.run(); // just run in the IO thread
-		} else {
+		if (!this.useCallingThread) {
 			dqpCore.addWork(this);
 		}
 	}
 	
-	@Override
-	protected void interrupted(InterruptedException e) {
-		try {
-			this.requestCancel();
-		} catch (TeiidComponentException e1) {
-			throw new TeiidRuntimeException(e1);
+	/**
+	 * Special call from request threads to allow resumption of processing by
+	 * the calling thread.
+	 */
+	public void doMoreWork() {
+		boolean run = false;
+		synchronized (this) {
+			run = this.getThreadState() == ThreadState.IDLE;
+			moreWork();
+			if (!useCallingThread || this.getThreadState() != ThreadState.MORE_WORK) {
+				return;
+			}
 		}
-		super.interrupted(e);
+		if (run) {
+			//run outside of the lock
+			run();
+		}
 	}
 	
 	@Override
@@ -650,12 +683,6 @@
 		return new TeiidProcessingException(exception, SQLStates.QUERY_CANCELED, exception.getMessage());
 	}
     
-    @Override
-    protected boolean shouldPause() {
-    	//if we are waiting on results it's ok to pause
-    	return this.resultsReceiver != null;
-    }
-
     private static List<ParameterInfo> getParameterInfo(StoredProcedure procedure) {
         List<ParameterInfo> paramInfos = new ArrayList<ParameterInfo>();
         
@@ -750,12 +777,12 @@
     	if (!this.doneProducingBatches) {
     		this.requestCancel(); //pending work should be canceled for fastest clean up
     	}
-    	this.moreWork();
+    	this.doMoreWork();
     }
     
     public void requestMore(int batchFirst, int batchLast, ResultsReceiver<ResultsMessage> receiver) {
     	this.requestResults(batchFirst, batchLast, receiver);
-    	this.moreWork(); 
+    	this.doMoreWork(); 
     }
     
     public void closeAtomicRequest(AtomicRequestID atomicRequestId) {

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -22,35 +22,25 @@
 
 package org.teiid.dqp.internal.datamgr;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.teiid.language.Command;
-import org.teiid.language.QueryExpression;
 import org.teiid.metadata.RuntimeMetadata;
-import org.teiid.translator.TranslatorException;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.Execution;
 import org.teiid.translator.ExecutionContext;
 import org.teiid.translator.ExecutionFactory;
 import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
 import org.teiid.translator.UpdateExecution;
 
-public class FakeConnector extends ExecutionFactory {
-	private static final int RESULT_SIZE = 5;
-	
-	private boolean executeBlocks;
-    private boolean nextBatchBlocks;
-    private boolean returnsFinalBatch;
-    private boolean driverThrowsExceptionOnCancel;
-    private long simulatedBatchRetrievalTime = 1000L;
-    private ClassLoader classloader;
+public class FakeConnector extends ExecutionFactory<Object, Object> {
     
     private int connectionCount;
     private int executionCount;
-    
+
     public int getConnectionCount() {
 		return connectionCount;
 	}
@@ -62,123 +52,52 @@
     @Override
     public Execution createExecution(Command command, ExecutionContext executionContext, RuntimeMetadata metadata, Object connection) throws TranslatorException {
     	executionCount++;
-        return new FakeBlockingExecution(executionContext);
+        return new FakeExecution(executionContext);
     }
     
-    public Object getConnection() {
-        return new FakeConnection();
-    }
-    
     @Override
     public Object getConnection(Object factory) throws TranslatorException {
+    	connectionCount++;
     	return factory;
     }
     
     @Override
     public void closeConnection(Object connection, Object factory) {
     }
-	
-    private class FakeConnection {
-    	public FakeConnection() {
-			connectionCount++;
-		}
-    	
-        public boolean released = false;
-        public void close() {
-            Assert.assertFalse("The connection should not be released more than once", released); //$NON-NLS-1$
-            released = true;
-        }
-    }   
     
-    private final class FakeBlockingExecution implements ResultSetExecution, UpdateExecution {
-        private boolean closed = false;
-        private boolean cancelled = false;
+    public final class FakeExecution implements ResultSetExecution, UpdateExecution {
         private int rowCount;
         ExecutionContext ec;
-        public FakeBlockingExecution(ExecutionContext ec) {
+        
+        public FakeExecution(ExecutionContext ec) {
             this.ec = ec;
         }
-        public void execute(QueryExpression query, int maxBatchSize) throws TranslatorException {
-            if (executeBlocks) {
-                waitForCancel();
-            }
-            if (classloader != null) {
-            	Assert.assertSame(classloader, Thread.currentThread().getContextClassLoader());
-            }
-        }
-        public synchronized void cancel() throws TranslatorException {
-            cancelled = true;
-            this.notify();
-        }
-        public void close() {
-            Assert.assertFalse("The execution should not be closed more than once", closed); //$NON-NLS-1$
-            closed = true;
-        }
         @Override
         public void execute() throws TranslatorException {
             ec.addWarning(new Exception("Some warning")); //$NON-NLS-1$
         }
         @Override
-        public List next() throws TranslatorException, DataNotAvailableException {
-        	if (nextBatchBlocks) {
-                waitForCancel();
-            }
-            if (this.rowCount >= RESULT_SIZE || returnsFinalBatch) {
+        public List<?> next() throws TranslatorException, DataNotAvailableException {
+            if (this.rowCount == 1) {
             	return null;
             }
             this.rowCount++;
-            return Arrays.asList(this.rowCount - 1);
+            return new ArrayList<Object>(Arrays.asList(this.rowCount - 1));
         }
-        private synchronized void waitForCancel() throws TranslatorException {
-            try {
-                this.wait(simulatedBatchRetrievalTime);
-                if (cancelled && driverThrowsExceptionOnCancel) {
-                    throw new TranslatorException("Request cancelled"); //$NON-NLS-1$
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
 		@Override
 		public int[] getUpdateCounts() throws DataNotAvailableException,
 				TranslatorException {
 			return new int[] {1};
 		}
+		
+		@Override
+		public void close() {
+		}
+		
+		@Override
+		public void cancel() throws TranslatorException {
+		}
     }
 
-	public boolean isExecuteBlocks() {
-		return executeBlocks;
-	}
-	public void setExecuteBlocks(boolean executeBlocks) {
-		this.executeBlocks = executeBlocks;
-	}
-	public boolean isNextBatchBlocks() {
-		return nextBatchBlocks;
-	}
-	public void setNextBatchBlocks(boolean nextBatchBlocks) {
-		this.nextBatchBlocks = nextBatchBlocks;
-	}
-	public boolean isReturnsFinalBatch() {
-		return returnsFinalBatch;
-	}
-	public void setReturnsFinalBatch(boolean returnsFinalBatch) {
-		this.returnsFinalBatch = returnsFinalBatch;
-	}
-	public boolean isDriverThrowsExceptionOnCancel() {
-		return driverThrowsExceptionOnCancel;
-	}
-	public void setDriverThrowsExceptionOnCancel(
-			boolean driverThrowsExceptionOnCancel) {
-		this.driverThrowsExceptionOnCancel = driverThrowsExceptionOnCancel;
-	}
-	public long getSimulatedBatchRetrievalTime() {
-		return simulatedBatchRetrievalTime;
-	}
-	public void setSimulatedBatchRetrievalTime(long simulatedBatchRetrievalTime) {
-		this.simulatedBatchRetrievalTime = simulatedBatchRetrievalTime;
-	}
 	
-	public void setClassloader(ClassLoader classloader) {
-		this.classloader = classloader;
-	}
 }

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -50,7 +50,7 @@
 				return c;
 			}
 			protected Object getConnectionFactory(){
-				return c.getConnection();
+				return c;
 			}
 		};
 		cm.start();

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -87,7 +87,7 @@
 		int total_columns = 3;
 		StoredProcedure command = (StoredProcedure)helpGetCommand("{call pm2.spTest8(?)}", EXAMPLE_BQT); //$NON-NLS-1$      
 		command.getInputParameters().get(0).setExpression(new Constant(1));
-		Call proc = (Call)new LanguageBridgeFactory(EXAMPLE_BQT).translate(command);
+		Call proc = new LanguageBridgeFactory(EXAMPLE_BQT).translate(command);
 
 		ProcedureBatchHandler pbh = new ProcedureBatchHandler(proc, exec);
 

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -25,20 +25,10 @@
 import static org.junit.Assert.*;
 
 import org.junit.Test;
-import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
 
 
 public class TestWorkItemState {
 	
-	private final class WorkItemRunner implements Runnable {
-		TestWorkItem workItem;
-
-		@Override
-		public void run() {
-			workItem.run();
-		}
-	}
-
 	private class TestWorkItem extends AbstractWorkItem {
 
 		private boolean isDone;
@@ -50,11 +40,6 @@
 		}
 		
 		private TestWorkItem(boolean done, boolean callMoreWork) {
-			this(done, callMoreWork, null);
-		}
-		
-		private TestWorkItem(boolean done, boolean callMoreWork, Thread callingThread) {
-			super(callingThread);
 			this.isDone = done;
 			this.callMoreWork = callMoreWork;
 		}
@@ -167,51 +152,4 @@
     	}
     }
     
-    @Test public void testUsingCallingThreadIdle() throws Exception {
-    	WorkItemRunner r = new WorkItemRunner();
-    	Thread t = new Thread(r);
-    	final TestWorkItem item = new TestWorkItem(false, false, t) {
-    		@Override
-    		protected boolean shouldPause() {
-    			return true;
-    		}
-    	};
-    	r.workItem = item;
-    	t.start();
-		for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
-			Thread.sleep(100);
-		}
-		assertEquals(ThreadState.IDLE, item.getThreadState());
-		item.moreWork();
-		//if we don't return from this call, that means that this thread has been hijacked -
-		//we should instead use t.
-    }
-    
-    @Test public void testUsingCallingThreadMoreWork() throws Exception {
-    	final int[] processCount = new int[1];
-    	final TestWorkItem item = new TestWorkItem(false, false, Thread.currentThread()) {
-    		@Override
-    		protected boolean shouldPause() {
-    			return false;
-    		}
-    		
-    		@Override
-    		protected void process() {
-    			super.process();
-    			processCount[0]++;
-    		}
-    	};
-    	item.run();
-		assertEquals(ThreadState.IDLE, item.getThreadState());
-		Thread t = new Thread() {
-			@Override
-			public void run() {
-				item.moreWork();
-			}
-		};
-		t.start();
-		t.join();
-		item.moreWork();
-		assertEquals(2, processCount[0]);
-    }        
 }

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java	2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java	2011-07-12 19:23:10 UTC (rev 3307)
@@ -26,8 +26,12 @@
 
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.sql.Connection;
+import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.LinkedHashMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -37,13 +41,21 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.datamgr.ConnectorManager;
+import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
+import org.teiid.language.Command;
 import org.teiid.metadata.FunctionMethod;
 import org.teiid.metadata.FunctionParameter;
-import org.teiid.metadata.MetadataStore;
-import org.teiid.metadata.Schema;
+import org.teiid.metadata.RuntimeMetadata;
 import org.teiid.metadata.FunctionMethod.PushDown;
 import org.teiid.query.function.metadata.FunctionCategoryConstants;
-import org.teiid.query.metadata.TransformationMetadata.Resource;
+import org.teiid.translator.DataNotAvailableException;
+import org.teiid.translator.Execution;
+import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.ExecutionFactory;
+import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
 
 @SuppressWarnings("nls")
 public class TestLocalConnections {
@@ -76,16 +88,71 @@
 	}
 
 	static FakeServer server = new FakeServer();
-	
-	@BeforeClass public static void oneTimeSetup() {
+    
+	@SuppressWarnings("serial")
+	@BeforeClass public static void oneTimeSetup() throws Exception {
     	server.setUseCallingThread(true);
-    	MetadataStore ms = new MetadataStore();
-    	Schema s = new Schema();
-    	s.setName("test");
+    	server.setConnectorManagerRepository(new ConnectorManagerRepository() {
+    		@Override
+    		public ConnectorManager getConnectorManager(String connectorName) {
+    			return new ConnectorManager(connectorName, connectorName) {
+    				@Override
+    				protected ExecutionFactory<Object, Object> getExecutionFactory() {
+    					return new ExecutionFactory<Object, Object>() {
+    						@Override
+    						public Execution createExecution(Command command,
+    								ExecutionContext executionContext,
+    								RuntimeMetadata metadata, Object connection)
+    								throws TranslatorException {
+    						    return new ResultSetExecution() {
+    						    	
+									@Override
+									public void execute() throws TranslatorException {
+										lock.lock();
+										try {
+											waiting.signal();
+											if (!wait.await(2, TimeUnit.SECONDS)) {
+												throw new TimeoutException();
+											}
+										} catch (InterruptedException e) {
+											throw new RuntimeException(e);
+										} finally {
+											lock.unlock();
+										}
+									}
+									
+									@Override
+									public void close() {
+										
+									}
+									
+									@Override
+									public void cancel() throws TranslatorException {
+										
+									}
+									
+									@Override
+									public List<?> next() throws TranslatorException, DataNotAvailableException {
+										// TODO Auto-generated method stub
+										return null;
+									}
+								};
+    						}
+    					};
+    				}
+    				
+    				@Override
+    				protected Object getConnectionFactory()
+    						throws TranslatorException {
+    					return null;
+    				}
+    			};
+    		}
+    	});
     	FunctionMethod function = new FunctionMethod("foo", null, FunctionCategoryConstants.MISCELLANEOUS, PushDown.CANNOT_PUSHDOWN, TestLocalConnections.class.getName(), "blocking", new FunctionParameter[0], new FunctionParameter("result", DataTypeManager.DefaultDataTypes.INTEGER), true, FunctionMethod.Determinism.NONDETERMINISTIC);
-    	s.addFunction(function);
-    	ms.addSchema(s);
-    	server.deployVDB("test", ms, new LinkedHashMap<String, Resource>());
+    	HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+    	udfs.put("test", Arrays.asList(function));
+    	server.deployVDB("test", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb", udfs);
 	}
 	
 	@AfterClass public static void oneTimeTearDown() {
@@ -102,6 +169,7 @@
 	    	    	
 	    	    	Statement s = c.createStatement();
 	    	    	s.execute("select foo()");
+	    	    	s.close();
     			} catch (Exception e) {
     				throw new RuntimeException(e);
     			}
@@ -131,9 +199,84 @@
     	if (t.isAlive()) {
     		fail();
     	}
+    	s.close();
     	if (handler.t != null) {
     		throw handler.t;
     	}
 	}
 	
+	@Test public void testUseInDifferentThreads() throws Throwable {
+		Connection c = server.createConnection("jdbc:teiid:test");
+    	
+    	final Statement s = c.createStatement();
+    	s.execute("select 1");
+    	
+    	assertFalse(server.dqp.getRequests().isEmpty());
+
+    	Thread t = new Thread() {
+    		public void run() {
+    			try {
+					s.close();
+				} catch (SQLException e) {
+					throw new RuntimeException(e);
+				}
+    		}
+    	};
+    	SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+    	t.setUncaughtExceptionHandler(handler);
+    	t.start();
+    	t.join(2000);
+    	if (t.isAlive()) {
+    		fail();
+    	}
+    	
+    	assertTrue(server.dqp.getRequests().isEmpty());
+    	
+    	if (handler.t != null) {
+    		throw handler.t;
+    	}
+	}
+	
+	@Test public void testWait() throws Throwable {
+		final Connection c = server.createConnection("jdbc:teiid:test");
+    	
+		Thread t = new Thread() {
+			public void run() {
+		    	Statement s;
+				try {
+					s = c.createStatement();
+			    	assertTrue(s.execute("select part_id from parts"));
+				} catch (SQLException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		};
+    	t.start();
+    	SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+    	t.setUncaughtExceptionHandler(handler);
+    	
+    	lock.lock();
+    	try {
+    		assertTrue(waiting.await(2, TimeUnit.SECONDS));
+    	} finally {
+    		lock.unlock();
+    	}    	
+    	
+    	//t should now be waiting also
+    	
+    	lock.lock();
+    	try {
+    		wait.signal();
+    	} finally {
+    		lock.unlock();
+    	}
+
+    	//t should finish
+    	t.join();
+    	
+    	if (handler.t != null) {
+    		throw handler.t;
+    	}
+	}
+	
 }



More information about the teiid-commits mailing list