Author: shawkins
Date: 2011-05-29 14:10:40 -0400 (Sun, 29 May 2011)
New Revision: 3203
Modified:
branches/7.4.x/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
branches/7.4.x/client/src/main/java/org/teiid/jdbc/JDBCURL.java
branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
branches/7.4.x/client/src/test/java/org/teiid/jdbc/TestTeiidDriver.java
branches/7.4.x/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
Log:
TEIID-1614 adding a fix for embedded thread hanging
Modified: branches/7.4.x/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
===================================================================
--- branches/7.4.x/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2011-05-27
20:07:57 UTC (rev 3202)
+++ branches/7.4.x/client/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -28,6 +28,7 @@
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.PropertiesUtils;
import org.teiid.core.util.ReflectionHelper;
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
@@ -36,7 +37,9 @@
public class EmbeddedProfile implements ConnectionProfile {
- /**
+ public static final String USE_CALLING_THREAD = "useCallingThread";
//$NON-NLS-1$
+
+ /**
* This method tries to make a connection to the given URL. This class
* will return a null if this is not the right driver to connect to the given URL.
* @param The URL used to establish a connection.
@@ -61,7 +64,7 @@
protected ServerConnection createServerConnection(Properties info)
throws TeiidException {
- return
(ServerConnection)ReflectionHelper.create("org.teiid.transport.LocalServerConnection",
Arrays.asList(info, true), Thread.currentThread().getContextClassLoader()); //$NON-NLS-1$
+ return
(ServerConnection)ReflectionHelper.create("org.teiid.transport.LocalServerConnection",
Arrays.asList(info, PropertiesUtils.getBooleanProperty(info, USE_CALLING_THREAD, true)),
Thread.currentThread().getContextClassLoader()); //$NON-NLS-1$
}
}
Modified: branches/7.4.x/client/src/main/java/org/teiid/jdbc/JDBCURL.java
===================================================================
--- branches/7.4.x/client/src/main/java/org/teiid/jdbc/JDBCURL.java 2011-05-27 20:07:57
UTC (rev 3202)
+++ branches/7.4.x/client/src/main/java/org/teiid/jdbc/JDBCURL.java 2011-05-29 18:10:40
UTC (rev 3203)
@@ -57,6 +57,7 @@
ExecutionProperties.PROP_FETCH_SIZE,
ExecutionProperties.PROP_XML_FORMAT,
ExecutionProperties.PROP_XML_VALIDATION,
+ EmbeddedProfile.USE_CALLING_THREAD,
ExecutionProperties.DISABLE_LOCAL_TRANSACTIONS)));
public static final Set<String> KNOWN_PROPERTIES = getKnownProperties();
Modified: branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java
===================================================================
--- branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-05-27
20:07:57 UTC (rev 3202)
+++ branches/7.4.x/client/src/main/java/org/teiid/jdbc/StatementImpl.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -533,7 +533,6 @@
}
final RequestMessage reqMessage = createRequestMessage(commands,
isBatchedCommand, resultsMode);
- reqMessage.setSync(synch);
ResultsFuture<ResultsMessage> pendingResult = execute(reqMessage, synch);
final ResultsFuture<Boolean> result = new ResultsFuture<Boolean>();
pendingResult.addCompletionListener(new
ResultsFuture.CompletionListener<ResultsMessage>() {
@@ -581,7 +580,8 @@
reqMsg.setFetchSize(this.fetchSize);
reqMsg.setRowLimit(this.maxRows);
reqMsg.setTransactionIsolation(this.driverConnection.getTransactionIsolation());
-
+ String useCallingThread =
getExecutionProperty(EmbeddedProfile.USE_CALLING_THREAD);
+ reqMsg.setSync(synch && (useCallingThread == null ||
Boolean.valueOf(useCallingThread)));
// Get connection properties and set them onto request message
copyPropertiesToRequest(reqMsg);
Modified: branches/7.4.x/client/src/test/java/org/teiid/jdbc/TestTeiidDriver.java
===================================================================
--- branches/7.4.x/client/src/test/java/org/teiid/jdbc/TestTeiidDriver.java 2011-05-27
20:07:57 UTC (rev 3202)
+++ branches/7.4.x/client/src/test/java/org/teiid/jdbc/TestTeiidDriver.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -135,7 +135,7 @@
@Test public void testGetPropertyInfo1() throws Exception {
DriverPropertyInfo info[] =
drv.getPropertyInfo("jdbc:teiid:vdb@mm://localhost:12345;applicationName=x",
null); //$NON-NLS-1$
- assertEquals(19, info.length);
+ assertEquals(20, info.length);
assertEquals(false, info[0].required);
assertEquals("ApplicationName", info[0].name); //$NON-NLS-1$
assertEquals("x", info[0].value); //$NON-NLS-1$
Modified:
branches/7.4.x/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml
===================================================================
---
branches/7.4.x/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml 2011-05-27
20:07:57 UTC (rev 3202)
+++
branches/7.4.x/documentation/client-developers-guide/src/main/docbook/en-US/content/jdbc-connection.xml 2011-05-29
18:10:40 UTC (rev 3203)
@@ -223,7 +223,16 @@
to create session. Teiid also verifies that the same user is
using this connection during the life of the connection.
if it finds a different security context on the calling thread,
it switches the identity on the connection,
if the new user is also eligible to log in to Teiid otherwise
connection fails to execute.</entry>
- </row>
+ </row>
+ <row id="useCallingThread">
+ <entry>
+ <code>useCallingThread</code>
+ </entry>
+ <entry>
+ <code>boolean</code>
+ </entry>
+ <entry>Only applies to "local" connections. When
this option is set to "true" (the default), then the calling thread will be used
to process the query. If false, then an engine thread will be used.</entry>
+ </row>
<row>
<entry>
<code>QueryTimeout</code>
@@ -458,7 +467,8 @@
there is a way to make connections that by-pass making a socket based JDBC
connection.
You can use slightly modified data source configuration to make a
"local" connection, where the JDBC API will lookup a local Teiid runtime in the
same VM.</para>
<warning><para>Since DataSources start before Teiid VDBs
are deployed, leave the min pool size of local connections as the default of 0. Otherwise
errors will occur on the startup of the Teiid DataSource.</para></warning>
- <note><para>Local connections use their calling thread to perform
processing operations rather than using an engine thread while the calling thread is
blocked.</para></note>
+ <note><para>By default local connections use their calling thread
to perform processing operations rather than using an engine thread while the calling
thread is blocked.
+ To disable this behavior set the connection property
useCallingThread=false.</para></note>
<example>
<title>Local data source</title>
<programlisting><![CDATA[<datasources>
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-05-27
20:07:57 UTC (rev 3202)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -43,10 +43,10 @@
private ThreadState threadState = ThreadState.MORE_WORK;
private volatile boolean isProcessing;
- private boolean useCallingThread;
+ private Thread callingThread;
- public AbstractWorkItem(boolean useCallingThread) {
- this.useCallingThread = useCallingThread;
+ public AbstractWorkItem(Thread callingThread) {
+ this.callingThread = callingThread;
}
public void run() {
@@ -99,14 +99,14 @@
if (isDoneProcessing()) {
logTrace("done processing - ignoring more"); //$NON-NLS-1$
this.threadState = ThreadState.DONE;
- } else if (!this.useCallingThread) {
+ } else if (this.callingThread == null) {
resumeProcessing();
}
break;
default:
throw new IllegalStateException("Should not END on " +
this.threadState); //$NON-NLS-1$
}
- return useCallingThread;
+ return this.callingThread != null;
}
protected boolean isIdle() {
@@ -127,11 +127,11 @@
break;
case IDLE:
this.threadState = ThreadState.MORE_WORK;
- if (this.useCallingThread) {
- if (isProcessing) {
+ if (this.callingThread != null) {
+ if (this.callingThread == Thread.currentThread()) {
+ run(); //restart with the calling thread
+ } else {
this.notifyAll(); //notify the waiting caller
- } else {
- run(); //restart with the calling thread
}
} else {
resumeProcessing();
@@ -152,17 +152,17 @@
protected abstract void process();
protected boolean pauseProcessing() {
- if (useCallingThread && !shouldPause()) {
+ if (this.callingThread != null && !shouldPause()) {
return false;
}
- while (useCallingThread && this.getThreadState() == ThreadState.IDLE) {
+ while (this.callingThread != null && this.getThreadState() == ThreadState.IDLE)
{
try {
this.wait(); //the lock should already be held
} catch (InterruptedException e) {
interrupted(e);
}
}
- return useCallingThread;
+ return this.callingThread != null;
}
/**
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-05-27
20:07:57 UTC (rev 3202)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -162,7 +162,7 @@
private long processingTimestamp = System.currentTimeMillis();
public RequestWorkItem(DQPCore dqpCore, RequestMessage requestMsg, Request request,
ResultsReceiver<ResultsMessage> receiver, RequestID requestID, DQPWorkContext
workContext) {
- super(workContext.useCallingThread() || requestMsg.isSync());
+ super(workContext.useCallingThread() || requestMsg.isSync() ? Thread.currentThread()
: null);
this.requestMsg = requestMsg;
this.requestID = requestID;
this.processorTimeslice = dqpCore.getProcessorTimeSlice();
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2011-05-27
20:07:57 UTC (rev 3202)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -403,7 +403,7 @@
if (groupingExpressions != null) {
newGroupingExpressions = new HashSet<SingleElementSymbol>();
for (SingleElementSymbol singleElementSymbol : groupingExpressions) {
- newGroupingExpressions.add((SingleElementSymbol)symbolMap.getKeys().get(virtualElements.indexOf(singleElementSymbol)).clone());
+ newGroupingExpressions.add(symbolMap.getKeys().get(virtualElements.indexOf(singleElementSymbol)).clone());
}
}
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java
===================================================================
---
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-05-27
20:07:57 UTC (rev 3202)
+++
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestWorkItemState.java 2011-05-29
18:10:40 UTC (rev 3203)
@@ -25,10 +25,20 @@
import static org.junit.Assert.*;
import org.junit.Test;
+import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
public class TestWorkItemState {
+ private final class WorkItemRunner implements Runnable {
+ TestWorkItem workItem;
+
+ @Override
+ public void run() {
+ workItem.run();
+ }
+ }
+
private class TestWorkItem extends AbstractWorkItem {
private boolean isDone;
@@ -40,7 +50,11 @@
}
private TestWorkItem(boolean done, boolean callMoreWork) {
- super(false);
+ this(done, callMoreWork, null);
+ }
+
+ private TestWorkItem(boolean done, boolean callMoreWork, Thread callingThread) {
+ super(callingThread);
this.isDone = done;
this.callMoreWork = callMoreWork;
}
@@ -152,5 +166,27 @@
}
}
+
+ @Test public void testUsingCallingThreadIdle() throws Exception {
+ WorkItemRunner r = new WorkItemRunner();
+ Thread t = new Thread(r);
+ final TestWorkItem item = new TestWorkItem(false, false, t) {
+ @Override
+ protected boolean shouldPause() {
+ return true;
+ }
+ };
+ r.workItem = item;
+ t.start();
+ for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
+ Thread.sleep(100);
+ }
+ if (item.getThreadState() != ThreadState.IDLE) {
+ fail();
+ }
+ item.moreWork();
+ //if we don't return from this call, that means that this thread has been hijacked
-
+ //we should instead use t.
+ }
}