[teiid-commits] teiid SVN: r4289 - in trunk: connectors/translator-loopback/src/main/java/org/teiid/translator/loopback and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Aug 2 14:04:34 EDT 2012


Author: shawkins
Date: 2012-08-02 14:04:30 -0400 (Thu, 02 Aug 2012)
New Revision: 4289

Modified:
   trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
   trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
   trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
   trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
   trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java
   trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
Log:
TEIID-2130 addressing short-comings with datanotavailable

Modified: trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -443,4 +443,8 @@
 	public boolean isSourceRequiredForMetadata() {
 		return delegate.isSourceRequiredForMetadata();
 	}
+	@Override
+	public boolean isForkable() {
+		return delegate.isForkable();
+	}
 }

Modified: trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -21,25 +21,31 @@
  */
 package org.teiid.translator;
 
+import java.util.Date;
+
 import org.teiid.core.TeiidRuntimeException;
 
 /**
- * Used by asynch connectors to indicate data is not available 
- * and results should be polled for after the given delay in milliseconds.
+ * Used by asynch connectors to indicate data is not available and results should be polled for after 
+ * the given delay in milliseconds or until a Date is reached.
  * <br>
- * Note that delays are not guaranteed.  The delay is the maximum amount of time before the plan will be re-queued for execution.  
- * There are several scenarios that would cause the delay to be shorter, such as multiple sources where one source returns a shorter 
- * delay or if the engine believes more work is to be done before allowing the plan to sit idle.
+ * Note that delays are not guaranteed unless {@link #strict} is set to true.  With {@link #strict} false, the delay is the maximum amount 
+ * of time before the plan will be re-queued for execution. There are several scenarios that would cause the delay to be shorter, such as 
+ * multiple sources where one source returns a shorter delay or if the engine believes more work is to be done before allowing the plan to sit idle.
+ * <br>
  */
 public class DataNotAvailableException extends TeiidRuntimeException {
 
 	private static final long serialVersionUID = 5569111182915674334L;
 
 	private long retryDelay = 0;
+	private Date waitUntil;
+	private boolean strict;
 	
 	/**
 	 * Indicate that the engine should not poll for results and will be notified
-	 * via the {@link ExecutionContext#dataAvailable()} method.
+	 * via the {@link ExecutionContext#dataAvailable()} method.  However the engine may still ask
+	 * for results before the dataAvailable is called.
 	 */
 	public static final DataNotAvailableException NO_POLLING = new DataNotAvailableException(-1);
 	
@@ -59,8 +65,35 @@
 		this.retryDelay = retryDelay;
 	}
 	
+	/**
+	 * Instructs the engine to wait until the Date is met before getting results.  By default this will
+	 * be strictly enforced, meaning that no attempt will be made to get results before the given Date.
+	 * @param waitUntil
+	 */
+	public DataNotAvailableException(Date waitUntil) {
+		this.waitUntil = waitUntil;
+		this.strict = true;
+	}
+	
 	public long getRetryDelay() {
 		return retryDelay;
 	}
+	
+	public Date getWaitUntil() {
+		return waitUntil;
+	}
+	
+	/**
+	 * If the delay or Date is strictly enforced then the execution will not be asked for results until
+	 * after that time or until {@link ExecutionContext#dataAvailable()} is called.
+	 * @return
+	 */
+	public boolean isStrict() {
+		return strict;
+	}
+	
+	public void setStrict(boolean strict) {
+		this.strict = strict;
+	}
 
 }

Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -1019,5 +1019,13 @@
 	public CacheDirective getCacheDirective(Command command, ExecutionContext executionContext, RuntimeMetadata metadata) throws TranslatorException {
 		return null;
 	}
+	
+	/**
+	 * When forkable the engine may use a separate thread to interact with returned {@link Execution}.
+	 * @return true if {@link Execution}s can be called in separate threads from the processing thread
+	 */
+	public boolean isForkable() {
+		return true;
+	}
 
 }

Modified: trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java
===================================================================
--- trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/connectors/translator-loopback/src/main/java/org/teiid/translator/loopback/LoopbackExecution.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -40,9 +40,9 @@
 import org.teiid.language.Argument.Direction;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
-import org.teiid.translator.TranslatorException;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.ProcedureExecution;
+import org.teiid.translator.TranslatorException;
 import org.teiid.translator.TypeFacility;
 import org.teiid.translator.UpdateExecution;
 
@@ -81,7 +81,9 @@
             // then just say we don't have results instead
             if(randomTimeToWait > this.config.getPollIntervalInMilli()) {
             	waited = true;
-                throw new DataNotAvailableException(randomTimeToWait);
+                DataNotAvailableException dnae = new DataNotAvailableException(randomTimeToWait);
+                dnae.setStrict(true);
+                throw dnae;
             } 
             try {
                 Thread.sleep(randomTimeToWait);

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -40,7 +40,7 @@
 
 	void close();
 
-	AtomicResultsMessage execute() throws TranslatorException, BlockedException;
+	void execute() throws TranslatorException, BlockedException;
 	
 	void setRequestWorkItem(RequestWorkItem item);
 	
@@ -52,4 +52,6 @@
 
 	boolean areLobsUsableAfterClose();
 	
+	boolean isForkable();
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -82,6 +82,8 @@
     
     private AtomicBoolean isCancelled = new AtomicBoolean();
 	private org.teiid.language.Command translatedCommand;
+	
+	private DataNotAvailableException dnae;
     
     ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
         this.id = message.getAtomicRequestID();
@@ -134,6 +136,12 @@
     }
     
     public AtomicResultsMessage more() throws TranslatorException {
+    	if (this.dnae != null) {
+    		//clear the exception if it has been set
+    		DataNotAvailableException e = this.dnae;
+    		this.dnae = null;
+    		throw e;
+    	}
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Processing MORE request"}); //$NON-NLS-1$
     	try {
     		return handleBatch();
@@ -197,7 +205,7 @@
 		return new TranslatorException(t);
     }
     
-	public AtomicResultsMessage execute() throws TranslatorException {
+	public void execute() throws TranslatorException {
         if(isCancelled()) {
     		 throw new TranslatorException(QueryPlugin.Event.TEIID30476, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30476));
     	}
@@ -252,8 +260,6 @@
 	        // Execute query
 	    	this.execution.execute();
 	        LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Executed command"}); //$NON-NLS-1$
-	
-	        return handleBatch();
     	} catch (Throwable t) {
     		throw handleError(t);
     	}
@@ -339,9 +345,14 @@
 	            }
 	        }
     	} catch (DataNotAvailableException e) {
-    		if (rows.size() == 0 && this.rowCount != 0) {
+    		if (rows.size() == 0) {
     			throw e;
     		}
+    		if (e.getWaitUntil() != null) {
+    			//we have a wait until that we need to enforce 
+    			this.dnae = e;
+    		}
+    		//else we can just ignore the delay
     	}
                 
         if (lastBatch) {
@@ -414,4 +425,9 @@
 		return cd;
 	}
 
+	@Override
+	public boolean isForkable() {
+		return this.connector.isForkable();
+	}
+
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -35,6 +35,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import javax.resource.spi.work.Work;
@@ -452,8 +453,8 @@
 		this.processWorkerPool.execute(work);
     }
     
-    void scheduleWork(final Runnable r, int priority, long delay) {
-		this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>() {
+    ScheduledFuture<?> scheduleWork(final Runnable r, int priority, long delay) {
+		return this.processWorkerPool.schedule(new FutureWork<Void>(new Callable<Void>() {
 			@Override
 			public Void call() throws Exception {
 				r.run();

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -201,6 +201,9 @@
 		ConnectorManagerRepository cmr = workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
 		ConnectorManager connectorManager = cmr.getConnectorManager(aqr.getConnectorName());
 		ConnectorWork work = connectorManager.registerRequest(aqr);
+		if (!work.isForkable()) {
+    		aqr.setSerial(true);
+    	}
 		CacheID cid = null;
 		CacheDirective cd = null;
 		if (workItem.getRsCache() != null && command.areResultsCachable()) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -23,10 +23,12 @@
 package org.teiid.dqp.internal.process;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.activation.DataSource;
@@ -81,7 +83,24 @@
  */
 public class DataTierTupleSource implements TupleSource, CompletionListener<AtomicResultsMessage> {
 	
-    // Construction state
+    private static final class MoreWorkTask implements Runnable {
+
+		WeakReference<RequestWorkItem> ref;
+
+    	public MoreWorkTask(RequestWorkItem workItem) {
+    		ref = new WeakReference<RequestWorkItem>(workItem);
+		}
+
+		@Override
+		public void run() {
+			RequestWorkItem item = ref.get();
+			if (item != null) {
+				item.moreWork();
+			}
+		}
+	}
+
+	// Construction state
     private final AtomicRequestMessage aqr;
     private final RequestWorkItem workItem;
     private final ConnectorWork cwi;
@@ -113,6 +132,9 @@
     
     boolean errored;
 	Scope scope; //this is to avoid synchronization
+	
+	private long waitUntil;
+	private ScheduledFuture<?> scheduledFuture;
     
     public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem, ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
         this.aqr = aqr;
@@ -226,6 +248,12 @@
 	}
 
     public List<?> nextTuple() throws TeiidComponentException, TeiidProcessingException {
+    	if (waitUntil > 0 && waitUntil > System.currentTimeMillis()) {
+    		if (!this.cwi.isDataAvailable()) {
+    			throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking until", waitUntil); //$NON-NLS-1$
+    		}
+    		this.waitUntil = 0;
+    	}
     	while (true) {
     		if (arm == null) {
     			if (isDone()) {
@@ -262,17 +290,8 @@
     				partial = true;
     			} catch (DataNotAvailableException e) {
     				dna = true;
-    				if (e.getRetryDelay() >= 0) {
-	    				workItem.scheduleWork(new Runnable() {
-	    					@Override
-	    					public void run() {
-								workItem.moreWork();
-	    					}
-	    				}, 10, e.getRetryDelay());
-    				} else if (this.cwi.isDataAvailable()) {
-    					continue; 
-    				}
-    				throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
+    				handleDataNotAvailable(e);
+    				continue;
     			} finally {
     				if (!dna && results == null) {
     					errored = true;
@@ -295,6 +314,39 @@
     	}
     }
 
+	private void handleDataNotAvailable(DataNotAvailableException e)
+			throws BlockedException {
+		if (e.getWaitUntil() != null) {
+			long timeDiff = e.getWaitUntil().getTime() - System.currentTimeMillis();
+			if (timeDiff <= 0) {
+				//already met the time
+				return;
+			}
+			if (e.isStrict()) {
+				this.waitUntil = e.getWaitUntil().getTime();
+			}
+			scheduleMoreWork(timeDiff);
+		} else if (e.getRetryDelay() >= 0) {
+			if (e.isStrict()) {
+				this.waitUntil = System.currentTimeMillis() + e.getRetryDelay();
+			}
+			scheduleMoreWork(e.getRetryDelay());
+		} else if (this.cwi.isDataAvailable()) {
+			return; //no polling, but data is already available
+		} else if (e.isStrict()) {
+			//no polling, wait indefinitely
+			this.waitUntil = Long.MAX_VALUE;
+		}
+		throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
+	}
+
+	private void scheduleMoreWork(long timeDiff) {
+		if (scheduledFuture != null) {
+			this.scheduledFuture.cancel(false);
+		}
+		scheduledFuture = workItem.scheduleWork(new MoreWorkTask(workItem), 10, timeDiff);
+	}
+
 	private void checkForUpdates(AtomicResultsMessage results, Command command,
 			EventDistributor distributor, int commandIndex, long ts) {
 		if (!RelationalNodeUtil.isUpdate(command) || !(command instanceof ProcedureContainer)) {
@@ -367,11 +419,10 @@
 		}
 		running = true;
 		if (!executed) {
-			results = cwi.execute();
+			cwi.execute();
 			executed = true;
-		} else {
-			results = cwi.more();
 		}
+		results = cwi.more();
 		return results;
 	}
     
@@ -419,6 +470,10 @@
      * @see TupleSource#closeSource()
      */
     public void closeSource() {
+    	if (this.scheduledFuture != null) {
+    		this.scheduledFuture.cancel(true);
+    		this.scheduledFuture = null;
+    	}
     	lobBuffer = null;
     	lobStore = null; //can still be referenced by lobs and will be cleaned-up by reference
     	cancelAsynch = true;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
 
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
@@ -1020,8 +1021,8 @@
     	return work;
     }
     
-    void scheduleWork(Runnable r, int priority, long delay) {
-    	dqpCore.scheduleWork(r, priority, delay);
+    ScheduledFuture<?> scheduleWork(Runnable r, int priority, long delay) {
+    	return dqpCore.scheduleWork(r, priority, delay);
     }
     
     public void setCancelTask(Task cancelTask) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/datamgr/TestConnectorWorkItem.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -117,7 +117,8 @@
 		AtomicRequestMessage arm = createNewAtomicRequestMessage(1, 1);
 		arm.setCommand(command);
 		ConnectorWorkItem synchConnectorWorkItem = new ConnectorWorkItem(arm, TestConnectorManager.getConnectorManager());
-		return synchConnectorWorkItem.execute();
+		synchConnectorWorkItem.execute();
+		return synchConnectorWorkItem.more();
 	}
 	
 	@Test public void testExecutionWarning() throws Throwable {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -599,6 +599,7 @@
     
     @Test public void testDataAvailable() throws Exception {
     	agds.dataNotAvailable = -1;
+    	agds.dataAvailable = true;
     	RequestMessage reqMsg = exampleRequestMessage("select * FROM BQT1.SmallA"); 
         ResultsMessage results = execute("A", 1, reqMsg);
         if (results.getException() != null) {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -67,10 +67,12 @@
     private AutoGenDataService connectorManager = new AutoGenDataService();
     private RequestWorkItem workItem;
     private int limit = -1;
+    private boolean serial = false;
     
     @Before public void setUp() {
     	limit = -1;
     	connectorManager = new AutoGenDataService();
+    	serial = false;
     }
     
     private static Command helpGetCommand(String sql, QueryMetadataInterface metadata) throws Exception {
@@ -86,6 +88,7 @@
     private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
         helpSetupDataTierManager();
         AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+        request.setSerial(serial);
         return new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm, limit);
     }
 
@@ -191,7 +194,7 @@
         assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
     }
 
-	private int pullTuples(TupleSource info, int limit)
+	private int pullTuples(TupleSource info, int l)
 			throws TeiidComponentException, TeiidProcessingException,
 			InterruptedException {
 		int i = 0;
@@ -200,7 +203,7 @@
 	    		if (info.nextTuple() == null) {
 	    			break;
 	    		}
-	    		if (++i == limit) {
+	    		if (++i == l) {
 	    			break;
 	    		}
 	    	} catch (BlockedException e) {
@@ -241,6 +244,7 @@
     
     @Test public void testAsynch() throws Exception {
     	this.connectorManager.dataNotAvailable = 10;
+    	this.serial = true;
     	this.connectorManager.setRows(0);
     	DataTierTupleSource info = helpSetup(3);
     	boolean blocked = false;
@@ -250,12 +254,42 @@
 	    		break;
 	    	} catch (BlockedException e) {
 	    		blocked = true;
+	    		try {
+		    		info.nextTuple();
+	    		} catch (BlockedException be) {
+		    		fail();
+	    		}
 	    		Thread.sleep(50);
 	    	}
     	}
     	assertTrue(blocked);
     }
     
+    @Test public void testAsynchStrict() throws Exception {
+    	this.connectorManager.dataNotAvailable = 1000;
+    	this.serial = true;
+    	this.connectorManager.strict = true;
+    	this.connectorManager.setRows(0);
+    	DataTierTupleSource info = helpSetup(3);
+    	boolean blocked = false;
+    	while (true) {
+	    	try {
+	        	assertNull(info.nextTuple());
+	    		break;
+	    	} catch (BlockedException e) {
+	    		blocked = true;
+	    		try {
+		    		info.nextTuple();
+		    		fail();
+	    		} catch (BlockedException be) {
+	    			//we won't bother to wait the full second
+	    		}
+	    		break;
+	    	}
+    	}
+    	assertTrue(blocked);
+    }
+    
     @Test public void testCaching() throws Exception {
     	assertEquals(0, connectorManager.getExecuteCount().get());
 
@@ -271,6 +305,8 @@
     	assertEquals(1, connectorManager.getExecuteCount().get());
     	assertFalse(rrp.doNotCache);
     	
+    	assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
+    	
     	//same session, should be cached
     	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
     	rrp = new RegisterRequestParameter();

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -65,7 +65,8 @@
     private int rows = 10;
     private SourceCapabilities caps;
 	public boolean throwExceptionOnExecute;
-	public int dataNotAvailable = -2;
+	public Integer dataNotAvailable;
+	public boolean strict;
 	public int sleep;
     private final AtomicInteger executeCount = new AtomicInteger();
     private final AtomicInteger closeCount = new AtomicInteger();
@@ -73,6 +74,7 @@
 	public boolean addWarning;
 	public boolean copyLobs;
 	public CacheDirective cacheDirective;
+	public boolean dataAvailable;
 
     public AutoGenDataService() {
     	super("FakeConnector","FakeConnector"); //$NON-NLS-1$ //$NON-NLS-2$
@@ -117,7 +119,7 @@
         	
         	@Override
         	public boolean isDataAvailable() {
-        		return true;
+        		return dataAvailable;
         	}
         	
         	@Override
@@ -132,20 +134,25 @@
 			
 			@Override
 			public AtomicResultsMessage more() throws TranslatorException {
-				if (dataNotAvailable == -1) {
-					dataNotAvailable = -2; 
-					item.moreWork(); //this alone is not sufficient, we have to call the data available method to prevent
-					                 //timing issues
-					throw DataNotAvailableException.NO_POLLING;
+				if (dataNotAvailable != null) {
+					int delay = dataNotAvailable;
+					dataNotAvailable = null;
+					DataNotAvailableException dnae = new DataNotAvailableException(delay);
+					dnae.setStrict(strict);
+					throw dnae;
 				}
-				if (returnedInitial) {
+				if (addWarning) {
+					msg.setWarnings(Arrays.asList(new Exception()));
+				}
+				if (!returnedInitial) {
+					returnedInitial = true;
 					return msg;
 				}
 				throw new RuntimeException("Should not be called"); //$NON-NLS-1$
 			}
 			
 			@Override
-			public AtomicResultsMessage execute() throws TranslatorException {
+			public void execute() throws TranslatorException {
 				executeCount.incrementAndGet();
 				if (sleep > 0) {
 					try {
@@ -157,19 +164,6 @@
 				if (throwExceptionOnExecute) {
 		    		throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
 		    	}
-				if (dataNotAvailable > -2) {
-					int delay = dataNotAvailable;
-					if (delay == -1 && !returnedInitial) {
-						returnedInitial = true;
-						return ConnectorWorkItem.createResultsMessage(new List[0]);
-					}
-					dataNotAvailable = -2;
-					throw new DataNotAvailableException(delay);
-				}
-				if (addWarning) {
-					msg.setWarnings(Arrays.asList(new Exception()));
-				}
-				return msg;
 			}
 			
 			@Override
@@ -191,6 +185,11 @@
 			public CacheDirective getCacheDirective() {
 				return cacheDirective;
 			}
+
+			@Override
+			public boolean isForkable() {
+				return true;
+			}
 			
 		};
     }

Modified: trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java
===================================================================
--- trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/metadata/src/test/java/org/teiid/cdk/api/ConnectorHost.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -30,12 +30,12 @@
 import org.teiid.language.Command;
 import org.teiid.metadata.RuntimeMetadata;
 import org.teiid.metadata.index.VDBMetadataFactory;
-import org.teiid.translator.TranslatorException;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.Execution;
 import org.teiid.translator.ExecutionContext;
 import org.teiid.translator.ExecutionFactory;
 import org.teiid.translator.ResultSetExecution;
+import org.teiid.translator.TranslatorException;
 import org.teiid.translator.UpdateExecution;
 
 /**
@@ -138,11 +138,13 @@
 	    		}
 	    		break;
 	    	} catch (DataNotAvailableException e) {
-	    		try {
-					Thread.sleep(e.getRetryDelay());
-				} catch (InterruptedException e1) {
-					throw new TranslatorException(e1);
-				}
+	    		if (e.getRetryDelay() > 0) {
+		    		try {
+						Thread.sleep(e.getRetryDelay());
+					} catch (InterruptedException e1) {
+						throw new TranslatorException(e1);
+					}
+	    		}
 	    	}
     	}
     	return results;

Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-01 20:18:03 UTC (rev 4288)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-02 18:04:30 UTC (rev 4289)
@@ -159,6 +159,9 @@
 			return new ConnectorManager(translatorName, connectionName) {
 				@Override
 				public Object getConnectionFactory() throws TranslatorException {
+					if (getConnectionName() == null) {
+						return null;
+					}
 					ConnectionFactoryProvider<?> connectionFactoryProvider = connectionFactoryProviders.get(getConnectionName());
 					if (connectionFactoryProvider != null) {
 						return connectionFactoryProvider.getConnectionFactory();



More information about the teiid-commits mailing list