Author: shawkins
Date: 2011-07-12 15:23:10 -0400 (Tue, 12 Jul 2011)
New Revision: 3307
Modified:
branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
Log:
TEIID-1614 more appropriate fix for possible hangs in embedded execution
Modified: branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -40,8 +40,6 @@
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -538,11 +536,7 @@
});
if (synch) {
try {
- if (queryTimeoutMS > 0) {
- pendingResult.get(queryTimeoutMS, TimeUnit.MILLISECONDS);
- } else {
- pendingResult.get();
- }
+ pendingResult.get();
result.get(); //throw an exception if needed
return result;
} catch (ExecutionException e) {
@@ -552,8 +546,6 @@
throw TeiidSQLException.create(e);
} catch (InterruptedException e) {
timeoutOccurred();
- } catch (TimeoutException e) {
- timeoutOccurred();
}
throw new TeiidSQLException(JDBCPlugin.Util.getString("MMStatement.Timeout_before_complete")); //$NON-NLS-1$
}
@@ -578,7 +570,7 @@
reqMsg.setExecutionId(this.currentRequestID);
ResultsFuture.CompletionListener<ResultsMessage> compeletionListener = null;
- if (queryTimeoutMS > 0 && !synch) {
+ if (queryTimeoutMS > 0) {
final CancelTask c = new QueryTimeoutCancelTask(queryTimeoutMS, this);
cancellationTimer.add(c);
compeletionListener = new ResultsFuture.CompletionListener<ResultsMessage>() {
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -26,7 +26,6 @@
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkListener;
-import org.teiid.core.TeiidRuntimeException;
import org.teiid.logging.LogManager;
@@ -43,23 +42,14 @@
private ThreadState threadState = ThreadState.MORE_WORK;
private volatile boolean isProcessing;
- private Thread callingThread;
- public AbstractWorkItem(Thread callingThread) {
- this.callingThread = callingThread;
- }
-
public void run() {
- do {
- startProcessing();
- try {
- process();
- } finally {
- if (!endProcessing()) {
- break;
- }
- }
- } while (!isDoneProcessing());
+ startProcessing();
+ try {
+ process();
+ } finally {
+ endProcessing();
+ }
}
synchronized ThreadState getThreadState() {
@@ -79,10 +69,7 @@
this.threadState = ThreadState.WORKING;
}
- /**
- * @return true if processing should be continued
- */
- final private synchronized boolean endProcessing() {
+ private synchronized void endProcessing() {
isProcessing = false;
logTrace("end processing"); //$NON-NLS-1$
switch (this.threadState) {
@@ -92,21 +79,20 @@
this.threadState = ThreadState.DONE;
} else {
this.threadState = ThreadState.IDLE;
- return pauseProcessing();
+ pauseProcessing();
}
break;
case MORE_WORK:
if (isDoneProcessing()) {
logTrace("done processing - ignoring more"); //$NON-NLS-1$
this.threadState = ThreadState.DONE;
- } else if (this.callingThread == null) {
- resumeProcessing();
+ } else {
+ resumeProcessing();
}
break;
default:
throw new IllegalStateException("Should not END on " + this.threadState); //$NON-NLS-1$
}
- return this.callingThread != null;
}
protected boolean isIdle() {
@@ -117,24 +103,18 @@
moreWork(true);
}
- final protected synchronized void moreWork(boolean ignoreDone) {
+ protected synchronized void moreWork(boolean ignoreDone) {
logTrace("more work"); //$NON-NLS-1$
+ this.notifyAll();
switch (this.threadState) {
case WORKING:
this.threadState = ThreadState.MORE_WORK;
break;
case MORE_WORK:
- if (this.callingThread != null && !this.isProcessing) {
- useCallingThread();
- }
break;
case IDLE:
this.threadState = ThreadState.MORE_WORK;
- if (this.callingThread != null) {
- useCallingThread();
- } else {
- resumeProcessing();
- }
+ resumeProcessing();
break;
default:
if (!ignoreDone) {
@@ -143,14 +123,6 @@
LogManager.logDetail(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, "ignoring more work, since the work item is done"}); //$NON-NLS-1$
}
}
-
- private void useCallingThread() {
- if (this.callingThread == Thread.currentThread()) {
- run(); //restart with the calling thread
- } else {
- this.notifyAll(); //notify the waiting caller
- }
- }
private void logTrace(String msg) {
LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, new Object[] {this, msg, this.threadState});
@@ -158,33 +130,8 @@
protected abstract void process();
- protected boolean pauseProcessing() {
- if (this.callingThread != null && !shouldPause()) {
- return false;
- }
- while (this.callingThread != null && this.getThreadState() == ThreadState.IDLE) {
- try {
- this.wait(); //the lock should already be held
- } catch (InterruptedException e) {
- interrupted(e);
- }
- }
- return this.callingThread != null;
+ protected void pauseProcessing() {
}
-
- /**
- * only called for synch processing
- */
- protected boolean shouldPause() {
- return false;
- }
-
- /**
- * only called for synch processing
- */
- protected void interrupted(InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
protected abstract void resumeProcessing();
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -108,7 +108,7 @@
@Override
public void run() {
- LogManager.logDetail("Running task for parent thread", parentName); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_DQP, "Running task for parent thread", parentName); //$NON-NLS-1$
super.run();
}
@@ -352,6 +352,7 @@
}
}
if (runInThread) {
+ workItem.useCallingThread = true;
workItem.run();
}
return resultsFuture;
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPWorkContext.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -188,11 +188,8 @@
}
public void runInContext(final Runnable runnable) {
- DQPWorkContext.setWorkContext(this);
- boolean associated = false;
- if (securityHelper != null && this.getSubject() != null) {
- associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());
- }
+ DQPWorkContext previous = DQPWorkContext.getWorkContext();
+ boolean associated = attachDQPWorkContext();
try {
runnable.run();
} finally {
@@ -200,9 +197,21 @@
securityHelper.clearSecurityContext(this.getSecurityDomain());
}
DQPWorkContext.releaseWorkContext();
+ if (previous != null) {
+ previous.attachDQPWorkContext();
+ }
}
}
+ private boolean attachDQPWorkContext() {
+ DQPWorkContext.setWorkContext(this);
+ boolean associated = false;
+ if (securityHelper != null && this.getSubject() != null) {
+ associated = securityHelper.assosiateSecurityContext(this.getSecurityDomain(), this.getSecurityContext());
+ }
+ return associated;
+ }
+
public HashMap<String, DataPolicy> getAllowedDataPolicies() {
if (this.policies == null) {
this.policies = new HashMap<String, DataPolicy>();
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -162,8 +162,9 @@
/**The time when command begins processing on the server.*/
private long processingTimestamp = System.currentTimeMillis();
+ protected boolean useCallingThread;
+
public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request, ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext workContext) {
- super(workContext.useCallingThread() || requestMsg.isSync() ? Thread.currentThread() : null);
this.requestMsg = requestMsg;
this.requestID = requestID;
this.processorTimeslice = dqpCore.getProcessorTimeSlice();
@@ -196,24 +197,56 @@
protected boolean isDoneProcessing() {
return isClosed;
}
+
+ @Override
+ public void run() {
+ while (!isDoneProcessing()) {
+ super.run();
+ if (!useCallingThread) {
+ break;
+ }
+ //should use the calling thread
+ synchronized (this) {
+ if (this.resultsReceiver == null) {
+ break; //allow results to be processed by calling thread
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ try {
+ requestCancel();
+ } catch (TeiidComponentException e1) {
+ throw new TeiidRuntimeException(e1);
+ }
+ }
+ }
+ }
+ }
@Override
protected void resumeProcessing() {
- if (doneProducingBatches && !closeRequested && !isCanceled) {
- this.run(); // just run in the IO thread
- } else {
+ if (!this.useCallingThread) {
dqpCore.addWork(this);
}
}
- @Override
- protected void interrupted(InterruptedException e) {
- try {
- this.requestCancel();
- } catch (TeiidComponentException e1) {
- throw new TeiidRuntimeException(e1);
+ /**
+ * Special call from request threads to allow resumption of processing by
+ * the calling thread.
+ */
+ public void doMoreWork() {
+ boolean run = false;
+ synchronized (this) {
+ run = this.getThreadState() == ThreadState.IDLE;
+ moreWork();
+ if (!useCallingThread || this.getThreadState() != ThreadState.MORE_WORK) {
+ return;
+ }
}
- super.interrupted(e);
+ if (run) {
+ //run outside of the lock
+ run();
+ }
}
@Override
@@ -650,12 +683,6 @@
return new TeiidProcessingException(exception, SQLStates.QUERY_CANCELED,
exception.getMessage());
}
- @Override
- protected boolean shouldPause() {
- //if we are waiting on results it's ok to pause
- return this.resultsReceiver != null;
- }
-
private static List<ParameterInfo> getParameterInfo(StoredProcedure procedure) {
List<ParameterInfo> paramInfos = new ArrayList<ParameterInfo>();
@@ -750,12 +777,12 @@
if (!this.doneProducingBatches) {
this.requestCancel(); //pending work should be canceled for fastest clean up
}
- this.moreWork();
+ this.doMoreWork();
}
public void requestMore(int batchFirst, int batchLast,
ResultsReceiver<ResultsMessage> receiver) {
this.requestResults(batchFirst, batchLast, receiver);
- this.moreWork();
+ this.doMoreWork();
}
public void closeAtomicRequest(AtomicRequestID atomicRequestId) {
Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/FakeConnector.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -22,35 +22,25 @@
package org.teiid.dqp.internal.datamgr;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import junit.framework.Assert;
-
import org.teiid.language.Command;
-import org.teiid.language.QueryExpression;
import org.teiid.metadata.RuntimeMetadata;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.Execution;
import org.teiid.translator.ExecutionContext;
import org.teiid.translator.ExecutionFactory;
import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.UpdateExecution;
-public class FakeConnector extends ExecutionFactory {
- private static final int RESULT_SIZE = 5;
-
- private boolean executeBlocks;
- private boolean nextBatchBlocks;
- private boolean returnsFinalBatch;
- private boolean driverThrowsExceptionOnCancel;
- private long simulatedBatchRetrievalTime = 1000L;
- private ClassLoader classloader;
+public class FakeConnector extends ExecutionFactory<Object, Object> {
private int connectionCount;
private int executionCount;
-
+
public int getConnectionCount() {
return connectionCount;
}
@@ -62,123 +52,52 @@
@Override
public Execution createExecution(Command command, ExecutionContext executionContext,
RuntimeMetadata metadata, Object connection) throws TranslatorException {
executionCount++;
- return new FakeBlockingExecution(executionContext);
+ return new FakeExecution(executionContext);
}
- public Object getConnection() {
- return new FakeConnection();
- }
-
@Override
public Object getConnection(Object factory) throws TranslatorException {
+ connectionCount++;
return factory;
}
@Override
public void closeConnection(Object connection, Object factory) {
}
-
- private class FakeConnection {
- public FakeConnection() {
- connectionCount++;
- }
-
- public boolean released = false;
- public void close() {
- Assert.assertFalse("The connection should not be released more than once", released); //$NON-NLS-1$
- released = true;
- }
- }
- private final class FakeBlockingExecution implements ResultSetExecution, UpdateExecution {
- private boolean closed = false;
- private boolean cancelled = false;
+ public final class FakeExecution implements ResultSetExecution, UpdateExecution {
private int rowCount;
ExecutionContext ec;
- public FakeBlockingExecution(ExecutionContext ec) {
+
+ public FakeExecution(ExecutionContext ec) {
this.ec = ec;
}
- public void execute(QueryExpression query, int maxBatchSize) throws TranslatorException {
- if (executeBlocks) {
- waitForCancel();
- }
- if (classloader != null) {
- Assert.assertSame(classloader, Thread.currentThread().getContextClassLoader());
- }
- }
- public synchronized void cancel() throws TranslatorException {
- cancelled = true;
- this.notify();
- }
- public void close() {
- Assert.assertFalse("The execution should not be closed more than once", closed); //$NON-NLS-1$
- closed = true;
- }
@Override
public void execute() throws TranslatorException {
ec.addWarning(new Exception("Some warning")); //$NON-NLS-1$
}
@Override
- public List next() throws TranslatorException, DataNotAvailableException {
- if (nextBatchBlocks) {
- waitForCancel();
- }
- if (this.rowCount >= RESULT_SIZE || returnsFinalBatch) {
+ public List<?> next() throws TranslatorException, DataNotAvailableException {
+ if (this.rowCount == 1) {
return null;
}
this.rowCount++;
- return Arrays.asList(this.rowCount - 1);
+ return new ArrayList<Object>(Arrays.asList(this.rowCount - 1));
}
- private synchronized void waitForCancel() throws TranslatorException {
- try {
- this.wait(simulatedBatchRetrievalTime);
- if (cancelled && driverThrowsExceptionOnCancel) {
- throw new TranslatorException("Request cancelled"); //$NON-NLS-1$
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
@Override
public int[] getUpdateCounts() throws DataNotAvailableException, TranslatorException {
return new int[] {1};
}
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void cancel() throws TranslatorException {
+ }
}
- public boolean isExecuteBlocks() {
- return executeBlocks;
- }
- public void setExecuteBlocks(boolean executeBlocks) {
- this.executeBlocks = executeBlocks;
- }
- public boolean isNextBatchBlocks() {
- return nextBatchBlocks;
- }
- public void setNextBatchBlocks(boolean nextBatchBlocks) {
- this.nextBatchBlocks = nextBatchBlocks;
- }
- public boolean isReturnsFinalBatch() {
- return returnsFinalBatch;
- }
- public void setReturnsFinalBatch(boolean returnsFinalBatch) {
- this.returnsFinalBatch = returnsFinalBatch;
- }
- public boolean isDriverThrowsExceptionOnCancel() {
- return driverThrowsExceptionOnCancel;
- }
- public void setDriverThrowsExceptionOnCancel(
- boolean driverThrowsExceptionOnCancel) {
- this.driverThrowsExceptionOnCancel = driverThrowsExceptionOnCancel;
- }
- public long getSimulatedBatchRetrievalTime() {
- return simulatedBatchRetrievalTime;
- }
- public void setSimulatedBatchRetrievalTime(long simulatedBatchRetrievalTime) {
- this.simulatedBatchRetrievalTime = simulatedBatchRetrievalTime;
- }
- public void setClassloader(ClassLoader classloader) {
- this.classloader = classloader;
- }
}
Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorManager.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -50,7 +50,7 @@
return c;
}
protected Object getConnectionFactory(){
- return c.getConnection();
+ return c;
}
};
cm.start();
Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -87,7 +87,7 @@
int total_columns = 3;
StoredProcedure command = (StoredProcedure)helpGetCommand("{call pm2.spTest8(?)}", EXAMPLE_BQT); //$NON-NLS-1$
command.getInputParameters().get(0).setExpression(new Constant(1));
- Call proc = (Call)new LanguageBridgeFactory(EXAMPLE_BQT).translate(command);
+ Call proc = new LanguageBridgeFactory(EXAMPLE_BQT).translate(command);
ProcedureBatchHandler pbh = new ProcedureBatchHandler(proc, exec);
Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -25,20 +25,10 @@
import static org.junit.Assert.*;
import org.junit.Test;
-import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
public class TestWorkItemState {
- private final class WorkItemRunner implements Runnable {
- TestWorkItem workItem;
-
- @Override
- public void run() {
- workItem.run();
- }
- }
-
private class TestWorkItem extends AbstractWorkItem {
private boolean isDone;
@@ -50,11 +40,6 @@
}
private TestWorkItem(boolean done, boolean callMoreWork) {
- this(done, callMoreWork, null);
- }
-
- private TestWorkItem(boolean done, boolean callMoreWork, Thread callingThread) {
- super(callingThread);
this.isDone = done;
this.callMoreWork = callMoreWork;
}
@@ -167,51 +152,4 @@
}
}
- @Test public void testUsingCallingThreadIdle() throws Exception {
- WorkItemRunner r = new WorkItemRunner();
- Thread t = new Thread(r);
- final TestWorkItem item = new TestWorkItem(false, false, t) {
- @Override
- protected boolean shouldPause() {
- return true;
- }
- };
- r.workItem = item;
- t.start();
- for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
- Thread.sleep(100);
- }
- assertEquals(ThreadState.IDLE, item.getThreadState());
- item.moreWork();
- //if we don't return from this call, that means that this thread has been hijacked
-
- //we should instead use t.
- }
-
- @Test public void testUsingCallingThreadMoreWork() throws Exception {
- final int[] processCount = new int[1];
- final TestWorkItem item = new TestWorkItem(false, false, Thread.currentThread()) {
- @Override
- protected boolean shouldPause() {
- return false;
- }
-
- @Override
- protected void process() {
- super.process();
- processCount[0]++;
- }
- };
- item.run();
- assertEquals(ThreadState.IDLE, item.getThreadState());
- Thread t = new Thread() {
- @Override
- public void run() {
- item.moreWork();
- }
- };
- t.start();
- t.join();
- item.moreWork();
- assertEquals(2, processCount[0]);
- }
}
Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java 2011-07-11 19:37:33 UTC (rev 3306)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java 2011-07-12 19:23:10 UTC (rev 3307)
@@ -26,8 +26,12 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.sql.Connection;
+import java.sql.SQLException;
import java.sql.Statement;
-import java.util.LinkedHashMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,13 +41,21 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.datamgr.ConnectorManager;
+import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
+import org.teiid.language.Command;
import org.teiid.metadata.FunctionMethod;
import org.teiid.metadata.FunctionParameter;
-import org.teiid.metadata.MetadataStore;
-import org.teiid.metadata.Schema;
+import org.teiid.metadata.RuntimeMetadata;
import org.teiid.metadata.FunctionMethod.PushDown;
import org.teiid.query.function.metadata.FunctionCategoryConstants;
-import org.teiid.query.metadata.TransformationMetadata.Resource;
+import org.teiid.translator.DataNotAvailableException;
+import org.teiid.translator.Execution;
+import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.ExecutionFactory;
+import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
@SuppressWarnings("nls")
public class TestLocalConnections {
@@ -76,16 +88,71 @@
}
static FakeServer server = new FakeServer();
-
- @BeforeClass public static void oneTimeSetup() {
+
+ @SuppressWarnings("serial")
+ @BeforeClass public static void oneTimeSetup() throws Exception {
server.setUseCallingThread(true);
- MetadataStore ms = new MetadataStore();
- Schema s = new Schema();
- s.setName("test");
+ server.setConnectorManagerRepository(new ConnectorManagerRepository() {
+ @Override
+ public ConnectorManager getConnectorManager(String connectorName) {
+ return new ConnectorManager(connectorName, connectorName) {
+ @Override
+ protected ExecutionFactory<Object, Object> getExecutionFactory() {
+ return new ExecutionFactory<Object, Object>() {
+ @Override
+ public Execution createExecution(Command command,
+ ExecutionContext executionContext,
+ RuntimeMetadata metadata, Object connection)
+ throws TranslatorException {
+ return new ResultSetExecution() {
+
+ @Override
+ public void execute() throws TranslatorException {
+ lock.lock();
+ try {
+ waiting.signal();
+ if (!wait.await(2, TimeUnit.SECONDS)) {
+ throw new TimeoutException();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void cancel() throws TranslatorException {
+
+ }
+
+ @Override
+ public List<?> next() throws TranslatorException, DataNotAvailableException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected Object getConnectionFactory()
+ throws TranslatorException {
+ return null;
+ }
+ };
+ }
+ });
FunctionMethod function = new FunctionMethod("foo", null, FunctionCategoryConstants.MISCELLANEOUS, PushDown.CANNOT_PUSHDOWN, TestLocalConnections.class.getName(), "blocking", new FunctionParameter[0], new FunctionParameter("result", DataTypeManager.DefaultDataTypes.INTEGER), true, FunctionMethod.Determinism.NONDETERMINISTIC);
- s.addFunction(function);
- ms.addSchema(s);
- server.deployVDB("test", ms, new LinkedHashMap<String, Resource>());
+ HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("test", Arrays.asList(function));
+ server.deployVDB("test", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb", udfs);
}
@AfterClass public static void oneTimeTearDown() {
@@ -102,6 +169,7 @@
Statement s = c.createStatement();
s.execute("select foo()");
+ s.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -131,9 +199,84 @@
if (t.isAlive()) {
fail();
}
+ s.close();
if (handler.t != null) {
throw handler.t;
}
}
+ @Test public void testUseInDifferentThreads() throws Throwable {
+ Connection c = server.createConnection("jdbc:teiid:test");
+
+ final Statement s = c.createStatement();
+ s.execute("select 1");
+
+ assertFalse(server.dqp.getRequests().isEmpty());
+
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ s.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+ t.setUncaughtExceptionHandler(handler);
+ t.start();
+ t.join(2000);
+ if (t.isAlive()) {
+ fail();
+ }
+
+ assertTrue(server.dqp.getRequests().isEmpty());
+
+ if (handler.t != null) {
+ throw handler.t;
+ }
+ }
+
+ @Test public void testWait() throws Throwable {
+ final Connection c = server.createConnection("jdbc:teiid:test");
+
+ Thread t = new Thread() {
+ public void run() {
+ Statement s;
+ try {
+ s = c.createStatement();
+ assertTrue(s.execute("select part_id from parts"));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+ t.setUncaughtExceptionHandler(handler);
+
+ lock.lock();
+ try {
+ assertTrue(waiting.await(2, TimeUnit.SECONDS));
+ } finally {
+ lock.unlock();
+ }
+
+ //t should now be waiting also
+
+ lock.lock();
+ try {
+ wait.signal();
+ } finally {
+ lock.unlock();
+ }
+
+ //t should finish
+ t.join();
+
+ if (handler.t != null) {
+ throw handler.t;
+ }
+ }
+
}