Author: shawkins
Date: 2012-08-02 14:04:30 -0400 (Thu, 02 Aug 2012)
New Revision: 4289
Modified:
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
Log:
TEIID-2130 addressing short-comings with datanotavailable
Modified:
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
===================================================================
---
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -443,4 +443,8 @@
public boolean isSourceRequiredForMetadata() {
return delegate.isSourceRequiredForMetadata();
}
+ @Override
+ public boolean isForkable() {
+ return delegate.isForkable();
+ }
}
Modified: trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java 2012-08-01
20:18:03 UTC (rev 4288)
+++ trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -21,25 +21,31 @@
*/
package org.teiid.translator;
+import java.util.Date;
+
import org.teiid.core.TeiidRuntimeException;
/**
- * Used by asynch connectors to indicate data is not available
- * and results should be polled for after the given delay in milliseconds.
+ * Used by asynch connectors to indicate data is not available and results should be
polled for after
+ * the given delay in milliseconds or until a Date is reached.
* <br>
- * Note that delays are not guaranteed. The delay is the maximum amount of time before
the plan will be re-queued for execution.
- * There are several scenarios that would cause the delay to be shorter, such as multiple
sources where one source returns a shorter
- * delay or if the engine believes more work is to be done before allowing the plan to
sit idle.
+ * Note that delays are not guaranteed unless {@link #strict} is set to true. With
{@link #strict} false, the delay is the maximum amount
+ * of time before the plan will be re-queued for execution. There are several scenarios
that would cause the delay to be shorter, such as
+ * multiple sources where one source returns a shorter delay or if the engine believes
more work is to be done before allowing the plan to sit idle.
+ * <br>
*/
public class DataNotAvailableException extends TeiidRuntimeException {
private static final long serialVersionUID = 5569111182915674334L;
private long retryDelay = 0;
+ private Date waitUntil;
+ private boolean strict;
/**
* Indicate that the engine should not poll for results and will be notified
- * via the {@link ExecutionContext#dataAvailable()} method.
+ * via the {@link ExecutionContext#dataAvailable()} method. However the engine may
still ask
+ * for results before the dataAvailable is called.
*/
public static final DataNotAvailableException NO_POLLING = new
DataNotAvailableException(-1);
@@ -59,8 +65,35 @@
this.retryDelay = retryDelay;
}
+ /**
+ * Instructs the engine to wait until the Date is met before getting results. By
default this will
+ * be strictly enforced, meaning that no attempt will be made to get results before the
given Date.
+ * @param waitUntil
+ */
+ public DataNotAvailableException(Date waitUntil) {
+ this.waitUntil = waitUntil;
+ this.strict = true;
+ }
+
public long getRetryDelay() {
return retryDelay;
}
+
+ public Date getWaitUntil() {
+ return waitUntil;
+ }
+
+ /**
+ * If the delay or Date is strictly enforced then the execution will not asked for
results until
+ * after that time or until {@link ExecutionContext#dataAvailable()} is called.
+ * @return
+ */
+ public boolean isStrict() {
+ return strict;
+ }
+
+ public void setStrict(boolean strict) {
+ this.strict = strict;
+ }
}
Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2012-08-01 20:18:03
UTC (rev 4288)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2012-08-02 18:04:30
UTC (rev 4289)
@@ -1019,5 +1019,13 @@
public CacheDirective getCacheDirective(Command command, ExecutionContext
executionContext, RuntimeMetadata metadata) throws TranslatorException {
return null;
}
+
+ /**
+ * When forkable the engine may use a separate thread to interact with returned {@link
Execution}.
+ * @return true if {@link Execution}s can be called in separate threads from the
processing thread
+ */
+ public boolean isForkable() {
+ return true;
+ }
}
Modified:
trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java
===================================================================
---
trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -40,9 +40,9 @@
import org.teiid.language.Argument.Direction;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.ProcedureExecution;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.TypeFacility;
import org.teiid.translator.UpdateExecution;
@@ -81,7 +81,9 @@
// then just say we don't have results instead
if(randomTimeToWait > this.config.getPollIntervalInMilli()) {
waited = true;
- throw new DataNotAvailableException(randomTimeToWait);
+ DataNotAvailableException dnae = new
DataNotAvailableException(randomTimeToWait);
+ dnae.setStrict(true);
+ throw dnae;
}
try {
Thread.sleep(randomTimeToWait);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -40,7 +40,7 @@
void close();
- AtomicResultsMessage execute() throws TranslatorException, BlockedException;
+ void execute() throws TranslatorException, BlockedException;
void setRequestWorkItem(RequestWorkItem item);
@@ -52,4 +52,6 @@
boolean areLobsUsableAfterClose();
+ boolean isForkable();
+
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -82,6 +82,8 @@
private AtomicBoolean isCancelled = new AtomicBoolean();
private org.teiid.language.Command translatedCommand;
+
+ private DataNotAvailableException dnae;
ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
this.id = message.getAtomicRequestID();
@@ -134,6 +136,12 @@
}
public AtomicResultsMessage more() throws TranslatorException {
+ if (this.dnae != null) {
+ //clear the exception if it has been set
+ DataNotAvailableException e = this.dnae;
+ this.dnae = null;
+ throw e;
+ }
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Processing MORE request"}); //$NON-NLS-1$
try {
return handleBatch();
@@ -197,7 +205,7 @@
return new TranslatorException(t);
}
- public AtomicResultsMessage execute() throws TranslatorException {
+ public void execute() throws TranslatorException {
if(isCancelled()) {
throw new TranslatorException(QueryPlugin.Event.TEIID30476,
QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30476));
}
@@ -252,8 +260,6 @@
// Execute query
this.execution.execute();
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Executed command"}); //$NON-NLS-1$
-
- return handleBatch();
} catch (Throwable t) {
throw handleError(t);
}
@@ -339,9 +345,14 @@
}
}
} catch (DataNotAvailableException e) {
- if (rows.size() == 0 && this.rowCount != 0) {
+ if (rows.size() == 0) {
throw e;
}
+ if (e.getWaitUntil() != null) {
+ //we have an await until that we need to enforce
+ this.dnae = e;
+ }
+ //else we can just ignore the delay
}
if (lastBatch) {
@@ -414,4 +425,9 @@
return cd;
}
+ @Override
+ public boolean isForkable() {
+ return this.connector.isForkable();
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2012-08-01
20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.Work;
@@ -452,8 +453,8 @@
this.processWorkerPool.execute(work);
}
- void scheduleWork(final Runnable r, int priority, long delay) {
- this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>()
{
+ ScheduledFuture<?> scheduleWork(final Runnable r, int priority, long delay) {
+ return this.processWorkerPool.schedule(new FutureWork<Void>(new
Callable<Void>() {
@Override
public Void call() throws Exception {
r.run();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -201,6 +201,9 @@
ConnectorManagerRepository cmr =
workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
ConnectorManager connectorManager = cmr.getConnectorManager(aqr.getConnectorName());
ConnectorWork work = connectorManager.registerRequest(aqr);
+ if (!work.isForkable()) {
+ aqr.setSerial(true);
+ }
CacheID cid = null;
CacheDirective cd = null;
if (workItem.getRsCache() != null && command.areResultsCachable()) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -23,10 +23,12 @@
package org.teiid.dqp.internal.process;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.activation.DataSource;
@@ -81,7 +83,24 @@
*/
public class DataTierTupleSource implements TupleSource,
CompletionListener<AtomicResultsMessage> {
- // Construction state
+ private static final class MoreWorkTask implements Runnable {
+
+ WeakReference<RequestWorkItem> ref;
+
+ public MoreWorkTask(RequestWorkItem workItem) {
+ ref = new WeakReference<RequestWorkItem>(workItem);
+ }
+
+ @Override
+ public void run() {
+ RequestWorkItem item = ref.get();
+ if (item != null) {
+ item.moreWork();
+ }
+ }
+ }
+
+ // Construction state
private final AtomicRequestMessage aqr;
private final RequestWorkItem workItem;
private final ConnectorWork cwi;
@@ -113,6 +132,9 @@
boolean errored;
Scope scope; //this is to avoid synchronization
+
+ private long waitUntil;
+ private ScheduledFuture<?> scheduledFuture;
public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem,
ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
this.aqr = aqr;
@@ -226,6 +248,12 @@
}
public List<?> nextTuple() throws TeiidComponentException,
TeiidProcessingException {
+ if (waitUntil > 0 && waitUntil > System.currentTimeMillis()) {
+ if (!this.cwi.isDataAvailable()) {
+ throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking until",
waitUntil); //$NON-NLS-1$
+ }
+ this.waitUntil = 0;
+ }
while (true) {
if (arm == null) {
if (isDone()) {
@@ -262,17 +290,8 @@
partial = true;
} catch (DataNotAvailableException e) {
dna = true;
- if (e.getRetryDelay() >= 0) {
- workItem.scheduleWork(new Runnable() {
- @Override
- public void run() {
- workItem.moreWork();
- }
- }, 10, e.getRetryDelay());
- } else if (this.cwi.isDataAvailable()) {
- continue;
- }
- throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
+ handleDataNotAvailable(e);
+ continue;
} finally {
if (!dna && results == null) {
errored = true;
@@ -295,6 +314,39 @@
}
}
+ private void handleDataNotAvailable(DataNotAvailableException e)
+ throws BlockedException {
+ if (e.getWaitUntil() != null) {
+ long timeDiff = e.getWaitUntil().getTime() - System.currentTimeMillis();
+ if (timeDiff <= 0) {
+ //already met the time
+ return;
+ }
+ if (e.isStrict()) {
+ this.waitUntil = e.getWaitUntil().getTime();
+ }
+ scheduleMoreWork(timeDiff);
+ } else if (e.getRetryDelay() >= 0) {
+ if (e.isStrict()) {
+ this.waitUntil = System.currentTimeMillis() + e.getRetryDelay();
+ }
+ scheduleMoreWork(e.getRetryDelay());
+ } else if (this.cwi.isDataAvailable()) {
+ return; //no polling, but data is already available
+ } else if (e.isStrict()) {
+ //no polling, wait indefinitely
+ this.waitUntil = Long.MAX_VALUE;
+ }
+ throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
+ }
+
+ private void scheduleMoreWork(long timeDiff) {
+ if (scheduledFuture != null) {
+ this.scheduledFuture.cancel(false);
+ }
+ scheduledFuture = workItem.scheduleWork(new MoreWorkTask(workItem), 10, timeDiff);
+ }
+
private void checkForUpdates(AtomicResultsMessage results, Command command,
EventDistributor distributor, int commandIndex, long ts) {
if (!RelationalNodeUtil.isUpdate(command) || !(command instanceof ProcedureContainer))
{
@@ -367,11 +419,10 @@
}
running = true;
if (!executed) {
- results = cwi.execute();
+ cwi.execute();
executed = true;
- } else {
- results = cwi.more();
}
+ results = cwi.more();
return results;
}
@@ -419,6 +470,10 @@
* @see TupleSource#closeSource()
*/
public void closeSource() {
+ if (this.scheduledFuture != null) {
+ this.scheduledFuture.cancel(true);
+ this.scheduledFuture = null;
+ }
lobBuffer = null;
lobStore = null; //can still be referenced by lobs and will be cleaned-up by
reference
cancelAsynch = true;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
@@ -1020,8 +1021,8 @@
return work;
}
- void scheduleWork(Runnable r, int priority, long delay) {
- dqpCore.scheduleWork(r, priority, delay);
+ ScheduledFuture<?> scheduleWork(Runnable r, int priority, long delay) {
+ return dqpCore.scheduleWork(r, priority, delay);
}
public void setCancelTask(Task cancelTask) {
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -117,7 +117,8 @@
AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
arm.setCommand(command);
ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm,
TestConnectorManager.getConnectorManager());
- return synchConnectorWorkItem.execute();
+ synchConnectorWorkItem.execute();
+ return synchConnectorWorkItem.more();
}
@Test public void testExecutionWarning() throws Throwable {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2012-08-01
20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -599,6 +599,7 @@
@Test public void testDataAvailable() throws Exception {
agds.dataNotAvailable = -1;
+ agds.dataAvailable = true;
RequestMessage reqMsg = exampleRequestMessage("select * FROM
BQT1.SmallA");
ResultsMessage results = execute("A", 1, reqMsg);
if (results.getException() != null) {
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-01
20:18:03 UTC (rev 4288)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -67,10 +67,12 @@
private AutoGenDataService connectorManager = new AutoGenDataService();
private RequestWorkItem workItem;
private int limit = -1;
+ private boolean serial = false;
@Before public void setUp() {
limit = -1;
connectorManager = new AutoGenDataService();
+ serial = false;
}
private static Command helpGetCommand(String sql, QueryMetadataInterface metadata)
throws Exception {
@@ -86,6 +88,7 @@
private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
helpSetupDataTierManager();
AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+ request.setSerial(serial);
return new DataTierTupleSource(request, workItem,
connectorManager.registerRequest(request), dtm, limit);
}
@@ -191,7 +194,7 @@
assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
}
- private int pullTuples(TupleSource info, int limit)
+ private int pullTuples(TupleSource info, int l)
throws TeiidComponentException, TeiidProcessingException,
InterruptedException {
int i = 0;
@@ -200,7 +203,7 @@
if (info.nextTuple() == null) {
break;
}
- if (++i == limit) {
+ if (++i == l) {
break;
}
} catch (BlockedException e) {
@@ -241,6 +244,7 @@
@Test public void testAsynch() throws Exception {
this.connectorManager.dataNotAvailable = 10;
+ this.serial = true;
this.connectorManager.setRows(0);
DataTierTupleSource info = helpSetup(3);
boolean blocked = false;
@@ -250,12 +254,42 @@
break;
} catch (BlockedException e) {
blocked = true;
+ try {
+ info.nextTuple();
+ } catch (BlockedException be) {
+ fail();
+ }
Thread.sleep(50);
}
}
assertTrue(blocked);
}
+ @Test public void testAsynchStrict() throws Exception {
+ this.connectorManager.dataNotAvailable = 1000;
+ this.serial = true;
+ this.connectorManager.strict = true;
+ this.connectorManager.setRows(0);
+ DataTierTupleSource info = helpSetup(3);
+ boolean blocked = false;
+ while (true) {
+ try {
+ assertNull(info.nextTuple());
+ break;
+ } catch (BlockedException e) {
+ blocked = true;
+ try {
+ info.nextTuple();
+ fail();
+ } catch (BlockedException be) {
+ //we won't bother to wait the full second
+ }
+ break;
+ }
+ }
+ assertTrue(blocked);
+ }
+
@Test public void testCaching() throws Exception {
assertEquals(0, connectorManager.getExecuteCount().get());
@@ -271,6 +305,8 @@
assertEquals(1, connectorManager.getExecuteCount().get());
assertFalse(rrp.doNotCache);
+ assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
+
//same session, should be cached
command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
rrp = new RegisterRequestParameter();
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-08-01
20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-08-02
18:04:30 UTC (rev 4289)
@@ -65,7 +65,8 @@
private int rows = 10;
private SourceCapabilities caps;
public boolean throwExceptionOnExecute;
- public int dataNotAvailable = -2;
+ public Integer dataNotAvailable;
+ public boolean strict;
public int sleep;
private final AtomicInteger executeCount = new AtomicInteger();
private final AtomicInteger closeCount = new AtomicInteger();
@@ -73,6 +74,7 @@
public boolean addWarning;
public boolean copyLobs;
public CacheDirective cacheDirective;
+ public boolean dataAvailable;
public AutoGenDataService() {
super("FakeConnector","FakeConnector"); //$NON-NLS-1$
//$NON-NLS-2$
@@ -117,7 +119,7 @@
@Override
public boolean isDataAvailable() {
- return true;
+ return dataAvailable;
}
@Override
@@ -132,20 +134,25 @@
@Override
public AtomicResultsMessage more() throws TranslatorException {
- if (dataNotAvailable == -1) {
- dataNotAvailable = -2;
- item.moreWork(); //this alone is not sufficient, we have to call the data available
method to prevent
- //timing issues
- throw DataNotAvailableException.NO_POLLING;
+ if (dataNotAvailable != null) {
+ int delay = dataNotAvailable;
+ dataNotAvailable = null;
+ DataNotAvailableException dnae = new DataNotAvailableException(delay);
+ dnae.setStrict(strict);
+ throw dnae;
}
- if (returnedInitial) {
+ if (addWarning) {
+ msg.setWarnings(Arrays.asList(new Exception()));
+ }
+ if (!returnedInitial) {
+ returnedInitial = true;
return msg;
}
throw new RuntimeException("Should not be called"); //$NON-NLS-1$
}
@Override
- public AtomicResultsMessage execute() throws TranslatorException {
+ public void execute() throws TranslatorException {
executeCount.incrementAndGet();
if (sleep > 0) {
try {
@@ -157,19 +164,6 @@
if (throwExceptionOnExecute) {
throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
}
- if (dataNotAvailable > -2) {
- int delay = dataNotAvailable;
- if (delay == -1 && !returnedInitial) {
- returnedInitial = true;
- return ConnectorWorkItem.createResultsMessage(new List[0]);
- }
- dataNotAvailable = -2;
- throw new DataNotAvailableException(delay);
- }
- if (addWarning) {
- msg.setWarnings(Arrays.asList(new Exception()));
- }
- return msg;
}
@Override
@@ -191,6 +185,11 @@
public CacheDirective getCacheDirective() {
return cacheDirective;
}
+
+ @Override
+ public boolean isForkable() {
+ return true;
+ }
};
}
Modified: trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java
===================================================================
--- trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java 2012-08-01 20:18:03
UTC (rev 4288)
+++ trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java 2012-08-02 18:04:30
UTC (rev 4289)
@@ -30,12 +30,12 @@
import org.teiid.language.Command;
import org.teiid.metadata.RuntimeMetadata;
import org.teiid.metadata.index.VDBMetadataFactory;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.Execution;
import org.teiid.translator.ExecutionContext;
import org.teiid.translator.ExecutionFactory;
import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.UpdateExecution;
/**
@@ -138,11 +138,13 @@
}
break;
} catch (DataNotAvailableException e) {
- try {
- Thread.sleep(e.getRetryDelay());
- } catch (InterruptedException e1) {
- throw new TranslatorException(e1);
- }
+ if (e.getRetryDelay() > 0) {
+ try {
+ Thread.sleep(e.getRetryDelay());
+ } catch (InterruptedException e1) {
+ throw new TranslatorException(e1);
+ }
+ }
}
}
return results;
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-08-01 20:18:03
UTC (rev 4288)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-08-02 18:04:30
UTC (rev 4289)
@@ -159,6 +159,9 @@
return new ConnectorManager(translatorName, connectionName) {
@Override
public Object getConnectionFactory() throws TranslatorException {
+ if (getConnectionName() == null) {
+ return null;
+ }
ConnectionFactoryProvider<?> connectionFactoryProvider =
connectionFactoryProviders.get(getConnectionName());
if (connectionFactoryProvider != null) {
return connectionFactoryProvider.getConnectionFactory();