Author: shawkins
Date: 2011-11-02 12:50:24 -0400 (Wed, 02 Nov 2011)
New Revision: 3602
Modified:
trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
Log:
TEIID-1802 adding executioncontext dataavailable method
Modified: trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java 2011-11-02
15:15:47 UTC (rev 3601)
+++ trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -25,7 +25,7 @@
/**
* Used by asynch connectors to indicate data is not available
- * and results should be polled for after the given delay.
+ * and results should be polled for after the given delay in milliseconds.
*/
public class DataNotAvailableException extends TeiidRuntimeException {
@@ -33,9 +33,24 @@
private long retryDelay = 0;
+ /**
+ * Indicate that the engine should not poll for results and will be notified
+ * via the {@link ExecutionContext#dataAvailable()} method.
+ */
+ public static final DataNotAvailableException NO_POLLING = new
DataNotAvailableException(-1);
+
+ /**
+ * Uses a delay of 0, which implies an immediate poll for results.
+ */
public DataNotAvailableException() {
}
+ /**
+ * Uses the given retryDelay. Negative values indicate that the
+ * engine should not poll for results (see also {@link
DataNotAvailableException#NO_POLLING}) and will be notified
+ * via the {@link ExecutionContext#dataAvailable()} method.
+ * @param retryDelay in milliseconds
+ */
public DataNotAvailableException(long retryDelay) {
this.retryDelay = retryDelay;
}
Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java 2011-11-02 15:15:47
UTC (rev 3601)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java 2011-11-02 16:50:24
UTC (rev 3602)
@@ -146,4 +146,10 @@
* @return
*/
Session getSession();
+
+ /**
+ * Signal the engine that data is available and processing should be
+ * resumed.
+ */
+ void dataAvailable();
}
Modified:
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
===================================================================
---
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml 2011-11-02
16:50:24 UTC (rev 3602)
@@ -258,12 +258,16 @@
<section>
<title>Asynchronous Executions</title>
<para>In some scenarios, a translator needs to execute
- asynchronously and allow the executing thread to perform other work. To allow this,
you should Throw a DataNotAvailableExecption during a retrival method, rather than
explicitly waiting or sleeping for the results. The
- DataNotAvailableException may take a delay parameter in its
+ asynchronously and allow the executing thread to perform other work. To allow this,
you should throw a <code>DataNotAvailableException</code> during a retrieval
method, rather than explicitly waiting or sleeping for the results.
+ <note><para>A <code>DataNotAvailableException</code> should
not be thrown by the execute method.</para></note>
+ The
+ <code>DataNotAvailableException</code> may take a delay parameter in
its
constructor to indicate how long the system should wait before polling
- for results. Any non-negative value is allowed.
+ for results. Any non-negative value indicates a time until the next polling should
be performed.
+ The <code>DataNotAvailableException.NO_POLLING</code> exception (or any
DataNotAvailableException with a negative delay) can be thrown to indicate that
+ the execution will call <code>ExecutionContext.dataAvailable()</code> to
indicate processing should resume.
</para>
- <para>Since the exection and the associated connection are not closed until the
work has completed, care should be taken if using asynchronous executions that hold a lot
of state.</para>
+ <note><para>Since the execution and the associated connection are not
closed until the work has completed, care should be taken if using asynchronous executions
that hold a lot of state.</para></note>
</section>
<section>
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -23,6 +23,7 @@
package org.teiid.dqp.internal.datamgr;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.translator.TranslatorException;
@@ -40,4 +41,8 @@
AtomicResultsMessage execute() throws TranslatorException, BlockedException;
+ void setRequestWorkItem(RequestWorkItem item);
+
+ boolean isDataAvailable();
+
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -22,7 +22,6 @@
package org.teiid.dqp.internal.datamgr;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,9 +29,11 @@
import javax.resource.ResourceException;
import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.client.ResizingArrayList;
import org.teiid.common.buffer.BlockedException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
+import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
@@ -108,6 +109,11 @@
this.securityContext.setTransactional(requestMsg.isTransactional());
}
+ @Override
+ public void setRequestWorkItem(RequestWorkItem item) {
+ this.securityContext.setRequestWorkItem(item);
+ }
+
public AtomicRequestID getId() {
return id;
}
@@ -137,6 +143,7 @@
}
public void close() {
+ this.securityContext.setRequestWorkItem(null);
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Processing Close :", this.requestMsg.getCommand()}); //$NON-NLS-1$
if (!error) {
manager.logSRCCommand(this.requestMsg, this.securityContext, Event.END,
this.rowCount);
@@ -273,12 +280,12 @@
Assertion.assertTrue(!this.lastBatch);
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Getting results from connector"}); //$NON-NLS-1$
int batchSize = 0;
- List<List> rows = new ArrayList<List>(batchSize/4);
+ List<List<?>> rows = new
ResizingArrayList<List<?>>(batchSize/4);
try {
while (batchSize < this.requestMsg.getFetchSize()) {
- List row = this.execution.next();
+ List<?> row = this.execution.next();
if (row == null) {
this.lastBatch = true;
break;
@@ -306,14 +313,14 @@
}
}
} catch (DataNotAvailableException e) {
- if (rows.size() == 0) {
+ if (rows.size() == 0 && this.rowCount != 0) {
throw e;
}
}
if (lastBatch) {
if (this.procedureBatchHandler != null) {
- List row = this.procedureBatchHandler.getParameterRow();
+ List<?> row = this.procedureBatchHandler.getParameterRow();
if (row != null) {
rows.add(row);
this.rowCount++;
@@ -357,5 +364,10 @@
public String toString() {
return this.id.toString();
}
+
+ @Override
+ public boolean isDataAvailable() {
+ return this.securityContext.isDataAvailable();
+ }
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -33,6 +33,7 @@
import org.teiid.adminapi.Session;
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.util.HashCodeUtil;
+import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.translator.ExecutionContext;
@@ -66,6 +67,8 @@
private int batchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private List<Exception> warnings = new LinkedList<Exception>();
private Session session;
+ private RequestWorkItem worktItem;
+ private boolean dataAvailable;
public ExecutionContextImpl(String vdbName, int vdbVersion, Serializable
executionPayload,
String originalConnectionID, String connectorName, String
requestId, String partId, String execCount) {
@@ -215,4 +218,23 @@
public void setSession(Session session) {
this.session = session;
}
+
+ public void setRequestWorkItem(RequestWorkItem item) {
+ this.worktItem = item;
+ }
+
+ @Override
+ public synchronized void dataAvailable() {
+ RequestWorkItem requestWorkItem = this.worktItem;
+ dataAvailable = true;
+ if (requestWorkItem != null) {
+ requestWorkItem.moreWork();
+ }
+ }
+
+ public synchronized boolean isDataAvailable() {
+ boolean result = dataAvailable;
+ dataAvailable = false;
+ return result;
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -202,6 +202,7 @@
}
ConnectorManagerRepository cmr =
workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
ConnectorWork work =
cmr.getConnectorManager(aqr.getConnectorName()).registerRequest(aqr);
+ work.setRequestWorkItem(workItem);
return new DataTierTupleSource(aqr, workItem, work, this, limit);
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -238,12 +238,16 @@
} catch (TranslatorException e) {
results = exceptionOccurred(e, true);
} catch (DataNotAvailableException e) {
- workItem.scheduleWork(new Runnable() {
- @Override
- public void run() {
- workItem.moreWork();
- }
- }, 10, e.getRetryDelay());
+ if (e.getRetryDelay() >= 0) {
+ workItem.scheduleWork(new Runnable() {
+ @Override
+ public void run() {
+ workItem.moreWork();
+ }
+ }, 10, e.getRetryDelay());
+ } else if (this.cwi.isDataAvailable()) {
+ continue;
+ }
throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException"); //$NON-NLS-1$
}
receiveResults(results);
@@ -326,7 +330,7 @@
return results;
}
- private AtomicResultsMessage getResults()
+ AtomicResultsMessage getResults()
throws BlockedException, TeiidComponentException,
TranslatorException {
AtomicResultsMessage results = null;
@@ -443,7 +447,7 @@
@Override
public void onCompletion(FutureWork<AtomicResultsMessage> future) {
if (!cancelAsynch) {
- workItem.moreWork();
+ workItem.moreWork(); //this is not necessary in some situations with DataNotAvailable
}
canAsynchClose = false;
if (closed.get()) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -291,9 +291,13 @@
}
}
} catch (BlockedException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- processor blocked"); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL))
{
+ LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- processor blocked"); //$NON-NLS-1$ //$NON-NLS-2$
+ }
} catch (QueryProcessor.ExpiredTimeSliceException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL))
{
+ LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread",
requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
+ }
this.moreWork();
} catch (Throwable e) {
handleThrowable(e);
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-11-02
15:15:47 UTC (rev 3601)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -587,6 +587,15 @@
assertEquals(1, this.core.getLongRunningRequests().size());
}
+ @Test public void testDataAvailable() throws Exception {
+ agds.dataNotAvailable = -1;
+ RequestMessage reqMsg = exampleRequestMessage("select * FROM
BQT1.SmallA");
+ ResultsMessage results = execute("A", 1, reqMsg);
+ if (results.getException() != null) {
+ throw results.getException();
+ }
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2011-11-02
15:15:47 UTC (rev 3601)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -168,4 +168,21 @@
}
}
+ @Test public void testAsynch() throws Exception {
+ this.connectorManager.dataNotAvailable = 10;
+ this.connectorManager.setRows(0);
+ helpSetup(3);
+ boolean blocked = false;
+ while (true) {
+ try {
+ assertNull(info.nextTuple());
+ break;
+ } catch (BlockedException e) {
+ blocked = true;
+ Thread.sleep(50);
+ }
+ }
+ assertTrue(blocked);
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-11-02
15:15:47 UTC (rev 3601)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -35,6 +35,7 @@
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
import org.teiid.dqp.internal.datamgr.ConnectorWorkItem;
+import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.query.optimizer.TestOptimizer;
@@ -55,7 +56,7 @@
private int rows = 10;
private SourceCapabilities caps;
public boolean throwExceptionOnExecute;
- public int dataNotAvailable = -1;
+ public int dataNotAvailable = -2;
public int sleep;
private final AtomicInteger executeCount = new AtomicInteger();
private final AtomicInteger closeCount = new AtomicInteger();
@@ -98,9 +99,31 @@
final AtomicResultsMessage msg =
ConnectorWorkItem.createResultsMessage(results);
msg.setFinalRow(rows);
return new ConnectorWork() {
+
+ RequestWorkItem item;
+ boolean returnedInitial;
+
+ @Override
+ public boolean isDataAvailable() {
+ return true;
+ }
+
+ @Override
+ public void setRequestWorkItem(RequestWorkItem item) {
+ this.item = item;
+ }
@Override
public AtomicResultsMessage more() throws TranslatorException {
+ if (dataNotAvailable == -1) {
+ dataNotAvailable = -2;
+ item.moreWork(); //this alone is not sufficient, we have to call the data available
method to prevent
+ //timing issues
+ throw DataNotAvailableException.NO_POLLING;
+ }
+ if (returnedInitial) {
+ return msg;
+ }
throw new RuntimeException("Should not be called"); //$NON-NLS-1$
}
@@ -117,9 +140,13 @@
if (throwExceptionOnExecute) {
throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
}
- if (dataNotAvailable > -1) {
+ if (dataNotAvailable > -2) {
int delay = dataNotAvailable;
- dataNotAvailable = -1;
+ if (delay == -1 && !returnedInitial) {
+ returnedInitial = true;
+ return ConnectorWorkItem.createResultsMessage(new List[0]);
+ }
+ dataNotAvailable = -2;
throw new DataNotAvailableException(delay);
}
return msg;
Modified: trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java 2011-11-02
15:15:47 UTC (rev 3601)
+++ trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java 2011-11-02
16:50:24 UTC (rev 3602)
@@ -21,16 +21,11 @@
*/
package org.teiid.transport;
-import java.util.Properties;
-
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.teiid.client.security.ILogon;
import org.teiid.common.buffer.StorageManager;
-import org.teiid.core.TeiidException;
-import org.teiid.jdbc.EmbeddedProfile;
import org.teiid.jdbc.TeiidDriver;
-import org.teiid.net.ServerConnection;
import org.teiid.net.TeiidURL.CONNECTION.AuthenticationType;
import org.teiid.net.socket.ObjectChannel;
@@ -46,14 +41,6 @@
super(config, new ClientServiceRegistryImpl(ClientServiceRegistry.Type.ODBC),
storageManager, portOffset);
this.maxLobSize = maxLobSize;
this.driver = new TeiidDriver();
- this.driver.setEmbeddedProfile(new EmbeddedProfile() {
- @Override
- protected ServerConnection createServerConnection(Properties info)
- throws TeiidException {
- //When using the non-blocking api, we don't want to use the calling thread
- return new LocalServerConnection(info, false);
- }
- });
this.logonService = logon;
}