[teiid-commits] teiid SVN: r3602 - in trunk: documentation/developer-guide/src/main/docbook/en-US/content and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Nov 2 12:50:25 EDT 2011


Author: shawkins
Date: 2011-11-02 12:50:24 -0400 (Wed, 02 Nov 2011)
New Revision: 3602

Modified:
   trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
   trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
   trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
   trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
Log:
TEIID-1802 adding executioncontext dataavailable method

Modified: trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/api/src/main/java/org/teiid/translator/DataNotAvailableException.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -25,7 +25,7 @@
 
 /**
  * Used by asynch connectors to indicate data is not available 
- * and results should be polled for after the given delay.
+ * and results should be polled for after the given delay in milliseconds.
  */
 public class DataNotAvailableException extends TeiidRuntimeException {
 
@@ -33,9 +33,24 @@
 
 	private long retryDelay = 0;
 	
+	/**
+	 * Indicate that the engine should not poll for results and will be notified
+	 * via the {@link ExecutionContext#dataAvailable()} method.
+	 */
+	public static final DataNotAvailableException NO_POLLING = new DataNotAvailableException(-1);
+	
+	/**
+	 * Uses a delay of 0, which implies an immediate poll for results.
+	 */
 	public DataNotAvailableException() {
 	}
 	
+	/**
+	 * Uses the given retryDelay.  Negative values indicate that the
+	 * engine should not poll for results (see also {@link DataNotAvailableException#NO_POLLING} and will be notified
+	 * via the {@link ExecutionContext#dataAvailable()} method.
+	 * @param retryDelay in milliseconds
+	 */
 	public DataNotAvailableException(long retryDelay) {
 		this.retryDelay = retryDelay;
 	}

Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -146,4 +146,10 @@
      * @return
      */
     Session getSession();
+    
+    /**
+     * Signal the engine that data is available and processing should be
+     * resumed.
+     */
+    void dataAvailable();
 }

Modified: trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
===================================================================
--- trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml	2011-11-02 16:50:24 UTC (rev 3602)
@@ -258,12 +258,16 @@
 		<section>
 			<title>Asynchronous Executions</title>
 			<para>In some scenarios, a translator needs to execute
-				asynchronously and allow the executing thread to perform other work.  To allow this, you should Throw a DataNotAvailableExecption during a retrival method, rather than explicitly waiting or sleeping for the results. The
-						DataNotAvailableException may take a delay parameter in its
+				asynchronously and allow the executing thread to perform other work.  To allow this, you should Throw a <code>DataNotAvailableExecption</code> during a retrival method, rather than explicitly waiting or sleeping for the results. 
+				<note><para>A <code>DataNotAvailableException</code> should not be thrown by the execute method.</para><note>
+				The
+						<code>DataNotAvailableException</code> may take a delay parameter in its
 						constructor to indicate how long the system should wait befor polling
-						for results.  Any non-negative value is allowed. 
+						for results.  Any non-negative value indicates a time until the next polling should be performed.
+						The <code>DataNotAvailableException.NO_POLLING</code> exception (or any DataNotAvailableException with a negative delay) can be thrown to indicate that 
+						the execution will call <code>ExecutionContext.dataAvailable<c/ode> to indicate processing should resume.
 			</para>
-			<para>Since the exection and the associated connection are not closed until the work has completed, care should be taken if using asynchronous executions that hold a lot of state.</para>
+			<note><para>Since the exection and the associated connection are not closed until the work has completed, care should be taken if using asynchronous executions that hold a lot of state.</para></note>
 		</section>
         
 		<section>

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -23,6 +23,7 @@
 package org.teiid.dqp.internal.datamgr;
 
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.dqp.message.AtomicResultsMessage;
 import org.teiid.translator.TranslatorException;
 
@@ -40,4 +41,8 @@
 
 	AtomicResultsMessage execute() throws TranslatorException, BlockedException;
 	
+	void setRequestWorkItem(RequestWorkItem item);
+	
+	boolean isDataAvailable();
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -22,7 +22,6 @@
 
 package org.teiid.dqp.internal.datamgr;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,9 +29,11 @@
 import javax.resource.ResourceException;
 
 import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.client.ResizingArrayList;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.util.Assertion;
+import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.dqp.message.AtomicRequestID;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
@@ -108,6 +109,11 @@
 		this.securityContext.setTransactional(requestMsg.isTransactional());
     }
     
+    @Override
+    public void setRequestWorkItem(RequestWorkItem item) {
+    	this.securityContext.setRequestWorkItem(item);
+    }
+    
     public AtomicRequestID getId() {
 		return id;
 	}
@@ -137,6 +143,7 @@
     }
     
     public void close() {
+    	this.securityContext.setRequestWorkItem(null);
     	LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Processing Close :", this.requestMsg.getCommand()}); //$NON-NLS-1$
     	if (!error) {
             manager.logSRCCommand(this.requestMsg, this.securityContext, Event.END, this.rowCount);
@@ -273,12 +280,12 @@
     	Assertion.assertTrue(!this.lastBatch);
         LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Getting results from connector"}); //$NON-NLS-1$
         int batchSize = 0;
-        List<List> rows = new ArrayList<List>(batchSize/4);
+        List<List<?>> rows = new ResizingArrayList<List<?>>(batchSize/4);
         
         try {
 	        while (batchSize < this.requestMsg.getFetchSize()) {
 	        	
-        		List row = this.execution.next();
+        		List<?> row = this.execution.next();
             	if (row == null) {
             		this.lastBatch = true;
             		break;
@@ -306,14 +313,14 @@
 	            }
 	        }
     	} catch (DataNotAvailableException e) {
-    		if (rows.size() == 0) {
+    		if (rows.size() == 0 && this.rowCount != 0) {
     			throw e;
     		}
     	}
                 
         if (lastBatch) {
         	if (this.procedureBatchHandler != null) {
-        		List row = this.procedureBatchHandler.getParameterRow();
+        		List<?> row = this.procedureBatchHandler.getParameterRow();
         		if (row != null) {
         			rows.add(row);
         			this.rowCount++;
@@ -357,5 +364,10 @@
 	public String toString() {
 		return this.id.toString();
 	}
+	
+	@Override
+	public boolean isDataAvailable() {
+		return this.securityContext.isDataAvailable();
+	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -33,6 +33,7 @@
 import org.teiid.adminapi.Session;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.core.util.HashCodeUtil;
+import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.translator.ExecutionContext;
 
 
@@ -66,6 +67,8 @@
     private int batchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
 	private List<Exception> warnings = new LinkedList<Exception>();
 	private Session session;
+	private RequestWorkItem worktItem;
+	private boolean dataAvailable;
     
     public ExecutionContextImpl(String vdbName, int vdbVersion,  Serializable executionPayload, 
                                 String originalConnectionID, String connectorName, String requestId, String partId, String execCount) {
@@ -215,4 +218,23 @@
 	public void setSession(Session session) {
 		this.session = session;
 	}
+
+	public void setRequestWorkItem(RequestWorkItem item) {
+		this.worktItem = item;
+	}
+	
+	@Override
+	public synchronized void dataAvailable() {
+		RequestWorkItem requestWorkItem = this.worktItem;
+		dataAvailable = true;
+		if (requestWorkItem != null) {
+			requestWorkItem.moreWork();
+		}
+	}
+	
+	public synchronized boolean isDataAvailable() {
+		boolean result = dataAvailable;
+		dataAvailable = false;
+		return result;
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -202,6 +202,7 @@
 		}
 		ConnectorManagerRepository cmr = workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
 		ConnectorWork work = cmr.getConnectorManager(aqr.getConnectorName()).registerRequest(aqr);
+		work.setRequestWorkItem(workItem);
         return new DataTierTupleSource(aqr, workItem, work, this, limit);
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -238,12 +238,16 @@
     			} catch (TranslatorException e) {
     				results = exceptionOccurred(e, true);
     			} catch (DataNotAvailableException e) {
-    				workItem.scheduleWork(new Runnable() {
-    					@Override
-    					public void run() {
-							workItem.moreWork();
-    					}
-    				}, 10, e.getRetryDelay());
+    				if (e.getRetryDelay() >= 0) {
+	    				workItem.scheduleWork(new Runnable() {
+	    					@Override
+	    					public void run() {
+								workItem.moreWork();
+	    					}
+	    				}, 10, e.getRetryDelay());
+    				} else if (this.cwi.isDataAvailable()) {
+    					continue; 
+    				}
     				throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on DataNotAvailableException"); //$NON-NLS-1$
     			} 
     			receiveResults(results);
@@ -326,7 +330,7 @@
 		return results;
 	}
 
-	private AtomicResultsMessage getResults()
+	AtomicResultsMessage getResults()
 			throws BlockedException, TeiidComponentException,
 			TranslatorException {
 		AtomicResultsMessage results = null;
@@ -443,7 +447,7 @@
 	@Override
 	public void onCompletion(FutureWork<AtomicResultsMessage> future) {
 		if (!cancelAsynch) {
-			workItem.moreWork();
+			workItem.moreWork(); //this is not necessary in some situations with DataNotAvailable
 		}
 		canAsynchClose = false;
 		if (closed.get()) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -291,9 +291,13 @@
             	}
             }                  	            
         } catch (BlockedException e) {
-            LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- processor blocked"); //$NON-NLS-1$ //$NON-NLS-2$
+        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+        		LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- processor blocked"); //$NON-NLS-1$ //$NON-NLS-2$
+        	}
         } catch (QueryProcessor.ExpiredTimeSliceException e) {
-            LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
+        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+        		LogManager.logDetail(LogConstants.CTX_DQP, "Request Thread", requestID, "- time slice expired"); //$NON-NLS-1$ //$NON-NLS-2$
+        	}
             this.moreWork();
         } catch (Throwable e) {
         	handleThrowable(e);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -587,6 +587,15 @@
     	assertEquals(1, this.core.getLongRunningRequests().size());
     }
     
+    @Test public void testDataAvailable() throws Exception {
+    	agds.dataNotAvailable = -1;
+    	RequestMessage reqMsg = exampleRequestMessage("select * FROM BQT1.SmallA"); 
+        ResultsMessage results = execute("A", 1, reqMsg);
+        if (results.getException() != null) {
+        	throw results.getException();
+        }
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -168,4 +168,21 @@
     	}
     }
     
+    @Test public void testAsynch() throws Exception {
+    	this.connectorManager.dataNotAvailable = 10;
+    	this.connectorManager.setRows(0);
+    	helpSetup(3);
+    	boolean blocked = false;
+    	while (true) {
+	    	try {
+	        	assertNull(info.nextTuple());
+	    		break;
+	    	} catch (BlockedException e) {
+	    		blocked = true;
+	    		Thread.sleep(50);
+	    	}
+    	}
+    	assertTrue(blocked);
+    }
+    
 }

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -35,6 +35,7 @@
 import org.teiid.dqp.internal.datamgr.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
 import org.teiid.dqp.internal.datamgr.ConnectorWorkItem;
+import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
 import org.teiid.query.optimizer.TestOptimizer;
@@ -55,7 +56,7 @@
     private int rows = 10;
     private SourceCapabilities caps;
 	public boolean throwExceptionOnExecute;
-	public int dataNotAvailable = -1;
+	public int dataNotAvailable = -2;
 	public int sleep;
     private final AtomicInteger executeCount = new AtomicInteger();
     private final AtomicInteger closeCount = new AtomicInteger();
@@ -98,9 +99,31 @@
         final AtomicResultsMessage msg = ConnectorWorkItem.createResultsMessage(results);
         msg.setFinalRow(rows);
         return new ConnectorWork() {
+        	
+        	RequestWorkItem item;
+        	boolean returnedInitial;
+        	
+        	@Override
+        	public boolean isDataAvailable() {
+        		return true;
+        	}
+        	
+        	@Override
+        	public void setRequestWorkItem(RequestWorkItem item) {
+        		this.item = item;
+        	}
 			
 			@Override
 			public AtomicResultsMessage more() throws TranslatorException {
+				if (dataNotAvailable == -1) {
+					dataNotAvailable = -2; 
+					item.moreWork(); //this alone is not sufficient, we have to call the data available method to prevent
+					                 //timing issues
+					throw DataNotAvailableException.NO_POLLING;
+				}
+				if (returnedInitial) {
+					return msg;
+				}
 				throw new RuntimeException("Should not be called"); //$NON-NLS-1$
 			}
 			
@@ -117,9 +140,13 @@
 				if (throwExceptionOnExecute) {
 		    		throw new TranslatorException("Connector Exception"); //$NON-NLS-1$
 		    	}
-				if (dataNotAvailable > -1) {
+				if (dataNotAvailable > -2) {
 					int delay = dataNotAvailable;
-					dataNotAvailable = -1;
+					if (delay == -1 && !returnedInitial) {
+						returnedInitial = true;
+						return ConnectorWorkItem.createResultsMessage(new List[0]);
+					}
+					dataNotAvailable = -2;
 					throw new DataNotAvailableException(delay);
 				}
 				return msg;

Modified: trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-11-02 15:15:47 UTC (rev 3601)
+++ trunk/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-11-02 16:50:24 UTC (rev 3602)
@@ -21,16 +21,11 @@
  */
 package org.teiid.transport;
 
-import java.util.Properties;
-
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.teiid.client.security.ILogon;
 import org.teiid.common.buffer.StorageManager;
-import org.teiid.core.TeiidException;
-import org.teiid.jdbc.EmbeddedProfile;
 import org.teiid.jdbc.TeiidDriver;
-import org.teiid.net.ServerConnection;
 import org.teiid.net.TeiidURL.CONNECTION.AuthenticationType;
 import org.teiid.net.socket.ObjectChannel;
 
@@ -46,14 +41,6 @@
 		super(config, new ClientServiceRegistryImpl(ClientServiceRegistry.Type.ODBC), storageManager, portOffset);
 		this.maxLobSize = maxLobSize;
 		this.driver = new TeiidDriver();
-		this.driver.setEmbeddedProfile(new EmbeddedProfile() {
-			@Override
-			protected ServerConnection createServerConnection(Properties info)
-					throws TeiidException {
-				//When using the non-blocking api, we don't want to use the calling thread
-				return new LocalServerConnection(info, false);
-			}
-		});
 		this.logonService = logon;
 	}
 	



More information about the teiid-commits mailing list