[teiid-commits] teiid SVN: r4233 - in trunk: api/src/main/java/org/teiid/translator and 16 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Jul 12 13:45:59 EDT 2012


Author: shawkins
Date: 2012-07-12 13:45:57 -0400 (Thu, 12 Jul 2012)
New Revision: 4233

Added:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
Modified:
   trunk/api/src/main/java/org/teiid/CommandContext.java
   trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
   trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
   trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
   trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
   trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
   trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java
   trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
   trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java
   trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
   trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-2098 TEIID-1598 	fixing several issues with maxRows and further refining connector caching logic

Modified: trunk/api/src/main/java/org/teiid/CommandContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/CommandContext.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/CommandContext.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -149,6 +149,7 @@
 	/**
 	 * Get the number of times this command has been reused.  Useful 
 	 * in continuous executions.
+	 * @see #isContinuous()
 	 * @return
 	 */
 	long getReuseCount();
@@ -166,4 +167,10 @@
      */
     void addWarning(Exception ex);
 
+    /**
+     * 
+     * @return true if this is a continuous query
+     */
+	boolean isContinuous();
+
 }

Modified: trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -433,4 +433,10 @@
 	public boolean supportsOnlyLiteralComparison() {
 		return delegate.supportsOnlyLiteralComparison();
 	}
+	@Override
+	public CacheDirective getCacheDirective(Command command,
+			ExecutionContext executionContext, RuntimeMetadata metadata)
+			throws TranslatorException {
+		return delegate.getCacheDirective(command, executionContext, metadata);
+	}
 }

Modified: trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/CacheDirective.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/CacheDirective.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -30,8 +30,9 @@
 public class CacheDirective implements Serializable {
 	
 	public enum Scope {
+		NONE,
+		SESSION,
 		USER,
-		SESSION,
 		VDB
 	}
 
@@ -58,10 +59,18 @@
 		this.prefersMemory = prefersMemory;
 	}
 	
+	/**
+	 * Get the time to live in milliseconds
+	 * @return
+	 */
 	public Long getTtl() {
 		return ttl;
 	}
 	
+	/**
+	 * Set the time to live in milliseconds
+	 * @param ttl
+	 */
 	public void setTtl(Long ttl) {
 		this.ttl = ttl;
 	}

Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -201,4 +201,10 @@
      * @return
      */
     CommandContext getCommandContext();
+
+    /**
+     * Get the {@link CacheDirective}
+     * @return
+     */
+    CacheDirective getCacheDirective();
 }

Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -46,6 +46,7 @@
 import org.teiid.metadata.FunctionParameter;
 import org.teiid.metadata.MetadataFactory;
 import org.teiid.metadata.RuntimeMetadata;
+import org.teiid.translator.CacheDirective.Scope;
 import org.teiid.translator.TypeFacility.RUNTIME_CODES;
 import org.teiid.translator.TypeFacility.RUNTIME_NAMES;
 
@@ -986,4 +987,18 @@
 		return true;
 	}
 
+	/**
+	 * Get the {@link CacheDirective} to control command caching.
+	 * <p>Use {@link Scope#NONE} to indicate to the engine that no caching should be performed by the engine.</p>
+	 * <p>If cache parameters on the {@link CacheDirective} will be changed by the {@link Execution}, then
+	 * a new instance of a {@link CacheDirective} should be set each time.</p>
+	 * @param command
+	 * @param executionContext
+	 * @param metadata
+	 * @throws TranslatorException
+	 */
+	public CacheDirective getCacheDirective(Command command, ExecutionContext executionContext, RuntimeMetadata metadata) throws TranslatorException {
+		return null;
+	}
+
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -22,6 +22,7 @@
 
 package org.teiid.common.buffer;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -156,6 +157,9 @@
 		for (Long batch : this.batches.values()) {
 			this.manager.remove(batch);
 		}
+		if (this.lobManager != null) {
+			this.lobManager.remove();
+		}
 		this.batches.clear();
 	}
 	
@@ -236,9 +240,6 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 	            LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
 	        }
-			if (this.lobManager != null) {
-				this.lobManager.remove();
-			}
 			this.batchBuffer = null;
 			purge();
 			this.manager.remove();
@@ -368,5 +369,40 @@
 		}
 		return this.lobManager.getLobCount();
 	}
+
+	public void truncateTo(int rowLimit) throws TeiidComponentException {
+		if (rowCount <= rowLimit) {
+			return;
+		}
+		//TODO this could be more efficient with handling the last batch
+		TupleBatch last = this.getBatch(rowLimit);
+		TupleBatch tb = last;
+		if (this.batchBuffer != null) {
+			this.batchBuffer.clear();
+		}
+		int begin = tb.getBeginRow();
+		do {
+			if (tb == null) {
+				tb = this.getBatch(begin);
+			}
+			Long id = this.batches.remove(begin);
+			if (id != null) {
+				this.manager.remove(id);
+			}
+			if (this.lobManager != null) {
+				for (List<?> tuple : tb.getTuples()) {
+					this.lobManager.updateReferences(tuple, ReferenceMode.REMOVE);
+				}
+			}
+			begin = tb.getEndRow() + 1;
+			tb = null;
+		} while (begin <= rowCount);
+		rowCount = last.getBeginRow() - 1;
+		Iterator<List<?>> iter = last.getTuples().iterator();
+		while (rowCount < rowLimit) {
+			addTuple(iter.next());
+		}
+		saveBatch(false);
+	}
 	
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -25,6 +25,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.translator.CacheDirective;
 import org.teiid.translator.TranslatorException;
 
 
@@ -46,5 +47,7 @@
 	boolean isDataAvailable();
 	
 	boolean copyLobs();
+
+	CacheDirective getCacheDirective() throws TranslatorException;
 	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -40,7 +40,6 @@
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.CommandLogMessage.Event;
-import org.teiid.metadata.RuntimeMetadata;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.metadata.TempMetadataAdapter;
@@ -49,6 +48,7 @@
 import org.teiid.query.sql.lang.QueryCommand;
 import org.teiid.query.sql.lang.StoredProcedure;
 import org.teiid.resource.spi.WrappedConnection;
+import org.teiid.translator.CacheDirective;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.Execution;
 import org.teiid.translator.ExecutionFactory;
@@ -65,7 +65,7 @@
     private ConnectorManager manager;
     private AtomicRequestMessage requestMsg;
     private ExecutionFactory<Object, Object> connector;
-    private QueryMetadataInterface queryMetadata;
+    private RuntimeMetadataImpl queryMetadata;
     
     /* Created on new request */
     private Object connection;
@@ -81,6 +81,7 @@
     private boolean error;
     
     private AtomicBoolean isCancelled = new AtomicBoolean();
+	private org.teiid.language.Command translatedCommand;
     
     ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
         this.id = message.getAtomicRequestID();
@@ -99,9 +100,13 @@
         
         this.connector = manager.getExecutionFactory();
     	VDBMetaData vdb = requestMsg.getWorkContext().getVDB();
-    	this.queryMetadata = vdb.getAttachment(QueryMetadataInterface.class);
-        this.queryMetadata = new TempMetadataAdapter(this.queryMetadata, new TempMetadataStore());
+    	QueryMetadataInterface qmi = vdb.getAttachment(QueryMetadataInterface.class);
+        qmi = new TempMetadataAdapter(qmi, new TempMetadataStore());
+        this.queryMetadata = new RuntimeMetadataImpl(qmi);
 		this.securityContext.setTransactional(requestMsg.isTransactional());
+        LanguageBridgeFactory factory = new LanguageBridgeFactory(this.queryMetadata);
+        factory.setConvertIn(!this.connector.supportsInCriteria());
+        translatedCommand = factory.translate(message.getCommand());
     }
     
     @Override
@@ -154,12 +159,14 @@
         } catch (Throwable e) {
             LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
         } finally {
-        	try {
-        		this.connector.closeConnection(connection, connectionFactory);
-        	} catch (Throwable e) {
-        		LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+        	if (this.connector.isSourceRequired() && this.connection != null) {
+	        	try {
+	        		this.connector.closeConnection(connection, connectionFactory);
+	        	} catch (Throwable e) {
+	        		LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+	        	}
+			    LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
         	}
-		    LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id, "Closed connection"}); //$NON-NLS-1$
         } 
     }
     
@@ -217,16 +224,12 @@
 	        if (command instanceof StoredProcedure) {
 	        	this.expectedColumns = ((StoredProcedure)command).getResultSetColumns().size();
 	        }
-	        LanguageBridgeFactory factory = new LanguageBridgeFactory(queryMetadata);
-	        factory.setConvertIn(!this.connector.supportsInCriteria());
-	        org.teiid.language.Command translatedCommand = factory.translate(command);
 
 			Execution exec = this.requestMsg.getCommandContext().getReusableExecution(this.securityContext.getPartIdentifier());
 			if (exec != null) {
 				((ReusableExecution)exec).reset(translatedCommand, this.securityContext, connection);
 			} else {
-		        RuntimeMetadata rmd = new RuntimeMetadataImpl(queryMetadata);
-		        exec = connector.createExecution(translatedCommand, this.securityContext, rmd, (unwrapped == null) ? this.connection:unwrapped);
+		        exec = connector.createExecution(translatedCommand, this.securityContext, queryMetadata, (unwrapped == null) ? this.connection:unwrapped);
 		        if (exec instanceof ReusableExecution<?>) {
 		        	this.requestMsg.getCommandContext().putReusableExecution(this.securityContext.getPartIdentifier(), (ReusableExecution<?>) exec);
 		        }
@@ -388,5 +391,12 @@
 	public boolean copyLobs() {
 		return this.connector.isCopyLobs();
 	}
+	
+	@Override
+	public CacheDirective getCacheDirective() throws TranslatorException {
+		CacheDirective cd = connector.getCacheDirective(this.translatedCommand, this.securityContext, this.queryMetadata);
+		this.securityContext.setCacheDirective(cd);
+		return cd;
+	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -38,6 +38,7 @@
 import org.teiid.dqp.internal.process.RequestWorkItem;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
 import org.teiid.translator.ExecutionContext;
 
 
@@ -63,6 +64,7 @@
 	private String generalHint;
 	private String hint;
 	private CommandContext commandContext;
+	private CacheDirective cacheDirective;
 	
 	public ExecutionContextImpl(String vdbName, int vdbVersion,  Serializable executionPayload, 
             String originalConnectionID, String connectorName, long requestId, String partId, String execCount) {
@@ -269,4 +271,13 @@
 	public int getVirtualDatabaseVersion() {
 		return getVdbVersion();
 	}
+	
+	@Override
+	public CacheDirective getCacheDirective() {
+		return cacheDirective;
+	}
+	
+	public void setCacheDirective(CacheDirective directive) {
+		this.cacheDirective = directive;
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -139,6 +139,10 @@
         }
     }
     
+    public LanguageBridgeFactory(RuntimeMetadataImpl metadata) {
+    	this.metadataFactory = metadata;
+    }
+    
     public void setConvertIn(boolean convertIn) {
 		this.convertIn = convertIn;
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -56,9 +56,12 @@
 import org.teiid.dqp.internal.datamgr.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.dqp.internal.process.TupleSourceCache.CachableVisitor;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.events.EventDistributor;
+import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.metadata.*;
@@ -67,6 +70,7 @@
 import org.teiid.query.metadata.TempMetadataID;
 import org.teiid.query.metadata.TransformationMetadata;
 import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.parser.ParseInfo;
 import org.teiid.query.processor.CollectionTupleSource;
 import org.teiid.query.processor.ProcessorDataManager;
 import org.teiid.query.processor.RegisterRequestParameter;
@@ -75,12 +79,16 @@
 import org.teiid.query.sql.lang.SourceHint;
 import org.teiid.query.sql.lang.StoredProcedure;
 import org.teiid.query.sql.lang.UnaryFromClause;
+import org.teiid.query.sql.navigator.PreOrPostOrderNavigator;
 import org.teiid.query.sql.symbol.Constant;
 import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.query.sql.visitor.GroupCollectorVisitor;
 import org.teiid.query.tempdata.GlobalTableStore;
 import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
 import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.CacheDirective.Scope;
 
 /**
  * Full {@link ProcessorDataManager} implementation that 
@@ -181,13 +189,51 @@
 		if (parameterObject.limit > 0) {
 			aqr.setFetchSize(Math.min(parameterObject.limit, aqr.getFetchSize()));
 		}
+		Collection<GroupSymbol> accessedGroups = null;
 		if (context.getDataObjects() != null) {
-			for (GroupSymbol gs : GroupCollectorVisitor.getGroupsIgnoreInlineViews(command, false)) {
+			accessedGroups = GroupCollectorVisitor.getGroupsIgnoreInlineViews(command, false);
+			for (GroupSymbol gs : accessedGroups) {
 				context.accessedDataObject(gs.getMetadataID());
 			}
 		}
 		ConnectorManagerRepository cmr = workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
-		ConnectorWork work = cmr.getConnectorManager(aqr.getConnectorName()).registerRequest(aqr);
+		ConnectorManager connectorManager = cmr.getConnectorManager(aqr.getConnectorName());
+		ConnectorWork work = connectorManager.registerRequest(aqr);
+		CacheID cid = null;
+		CacheDirective cd = null;
+		if (workItem.getRsCache() != null && command.areResultsCachable()) {
+			CachableVisitor cv = new CachableVisitor();
+			PreOrPostOrderNavigator.doVisit(command, cv, PreOrPostOrderNavigator.PRE_ORDER, true);
+			if (cv.cacheable) {
+				try {
+					cd = work.getCacheDirective();
+				} catch (TranslatorException e) {
+					throw new TeiidProcessingException(QueryPlugin.Event.TEIID30504, e, aqr.getConnectorName() + ": " + e.getMessage()); //$NON-NLS-1$
+				}
+				if (cd != null) {
+					if (cd.getScope() == Scope.NONE) {
+						parameterObject.doNotCache = true;
+					} else {
+						String cmdString = command.toString();
+						if (cmdString.length() < 200000) { //TODO: this check won't be needed if keys aren't exclusively held in memory
+							cid = new CacheID(workItem.getDqpWorkContext(), ParseInfo.DEFAULT_INSTANCE, cmdString);
+							cid.setParameters(cv.parameters);
+							CachedResults cr = workItem.getRsCache().get(cid);
+							if (cr != null) {
+								parameterObject.doNotCache = true;
+								LogManager.logDetail(LogConstants.CTX_DQP, "Using cache entry for", cid); //$NON-NLS-1$
+								work.close();
+								return cr.getResults().createIndexedTupleSource();
+							}
+						}
+					}
+				} else {
+					LogManager.logTrace(LogConstants.CTX_DQP, aqr.getAtomicRequestID(), "no cache directive"); //$NON-NLS-1$
+				}
+			} else {
+				LogManager.logTrace(LogConstants.CTX_DQP, aqr.getAtomicRequestID(), "command not cachable"); //$NON-NLS-1$
+			}
+		}
 		work.setRequestWorkItem(workItem);
         return new DataTierTupleSource(aqr, workItem, work, this, parameterObject.limit);
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -128,7 +128,8 @@
      * @throws TeiidProcessingException 
      * @see org.teiid.dqp.internal.process.Request#generatePlan()
      */
-    protected void generatePlan() throws TeiidComponentException, TeiidProcessingException {
+	@Override
+    protected void generatePlan(boolean addLimit) throws TeiidComponentException, TeiidProcessingException {
     	String sqlQuery = requestMsg.getCommands()[0];
     	CacheID id = new CacheID(this.workContext, Request.createParseInfo(this.requestMsg), sqlQuery);
         prepPlan = prepPlanCache.get(id);
@@ -151,21 +152,19 @@
             //if prepared plan does not exist, create one
             prepPlan = new PreparedPlan();
             LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Query does not exist in cache: ", sqlQuery}); //$NON-NLS-1$
-            super.generatePlan();
-	        if (!this.addedLimit) { //TODO: this is a little problematic
-            	prepPlan.setCommand(this.userCommand);
-		        // Defect 13751: Clone the plan in its current state (i.e. before processing) so that it can be used for later queries
-		        prepPlan.setPlan(processPlan.clone(), this.context);
-		        prepPlan.setAnalysisRecord(analysisRecord);
-				
-		        Determinism determinismLevel = this.context.getDeterminismLevel();
-				if (userCommand.getCacheHint() != null && userCommand.getCacheHint().getDeterminism() != null) {
-					LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",this.context.getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
-					determinismLevel = userCommand.getCacheHint().getDeterminism();
-				}		        
-		        
-		        this.prepPlanCache.put(id, determinismLevel, prepPlan, userCommand.getCacheHint() != null?userCommand.getCacheHint().getTtl():null);
-	        }
+            super.generatePlan(false);
+        	prepPlan.setCommand(this.userCommand);
+	        // Defect 13751: Clone the plan in its current state (i.e. before processing) so that it can be used for later queries
+	        prepPlan.setPlan(processPlan.clone(), this.context);
+	        prepPlan.setAnalysisRecord(analysisRecord);
+			
+	        Determinism determinismLevel = this.context.getDeterminismLevel();
+			if (userCommand.getCacheHint() != null && userCommand.getCacheHint().getDeterminism() != null) {
+				LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",this.context.getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
+				determinismLevel = userCommand.getCacheHint().getDeterminism();
+			}		        
+	        
+	        this.prepPlanCache.put(id, determinismLevel, prepPlan, userCommand.getCacheHint() != null?userCommand.getCacheHint().getTtl():null);
         }
         
         if (requestMsg.isBatchedUpdate()) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -373,7 +373,7 @@
      * @throws TeiidComponentException
      * @throws TeiidProcessingException 
      */
-    protected void generatePlan() throws TeiidComponentException, TeiidProcessingException {
+    protected void generatePlan(boolean addLimit) throws TeiidComponentException, TeiidProcessingException {
         Command command = parseCommand();
 
         List<Reference> references = ReferenceCollectorVisitor.getReferences(command);
@@ -404,7 +404,7 @@
          * Adds a row limit to a query if Statement.setMaxRows has been called and the command
          * doesn't already have a limit clause.
          */
-        if (requestMsg.getRowLimit() > 0 && command instanceof QueryCommand) {
+        if (addLimit && requestMsg.getRowLimit() > 0 && command instanceof QueryCommand) {
             QueryCommand query = (QueryCommand)command;
             if (query.getLimit() == null) {
                 query.setLimit(new Limit(null, new Constant(new Integer(requestMsg.getRowLimit()), DataTypeManager.DefaultDataClasses.INTEGER)));
@@ -447,7 +447,7 @@
     	
         initMetadata();
         
-        generatePlan();
+        generatePlan(true);
         
         postProcessXML();
         

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -581,6 +581,9 @@
 				}
 			}
 		};
+		if (!request.addedLimit && this.requestMsg.getRowLimit() > 0) {
+        	this.collector.setRowLimit(this.requestMsg.getRowLimit());
+        }
 		this.resultsBuffer = collector.getTupleBuffer();
 		if (this.resultsBuffer == null) {
 			//This is just a dummy result it will get replaced by collector source
@@ -632,6 +635,10 @@
 		}
         dqpCore.getRsCache().put(cid, determinismLevel, cr, originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
 	}
+	
+	public SessionAwareCache<CachedResults> getRsCache() {
+		return dqpCore.getRsCache();
+	}
 
 	/**
 	 * Send results if they have been requested.  This should only be called from the processing thread.

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -38,7 +38,6 @@
 import org.teiid.cache.DefaultCacheFactory;
 import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.common.buffer.TupleBufferCache;
-import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.Assertion;
 import org.teiid.core.util.EquivalenceUtil;
 import org.teiid.core.util.HashCodeUtil;
@@ -261,7 +260,7 @@
 		 */
 		public boolean setParameters(List<?> parameters) {
 			if (parameters !=  null && !parameters.isEmpty()) {
-				this.parameters = new ArrayList<Serializable>();
+				this.parameters = new ArrayList<Serializable>(parameters.size());
 				for (Object obj:parameters) {
 					if (obj == null) {
 						this.parameters.add(null);
@@ -270,11 +269,6 @@
 					if (!(obj instanceof Serializable)) {
 						return false;
 					}
-					
-					Class<?> type = DataTypeManager.determineDataTypeClass(obj);
-					if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
-						return false;
-					}
 					this.parameters.add((Serializable)obj);
 				}
 			}

Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.ProcessorDataManager;
+import org.teiid.query.processor.RegisterRequestParameter;
+import org.teiid.query.sql.LanguageVisitor;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.DependentSetCriteria;
+import org.teiid.query.sql.symbol.Constant;
+import org.teiid.query.util.CommandContext;
+
+public class TupleSourceCache {
+	
+	final static class CachableVisitor extends LanguageVisitor {
+		boolean cacheable = true;
+		List<Object> parameters;
+
+		@Override
+		public void visit(Constant c) {
+			if (c.isMultiValued()) {
+				notCachable();
+			} else if (DataTypeManager.isLOB(c.getType())) {
+				if (parameters == null) {
+					parameters = new ArrayList<Object>();
+				}
+				parameters.add(c.getValue());
+			}
+		}
+
+		private void notCachable() {
+			cacheable = false;
+			setAbort(true);
+		}
+
+		@Override
+		public void visit(DependentSetCriteria obj) {
+			notCachable();
+		}
+	}
+	
+    private static class SharedState {
+    	TupleBuffer tb;
+    	TupleSource ts;
+    	int id;
+    	int expectedReaders;
+    	
+    	private void remove() {
+    		ts.closeSource();
+			tb.remove();
+			tb = null;
+			ts = null;
+    	}
+    }
+	
+	public abstract static class BufferedTupleSource implements TupleSource {
+		private int rowNumber = 1;
+		private TupleBuffer tb;
+		private TupleSource ts;
+		
+		protected BufferedTupleSource(TupleBuffer tb, TupleSource ts) {
+			this.tb = tb;
+			this.ts = ts;
+		}
+
+		@Override
+		public List<?> nextTuple() throws TeiidComponentException,
+				TeiidProcessingException {
+			if (rowNumber <= tb.getRowCount()) {
+				return tb.getBatch(rowNumber).getTuple(rowNumber++);
+			}
+			if (tb.isFinal()) {
+				return null;
+			}
+			List<?> row = ts.nextTuple();
+			if (row == null) {
+				tb.setFinal(true);
+			} else {
+				tb.addTuple(row);
+				rowNumber++;
+			}
+			return row;
+		}
+
+	}
+	
+	private class SharedTupleSource extends BufferedTupleSource {
+		private SharedState state;
+		
+		public SharedTupleSource(SharedState state) {
+			super(state.tb, state.ts);
+			this.state = state;
+		}
+		
+		@Override
+		public void closeSource() {
+			if (--state.expectedReaders == 0 && sharedStates != null && sharedStates.containsKey(state.id)) {
+				state.remove();
+				sharedStates.remove(state.id);
+			}
+		}		
+	}
+	
+    private Map<Integer, SharedState> sharedStates;
+    
+    public void close() {
+    	if (sharedStates != null) {
+    		for (SharedState ss : sharedStates.values()) {
+				ss.remove();
+			}
+    		sharedStates = null;
+    	}
+    }
+    
+    public TupleSource getSharedTupleSource(CommandContext context, Command command, String modelName, RegisterRequestParameter parameterObject, BufferManager bufferMgr, ProcessorDataManager pdm) throws TeiidComponentException, TeiidProcessingException {
+		if (sharedStates == null) {
+			sharedStates = new HashMap<Integer, SharedState>();
+		}
+		SharedState state = sharedStates.get(parameterObject.info.id);
+		if (state == null) {
+			state = new SharedState();
+			state.expectedReaders = parameterObject.info.sharingCount;
+			RegisterRequestParameter param = new RegisterRequestParameter(parameterObject.connectorBindingId, 0, -1);
+			state.ts = pdm.registerRequest(context, command, modelName, param);
+			if (param.doNotCache) {
+				return state.ts;
+			}
+			state.tb = bufferMgr.createTupleBuffer(command.getProjectedSymbols(), context.getConnectionId(), TupleSourceType.PROCESSOR);
+			state.id = parameterObject.info.id;
+			sharedStates.put(parameterObject.info.id, state);
+		}
+		return new SharedTupleSource(state);
+    }
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -229,13 +229,13 @@
 	public Boolean evaluate(CompoundCriteria criteria, List<?> tuple)
 		throws ExpressionEvaluationException, BlockedException, TeiidComponentException {
 
-		List subCrits = criteria.getCriteria();
-		Iterator subCritIter = subCrits.iterator();
+		List<Criteria> subCrits = criteria.getCriteria();
+		Iterator<Criteria> subCritIter = subCrits.iterator();
 
 		boolean and = criteria.getOperator() == CompoundCriteria.AND;
         Boolean result = and?Boolean.TRUE:Boolean.FALSE;
 		while(subCritIter.hasNext()) {
-			Criteria subCrit = (Criteria) subCritIter.next();
+			Criteria subCrit = subCritIter.next();
 			Boolean value = evaluateTVL(subCrit, tuple);
             if (value == null) {
 				result = null;

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.teiid.api.exception.query.QueryMetadataException;
 import org.teiid.api.exception.query.QueryPlannerException;
@@ -80,7 +81,7 @@
 	
 	//state for detecting and reusing source queries
 	private Map<Command, AccessNode> sharedCommands = new HashMap<Command, AccessNode>();
-	private int sharedId;
+	private static AtomicInteger sharedId = new AtomicInteger();
 	
 	public static class SharedStateKey {
 		int id;
@@ -114,7 +115,6 @@
 	        return processPlan;
     	} finally {
     		sharedCommands.clear();
-    		sharedId = 0;
     	}
     }
 
@@ -538,7 +538,7 @@
 		} else {
 			if (other.info == null) {
 				other.info = new RegisterRequestParameter.SharedAccessInfo();
-				other.info.id = sharedId++;
+				other.info.id = sharedId.getAndIncrement();
 			}
 			other.info.sharingCount++;
 			aNode.info = other.info;

Modified: trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -33,6 +33,11 @@
     private static final boolean ANSI_QUOTED_DEFAULT = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.ansiQuotedIdentifiers", true); //$NON-NLS-1$
 
     public int referenceCount = 0;
+    
+    public static final ParseInfo DEFAULT_INSTANCE = new ParseInfo();
+    static {
+    	DEFAULT_INSTANCE.ansiQuotedIdentifiers = true;
+    }
 
     // treat a double quoted variable as variable instead of string 
     public boolean ansiQuotedIdentifiers=ANSI_QUOTED_DEFAULT;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -58,14 +58,17 @@
 	    
 	    /**
 	     * return the final tuple buffer or null if not available
+	     * @param maxRows
 	     * @return
 	     * @throws TeiidProcessingException 
 	     * @throws TeiidComponentException 
 	     * @throws BlockedException 
 	     */
-	    TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException;
+	    TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException;
 	    
 	    boolean hasFinalBuffer();
+	    
+	    void close() throws TeiidComponentException;
 	}
 	
 	public static class BatchProducerTupleSource implements TupleSource {
@@ -119,11 +122,14 @@
     private boolean done = false;
     private TupleBuffer buffer;
     private boolean forwardOnly;
+    private int rowLimit = -1; //-1 means no_limit
+    private boolean hasFinalBuffer;
     
     public BatchCollector(BatchProducer sourceNode, BufferManager bm, CommandContext context, boolean forwardOnly) throws TeiidComponentException {
         this.sourceNode = sourceNode;
         this.forwardOnly = forwardOnly;
-        if (!this.sourceNode.hasFinalBuffer()) {
+        this.hasFinalBuffer = this.sourceNode.hasFinalBuffer();
+        if (!this.hasFinalBuffer) {
             this.buffer = bm.createTupleBuffer(sourceNode.getOutputElements(), context.getConnectionId(), TupleSourceType.PROCESSOR);
             this.buffer.setForwardOnly(forwardOnly);
         }
@@ -132,9 +138,9 @@
     public TupleBuffer collectTuples() throws TeiidComponentException, TeiidProcessingException {
         TupleBatch batch = null;
     	while(!done) {
-    		if (this.sourceNode.hasFinalBuffer()) {
+    		if (this.hasFinalBuffer) {
 	    		if (this.buffer == null) {
-	    			TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer();
+	    			TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer(rowLimit);
 	    			Assertion.isNotNull(finalBuffer);
 					this.buffer = finalBuffer;
 	    		}
@@ -145,6 +151,17 @@
 				}
     		}
     		batch = sourceNode.nextBatch();
+    		
+    		if (rowLimit > 0 && rowLimit <= batch.getEndRow()) {
+    	    	if (!done) {
+    	    		this.sourceNode.close();
+    	    	}
+    	    	if (rowLimit < batch.getEndRow()) {
+    	    		List<List<?>> tuples = batch.getTuples().subList(0, rowLimit - batch.getBeginRow() + 1);
+    	    		batch = new TupleBatch(batch.getBeginRow(), tuples);
+    	    	}
+    	    	batch.setTerminationFlag(true);
+    	    }
             
             flushBatch(batch);
 
@@ -176,7 +193,7 @@
     
     @SuppressWarnings("unused")
 	protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException, TeiidProcessingException {
-    	if (!this.sourceNode.hasFinalBuffer()) {
+    	if (!this.hasFinalBuffer) {
     		buffer.addTupleBatch(batch, add);
     	}
     }
@@ -188,4 +205,8 @@
         return buffer.getRowCount();
     }
     
+    public void setRowLimit(int rowLimit) {
+		this.rowLimit = rowLimit;
+	}
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -149,7 +149,7 @@
      * @throws TeiidComponentException 
      * @throws BlockedException 
      */
-    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
     	return null;
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -22,35 +22,30 @@
 
 package org.teiid.query.processor;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.TupleSource;
 import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.util.Assertion;
-import org.teiid.events.EventDistributor;
+import org.teiid.dqp.internal.process.TupleSourceCache;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.processor.BatchCollector.BatchProducer;
-import org.teiid.query.sql.lang.Command;
 import org.teiid.query.util.CommandContext;
 
 /**
  * Driver for plan processing.
  */
-public class QueryProcessor implements BatchProducer, ProcessorDataManager {
+public class QueryProcessor implements BatchProducer {
 
 	public static class ExpiredTimeSliceException extends TeiidRuntimeException {
 		private static final long serialVersionUID = 4585044674826578060L;
@@ -62,56 +57,6 @@
 		QueryProcessor createQueryProcessor(String query, String recursionGroup, CommandContext commandContext, Object... params) throws TeiidProcessingException, TeiidComponentException;
 	}
 	
-    private class SharedState {
-    	TupleBuffer tb;
-    	TupleSource ts;
-    	int id;
-    	int expectedReaders;
-    	
-    	private void remove() {
-    		ts.closeSource();
-			tb.remove();
-			tb = null;
-			ts = null;
-    	}
-    }
-    
-	private final class BufferedTupleSource implements TupleSource {
-		private int rowNumber = 1;
-		private SharedState state;
-		
-		private BufferedTupleSource(SharedState state) {
-			this.state = state;
-		}
-
-		@Override
-		public List<?> nextTuple() throws TeiidComponentException,
-				TeiidProcessingException {
-			if (rowNumber <= state.tb.getRowCount()) {
-				return state.tb.getBatch(rowNumber).getTuple(rowNumber++);
-			}
-			if (state.tb.isFinal()) {
-				return null;
-			}
-			List<?> row = state.ts.nextTuple();
-			if (row == null) {
-				state.tb.setFinal(true);
-			} else {
-				this.state.tb.addTuple(row);
-				rowNumber++;
-			}
-			return row;
-		}
-
-		@Override
-		public void closeSource() {
-			if (--state.expectedReaders == 0 && sharedStates != null && sharedStates.containsKey(state.id)) {
-				state.remove();
-				sharedStates.remove(state.id);
-			}
-		}
-	}
-	
     private CommandContext context;
 	private ProcessorDataManager dataMgr;
 	private BufferManager bufferMgr;
@@ -123,11 +68,10 @@
     private volatile boolean requestCanceled;
     private static final int DEFAULT_WAIT = 50;       
     private boolean processorClosed;
+    
     private boolean continuous;
     private int rowOffset = 1;
     
-    Map<Integer, SharedState> sharedStates;
-         
     /**
      * Construct a processor with all necessary information to process.
      * @param plan The plan to process
@@ -138,6 +82,7 @@
      */
     public QueryProcessor(ProcessorPlan plan, CommandContext context, BufferManager bufferMgr, final ProcessorDataManager dataMgr) {
         this.context = context;
+        this.context.setTupleSourceCache(new TupleSourceCache());
         this.dataMgr = dataMgr;
 		this.processPlan = plan;
 		this.bufferMgr = bufferMgr;
@@ -203,6 +148,7 @@
 
 	        		if (result.getTerminationFlag()) {
 	        			result.setTerminationFlag(false);
+	        			this.context.getTupleSourceCache().close();
 		        		this.processPlan.close();
 		        		this.processPlan.reset();
 		        		this.context.incrementReuseCount();
@@ -245,7 +191,7 @@
 		// initialize if necessary
 		if(!initialized) {
 			reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
-			this.processPlan.initialize(context, this, bufferMgr);
+			this.processPlan.initialize(context, dataMgr, bufferMgr);
 			initialized = true;
 		}
 		
@@ -266,12 +212,7 @@
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
     		LogManager.logDetail(LogConstants.CTX_DQP, "QueryProcessor: closing processor"); //$NON-NLS-1$
     	}
-    	if (sharedStates != null) {
-    		for (SharedState ss : sharedStates.values()) {
-				ss.remove();
-			}
-    		sharedStates = null;
-    	}
+		this.context.getTupleSourceCache().close();
 		this.bufferMgr.releaseBuffers(reserved);
 		reserved = 0;
         processorClosed = true;
@@ -309,12 +250,12 @@
 	}
 
 	@Override
-	public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+	public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
 		while (true) {
 	    	long wait = DEFAULT_WAIT;
 	    	try {
 	    		init();
-	    		return this.processPlan.getFinalBuffer();
+	    		return this.processPlan.getFinalBuffer(maxRows);
 	    	} catch (BlockedException e) {
 	    		if (!this.context.isNonBlocking()) {
 	    			throw e;
@@ -345,41 +286,14 @@
 	
 	public void setContinuous(boolean continuous) {
 		this.continuous = continuous;
+		if (this.continuous) {
+			this.context.setContinuous();
+		}
 	}
-
-	@Override
-	public Object lookupCodeValue(CommandContext ctx, String codeTableName,
-			String returnElementName, String keyElementName, Object keyValue)
-			throws BlockedException, TeiidComponentException,
-			TeiidProcessingException {
-		return dataMgr.lookupCodeValue(ctx, codeTableName, returnElementName, keyElementName, keyValue);
-	}
 	
 	@Override
-	public EventDistributor getEventDistributor() {
-		return dataMgr.getEventDistributor();
+	public void close() throws TeiidComponentException {
+		closeProcessing();
 	}
 
-	@Override
-	public TupleSource registerRequest(CommandContext ctx, Command command,
-			String modelName, RegisterRequestParameter parameterObject)
-			throws TeiidComponentException, TeiidProcessingException {
-		if (parameterObject.info == null) {
-			return dataMgr.registerRequest(ctx, command, modelName, parameterObject);
-		}
-		//begin handling of shared commands
-		if (sharedStates == null) {
-			sharedStates = new HashMap<Integer, SharedState>();
-		}
-		SharedState state = sharedStates.get(parameterObject.info.id);
-		if (state == null) {
-			state = new SharedState();
-			state.expectedReaders = parameterObject.info.sharingCount;
-			state.tb = QueryProcessor.this.bufferMgr.createTupleBuffer(command.getProjectedSymbols(), ctx.getConnectionId(), TupleSourceType.PROCESSOR);
-			state.ts = dataMgr.registerRequest(ctx, command, modelName, new RegisterRequestParameter(parameterObject.connectorBindingId, 0, -1));
-			state.id = parameterObject.info.id;
-			sharedStates.put(parameterObject.info.id, state);
-		}
-		return new BufferedTupleSource(state);
-	}
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -33,6 +33,7 @@
 	public int nodeID = 0;
 	public int limit = -1;
 	public SharedAccessInfo info;
+	public boolean doNotCache;
 
 	public RegisterRequestParameter(String connectorBindingId, int nodeID,
 			int limit) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -104,7 +104,8 @@
 			}
 			if (this.rowProcessor == null) {
 				rowProcedure.reset();
-				this.rowProcessor = new QueryProcessor(rowProcedure, getContext(), this.bufferMgr, this.dataMgr);
+				CommandContext context = getContext().clone();
+				this.rowProcessor = new QueryProcessor(rowProcedure, context, this.bufferMgr, this.dataMgr);
 				for (Map.Entry<ElementSymbol, Expression> entry : this.params.entrySet()) {
 					Integer index = (Integer)this.lookupMap.get(entry.getValue());
 					if (index != null) {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -411,22 +411,22 @@
 
 	public Object clone(){
 		AccessNode clonedNode = new AccessNode();
-		this.copy(this, clonedNode);
+		this.copyTo(clonedNode);
 		return clonedNode;
 	}
 
-	protected void copy(AccessNode source, AccessNode target){
-		super.copy(source, target);
-		target.modelName = source.modelName;
-		target.modelId = source.modelId;
-		target.connectorBindingId = source.connectorBindingId;
-		target.shouldEvaluate = source.shouldEvaluate;
-		if (!source.shouldEvaluate) {
-			target.projection = source.projection;
-			target.originalSelect = source.originalSelect;
+	protected void copyTo(AccessNode target){
+		super.copyTo(target);
+		target.modelName = modelName;
+		target.modelId = modelId;
+		target.connectorBindingId = connectorBindingId;
+		target.shouldEvaluate = shouldEvaluate;
+		if (!shouldEvaluate) {
+			target.projection = projection;
+			target.originalSelect = originalSelect;
 		}
-		target.command = source.command;
-		target.info = source.info;
+		target.command = command;
+		target.info = info;
 	}
 
     public PlanNode getDescriptionProperties() {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -82,7 +82,7 @@
 	@Override
 	public ArrayTableNode clone() {
 		ArrayTableNode clone = new ArrayTableNode(getID());
-		this.copy(this, clone);
+		this.copyTo(clone);
 		clone.setTable(table);
 		return clone;
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -176,7 +176,7 @@
      */
     public Object clone() {
         BatchedUpdateNode clonedNode = new BatchedUpdateNode(getID(), updateCommands, contexts, shouldEvaluate, modelName);
-        super.copy(this, clonedNode);
+        super.copyTo(clonedNode);
         return clonedNode;
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -105,7 +105,7 @@
         clonedNode.maxSetSize = this.maxSetSize;
         clonedNode.maxPredicates = this.maxPredicates;
         clonedNode.pushdown = this.pushdown;
-        super.copy(this, clonedNode);
+        super.copyTo(clonedNode);
         return clonedNode;
     }
 

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -59,7 +59,7 @@
         DependentProcedureAccessNode copy = new DependentProcedureAccessNode(getID(), inputCriteria,
                                                                                    inputReferences,
                                                                                    inputDefaults);
-        copy(this, copy);
+        copyTo(copy);
         return copy;
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -58,7 +58,7 @@
         DependentProcedureExecutionNode copy = new DependentProcedureExecutionNode(getID(), (Criteria)inputCriteria.clone(),
                                                                                    inputReferences,
                                                                                    inputDefaults);
-        copy(this, copy);
+        copyTo(copy);
         return copy;
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -24,6 +24,7 @@
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -47,12 +48,18 @@
                                  ValueIteratorSource {
 
     private TupleBuffer buffer;
+    private List<? extends Expression> schema;
     private Map<Expression, Set<Object>> cachedSets;
     private boolean unused; //TODO: use this value instead of the context
     private boolean distinct;
+
+    public DependentValueSource(TupleBuffer tb) {
+    	this(tb, tb.getSchema());
+    }
     
-    public DependentValueSource(TupleBuffer tupleSourceID) {
-        this.buffer = tupleSourceID;
+    public DependentValueSource(TupleBuffer tb, List<? extends Expression> schema) {
+        this.buffer = tb;
+        this.schema = schema;
     }
     
     public TupleBuffer getTupleBuffer() {
@@ -67,7 +74,7 @@
     	IndexedTupleSource its = buffer.createIndexedTupleSource();
     	int index = 0;
     	if (valueExpression != null) {
-    		index = buffer.getSchema().indexOf(valueExpression);
+    		index = schema.indexOf(valueExpression);
     		Assertion.assertTrue(index != -1);
     	}
         return new TupleSourceValueIterator(its, index);
@@ -85,10 +92,10 @@
 			IndexedTupleSource its = buffer.createIndexedTupleSource();
         	int index = 0;
         	if (valueExpression != null) {
-        		index = buffer.getSchema().indexOf(valueExpression);
+        		index = schema.indexOf(valueExpression);
         	}
         	Assertion.assertTrue(index != -1);
-        	Class<?> type = ((Expression)buffer.getSchema().get(index)).getType();
+        	Class<?> type = ((Expression)schema.get(index)).getType();
         	if (!DataTypeManager.isHashable(type)) {
         		result = new TreeSet<Object>(Constant.COMPARATOR);
     		} else {

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -449,7 +449,7 @@
 
 	public Object clone(){
 		GroupingNode clonedNode = new GroupingNode(super.getID());
-		super.copy(this, clonedNode);
+		super.copyTo(clonedNode);
 		clonedNode.removeDuplicates = removeDuplicates;
 		clonedNode.outputMapping = outputMapping;
 		clonedNode.orderBy = orderBy;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -92,13 +92,13 @@
 	
 	public Object clone(){
 		InsertPlanExecutionNode clonedNode = new InsertPlanExecutionNode(super.getID(), this.metadata);
-		copy(this, clonedNode);
+		copyTo(clonedNode);
         return clonedNode;
 	}
 	
-	protected void copy(InsertPlanExecutionNode source, InsertPlanExecutionNode target) {
-		target.references = source.references;
-		super.copy(source, target);
+	protected void copyTo(InsertPlanExecutionNode target) {
+		target.references = references;
+		super.copyTo(target);
 	}
 	
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -154,7 +154,7 @@
      */
     public Object clone() {
         JoinNode clonedNode = new JoinNode(super.getID());
-        super.copy(this, clonedNode);
+        super.copyTo(clonedNode);
         
         clonedNode.joinType = this.joinType;
         clonedNode.joinStrategy = this.joinStrategy.clone();
@@ -186,7 +186,8 @@
             this.joinStrategy.loadLeft();
             if (isDependent()) { 
                 TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
-                dvs = new DependentValueSource(buffer);
+                //the tuplebuffer may be from a lower node, so pass in the schema
+                dvs = new DependentValueSource(buffer, this.joinStrategy.leftSource.getSource().getElements());
                 dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
                 this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
             }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -30,6 +30,7 @@
 import org.teiid.client.plan.PlanNode;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.eval.Evaluator;
@@ -168,7 +169,7 @@
     public Object clone() {
         LimitNode node = new LimitNode(getID(), limitExpr, offsetExpr);
         node.implicit = this.implicit;
-        copy(this, node);
+        copyTo(node);
         node.rowCounter = this.rowCounter;
         return node;
     }
@@ -188,5 +189,24 @@
 	public int getOffset() {
 		return offset;
 	}
+	
+	@Override
+	public boolean hasFinalBuffer() {
+		//TODO: support offset
+		return offsetExpr == null && this.getChildren()[0].hasFinalBuffer();
+	}
+	
+	@Override
+	public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
+			TeiidComponentException, TeiidProcessingException {
+		if (maxRows >= 0) {
+			if (limit >= 0) {
+				maxRows = Math.min(maxRows, limit);
+			}
+		} else {
+			maxRows = limit;
+		}
+		return this.getChildren()[0].getFinalBuffer(maxRows);
+	}
 
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -49,7 +49,7 @@
         
 	public Object clone(){
 		NullNode clonedNode = new NullNode(super.getID());
-		super.copy(this, clonedNode);
+		super.copyTo(clonedNode);
 		return clonedNode;
 	}
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -164,14 +164,13 @@
 
 	public Object clone(){
 		PlanExecutionNode clonedNode = new PlanExecutionNode();
-		copy(this, clonedNode);
+		copyTo(clonedNode);
         return clonedNode;
 	}
     
-    protected void copy(PlanExecutionNode source,
-                        PlanExecutionNode target) {
-        target.setProcessorPlan(source.plan.clone());
-        super.copy(source, target);
+    protected void copyTo(PlanExecutionNode target) {
+        target.setProcessorPlan(plan.clone());
+        super.copyTo(target);
     }
 
     public PlanNode getDescriptionProperties() {   

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -231,7 +231,7 @@
     
     public Object clone(){
         ProjectIntoNode clonedNode = new ProjectIntoNode();
-        super.copy(this, clonedNode);
+        super.copyTo(clonedNode);
 
         clonedNode.intoGroup = intoGroup;
         clonedNode.intoElements = intoElements;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -27,24 +27,22 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.client.plan.PlanNode;
 import org.teiid.common.buffer.BlockedException;
-import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.query.analysis.AnalysisRecord;
-import org.teiid.query.processor.ProcessorDataManager;
 import org.teiid.query.sql.LanguageObject;
 import org.teiid.query.sql.symbol.AliasSymbol;
 import org.teiid.query.sql.symbol.Expression;
 import org.teiid.query.sql.util.SymbolMap;
-import org.teiid.query.util.CommandContext;
 
 
 public class ProjectNode extends SubqueryAwareRelationalNode {
@@ -52,7 +50,7 @@
 	private List<? extends Expression> selectSymbols;
 
     // Derived element lookup map
-    private Map elementMap;
+    private Map<Expression, Integer> elementMap;
     private boolean needsProject = true;
     private List<Expression> expressions;
     private int[] projectionIndexes;
@@ -86,45 +84,37 @@
 
 	public void setSelectSymbols(List<? extends Expression> symbols) {
 		this.selectSymbols = symbols;
-	}
-	
-	@Override
-	public void initialize(CommandContext context, BufferManager bufferManager,
-			ProcessorDataManager dataMgr) {
-		super.initialize(context, bufferManager, dataMgr);
-
-        // Do this lazily as the node may be reset and re-used and this info doesn't change
-        if(elementMap != null) {
-        	return;
-        }
-    	this.projectionIndexes = new int[this.selectSymbols.size()];
+		elementMap = Collections.emptyMap();
+		this.projectionIndexes = new int[this.selectSymbols.size()];
     	Arrays.fill(this.projectionIndexes, -1);
     	
     	this.expressions = new ArrayList<Expression>(this.selectSymbols.size());
     	for (Expression ses : this.selectSymbols) {
 			this.expressions.add(SymbolMap.getExpression(ses));
 		}
-        //in the case of select with no from, there is no child node
-        //simply return at this point
-        if(this.getChildren()[0] == null){
-            elementMap = new HashMap();
-            return;
-        }
-
+	}
+	
+	@Override
+	public void addChild(RelationalNode child) {
+		super.addChild(child);
+		init();
+	}
+	
+	void init() {
+		List<? extends Expression> childElements = getChildren()[0].getElements();
         // Create element lookup map for evaluating project expressions
-        List childElements = this.getChildren()[0].getElements();
         this.elementMap = createLookupMap(childElements);
 
         // Check whether project needed at all - this occurs if:
         // 1. outputMap == null (see previous block)
         // 2. project elements are either elements or aggregate symbols (no processing required)
         // 3. order of input values == order of output values
-        needsProject = childElements.size() != getElements().size();
+        needsProject = childElements.size() != selectSymbols.size();
         for(int i=0; i<selectSymbols.size(); i++) {
             Expression symbol = selectSymbols.get(i);
             
             if(symbol instanceof AliasSymbol) {
-                Integer index = (Integer) elementMap.get(symbol);
+                Integer index = elementMap.get(symbol);
                 if(index != null && index.intValue() == i) {
                 	projectionIndexes[i] = index;
                     continue;
@@ -132,7 +122,7 @@
                 symbol = ((AliasSymbol)symbol).getSymbol();
             }
 
-            Integer index = (Integer) elementMap.get(symbol);
+            Integer index = elementMap.get(symbol);
             if(index == null || index.intValue() != i) {
                 // input / output element order is not the same
                 needsProject = true;
@@ -140,7 +130,7 @@
             	projectionIndexes[i] = index;
             }
         }
-    }
+	}
 	
 	public TupleBatch nextBatchDirect()
 		throws BlockedException, TeiidComponentException, TeiidProcessingException {
@@ -208,13 +198,17 @@
 
 	public Object clone(){
 		ProjectNode clonedNode = new ProjectNode();
-        this.copy(this, clonedNode);
+        this.copyTo(clonedNode);
 		return clonedNode;
 	}
 
-    protected void copy(ProjectNode source, ProjectNode target){
-        super.copy(source, target);
+    protected void copyTo(ProjectNode target){
+        super.copyTo(target);
         target.selectSymbols = this.selectSymbols;
+        target.needsProject = needsProject;
+        target.elementMap = elementMap;
+        target.expressions = expressions;
+        target.projectionIndexes = projectionIndexes;
     }
 
     public PlanNode getDescriptionProperties() {
@@ -228,4 +222,15 @@
     	return this.selectSymbols;
     }
     
+    @Override
+    public boolean hasFinalBuffer() {
+    	return !needsProject && this.getChildren()[0].hasFinalBuffer();
+    }
+    
+    @Override
+    public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
+    		TeiidComponentException, TeiidProcessingException {
+    	return this.getChildren()[0].getFinalBuffer(maxRows);
+    }
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -509,13 +509,13 @@
 	 */
 	public abstract Object clone();
 
-	protected void copy(RelationalNode source, RelationalNode target){
-		target.data = source.data;
+	protected void copyTo(RelationalNode target){
+		target.data = this.data;
         
-        target.children = new RelationalNode[source.children.length];
-        for(int i=0; i<source.children.length; i++) {
-            if(source.children[i] != null) {
-                target.children[i] = (RelationalNode)source.children[i].clone();
+        target.children = new RelationalNode[this.children.length];
+        for(int i=0; i<this.children.length; i++) {
+            if(this.children[i] != null) {
+                target.children[i] = (RelationalNode)this.children[i].clone();
                 target.children[i].setParent(target);
             } else {
                 break;
@@ -619,7 +619,7 @@
 	 * @throws TeiidComponentException 
 	 * @throws BlockedException 
      */
-	public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+	public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
 		return null;
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -266,8 +266,8 @@
     }
     
     @Override
-    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
-    	return root.getFinalBuffer();
+    public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    	return root.getFinalBuffer(maxRows);
     }
     
     @Override

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -130,16 +130,16 @@
 	
 	public Object clone(){
 		SelectNode clonedNode = new SelectNode();
-		this.copy(this, clonedNode);
+		this.copyTo(clonedNode);
 		return clonedNode;
 	}
 	
-	protected void copy(SelectNode source, SelectNode target){
-		super.copy(source, target);
+	protected void copyTo(SelectNode target){
+		super.copyTo(target);
 		target.criteria = criteria;
-		target.elementMap = source.elementMap;
-		target.projectionIndexes = source.projectionIndexes;
-		target.projectedExpressions = source.projectedExpressions;
+		target.elementMap = elementMap;
+		target.projectionIndexes = projectionIndexes;
+		target.projectedExpressions = projectedExpressions;
 	}
     
     public PlanNode getDescriptionProperties() {   

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -49,6 +49,8 @@
     private TupleBuffer output;
     private TupleSource outputTs;
     private boolean usingOutput;
+    
+    private int rowLimit = -1;
 
     private static final int SORT = 2;
     private static final int OUTPUT = 3;
@@ -64,6 +66,7 @@
         output = null;
         outputTs = null;
         usingOutput = false;
+        rowLimit = -1;
     }
 
 	public void setSortElements(List<OrderByItem> items) {
@@ -100,6 +103,12 @@
 		if (this.outputTs == null) {
 			this.outputTs = this.output.createIndexedTupleSource();
 		}
+    	if (rowLimit >= 0) {
+			this.output.truncateTo(rowLimit);
+			if (!this.output.isFinal() && this.output.getRowCount() == rowLimit) {
+				this.output.close();
+			}
+		}
         this.phase = OUTPUT;
     }
 
@@ -149,15 +158,15 @@
 		}
 	}
 
-	protected void copy(SortNode source, SortNode target){
-		super.copy(source, target);
-		target.items = source.items;
-		target.mode = source.mode;
+	protected void copyTo(SortNode target){
+		super.copyTo(target);
+		target.items = items;
+		target.mode = mode;
 	}
 
 	public Object clone(){
 		SortNode clonedNode = new SortNode(super.getID());
-		this.copy(this, clonedNode);
+		this.copyTo(clonedNode);
 
 		return clonedNode;
 	}
@@ -175,14 +184,16 @@
     }
     
     @Override
-    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    	this.rowLimit = maxRows;
+    	//TODO: push limiting into the sort logic
     	if (this.output == null) {
     		sortPhase();
     	}
     	usingOutput = true;
     	TupleBuffer result = this.output;
     	if (this.output.isFinal()) {
-        	this.output = null;
+    		this.output = null;
     		close();
     	}
     	return result;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -162,7 +162,7 @@
 	@Override
 	public TextTableNode clone() {
 		TextTableNode clone = new TextTableNode(getID());
-		this.copy(this, clone);
+		this.copyTo(clone);
 		clone.setTable(table);
 		return clone;
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -147,7 +147,7 @@
 
 	public Object clone(){
 		UnionAllNode clonedNode = new UnionAllNode(super.getID());
-		super.copy(this, clonedNode);
+		super.copyTo(clonedNode);
 		return clonedNode;
 	}
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -145,7 +145,7 @@
 	
 	public Object clone(){
 		WindowFunctionProjectNode clonedNode = new WindowFunctionProjectNode();
-        this.copy(this, clonedNode);
+        this.copyTo(clonedNode);
         clonedNode.windows = windows;
         clonedNode.expressionIndexes = expressionIndexes;
         clonedNode.passThrough = passThrough;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -80,7 +80,7 @@
 		typeMapping.put(DataTypeManager.DefaultDataClasses.DOUBLE, BuiltInAtomicType.DOUBLE);
 	}
 	
-	private static RuntimeException EARLY_TERMINATION = new RuntimeException();
+	private static TeiidRuntimeException EARLY_TERMINATION = new TeiidRuntimeException();
 	
 	private XMLTable table;
 	private List<XMLColumn> projectedColumns;
@@ -102,6 +102,8 @@
 	private int outputRow = 1;
 	private boolean usingOutput;
 	
+	private int rowLimit = -1;
+	
 	public XMLTableNode(int nodeID) {
 		super(nodeID);
 	}
@@ -132,6 +134,7 @@
 		this.buffer = null;
 		this.state = State.BUILDING;
 		this.asynchException = null;
+		this.rowLimit = -1;
 	}
 	
 	public void setTable(XMLTable table) {
@@ -145,7 +148,7 @@
 	@Override
 	public XMLTableNode clone() {
 		XMLTableNode clone = new XMLTableNode(getID());
-		this.copy(this, clone);
+		this.copyTo(clone);
 		clone.setTable(table);
 		clone.setProjectedColumns(projectedColumns);
 		return clone;
@@ -215,21 +218,23 @@
 				public void run() {
 					try {
 						XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem, parameters, XMLTableNode.this, getContext());
-						synchronized (XMLTableNode.this) {
-							if (buffer != null) {
-								buffer.close();
-							}
-						}
 					} catch (TeiidException e) {
 						asynchException = new TeiidRuntimeException(e);
 					} catch (TeiidRuntimeException e) {
-						asynchException = e;
-					} catch (RuntimeException e) {
 						if (e != EARLY_TERMINATION) {
-							asynchException = new TeiidRuntimeException(e);
+							asynchException = e;
 						}
+					} catch (RuntimeException e) {
+						asynchException = new TeiidRuntimeException(e);
 					} finally {
 						synchronized (XMLTableNode.this) {
+							if (buffer != null) {
+								try {
+									buffer.close();
+								} catch (TeiidComponentException e) {
+									asynchException = new TeiidRuntimeException(e);
+								}
+							}
 							state = State.DONE;
 							XMLTableNode.this.notifyAll();
 						}
@@ -308,13 +313,14 @@
 	}
 	
 	@Override
-	public TupleBuffer getFinalBuffer() throws BlockedException,
+	public synchronized TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
 			TeiidComponentException, TeiidProcessingException {
+		this.rowLimit = maxRows;
 		evaluate(true);
 		usingOutput = true;
     	TupleBuffer finalBuffer = this.buffer;
     	if (!this.table.getXQueryExpression().isStreaming()) {
-			close();
+    		close();
     	}
 		return finalBuffer;
 	}
@@ -329,6 +335,9 @@
 		rowCount++;
 		try {
 			this.buffer.addTuple(processRow());
+			if (this.buffer.getRowCount() == rowLimit) {
+				throw EARLY_TERMINATION;
+			}
 			if (state == State.BUILDING && hasNextBatch()) {
 				this.state = State.AVAILABLE;
 				this.notifyAll();

Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -90,9 +90,7 @@
     private boolean executed;
     private boolean doneLoading;
     
-    public RelationalPlanExecutor (ResultSetInfo resultInfo, CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) 
-        throws TeiidComponentException{
-        
+    public RelationalPlanExecutor (ResultSetInfo resultInfo, CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) {
         this.resultInfo = resultInfo;
         this.bufferMgr = bufferMgr;
         this.dataManager = dataMgr;

Modified: trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -1161,9 +1161,13 @@
             	constantParts = new String[] {"X'", obj.getValue().toString(), "'"}; //$NON-NLS-1$ //$NON-NLS-2$
             }
             if (constantParts == null) {
-                String strValue = obj.getValue().toString();
-                strValue = escapeStringValue(strValue, "'"); //$NON-NLS-1$
-                constantParts = new String[] {"'", strValue, "'"}; //$NON-NLS-1$ //$NON-NLS-2$
+            	if (DataTypeManager.isLOB(type)) {
+            		constantParts = new String[] {"?"}; //$NON-NLS-1$
+            	} else {
+	                String strValue = obj.getValue().toString();
+	                strValue = escapeStringValue(strValue, "'"); //$NON-NLS-1$
+	                constantParts = new String[] {"'", strValue, "'"}; //$NON-NLS-1$ //$NON-NLS-2$
+            	}
             }
         }
 

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -49,6 +49,7 @@
 import org.teiid.dqp.internal.process.CachedResults;
 import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.internal.process.TupleSourceCache;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.events.EventDistributor;
 import org.teiid.language.SQLConstants;
@@ -124,6 +125,13 @@
 		RegisterRequestParameter parameterObject)
 		throws TeiidComponentException, TeiidProcessingException {          
 
+ 		if (parameterObject.info != null) {
+ 			TupleSourceCache tsc = context.getTupleSourceCache();
+ 			if (tsc != null) {
+ 				return tsc.getSharedTupleSource(context, command, modelName, parameterObject, bufferManager, this);
+ 			}
+		}
+
 		TempTableStore tempTableStore = context.getTempTableStore();
         if(tempTableStore != null) {
             TupleSource result = registerRequest(context, modelName, command);

Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -45,6 +45,7 @@
 import org.teiid.dqp.internal.process.DQPWorkContext;
 import org.teiid.dqp.internal.process.PreparedPlan;
 import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.internal.process.TupleSourceCache;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.TransactionContext;
@@ -141,7 +142,7 @@
 	    Set<CommandListener> commandListeners = null;
 	    private LRUCache<String, DecimalFormat> decimalFormatCache;
 		private LRUCache<String, SimpleDateFormat> dateFormatCache;
-		private AtomicLong reuseCount = new AtomicLong();
+		private AtomicLong reuseCount = null;
 		private ClassLoader classLoader;
 		
 	    private List<Exception> warnings = null;
@@ -155,6 +156,7 @@
     private boolean nonBlocking;
     private HashSet<Object> planningObjects;
     private HashSet<Object> dataObjects = this.globalState.dataObjects;
+    private TupleSourceCache tupleSourceCache;
 
     /**
      * Construct a new context.
@@ -228,6 +230,7 @@
             clone.recursionStack = new LinkedList<String>(this.recursionStack);
         }
     	clone.setNonBlocking(this.nonBlocking);
+    	clone.tupleSourceCache = this.tupleSourceCache;
     	return clone;
     }
     
@@ -749,8 +752,20 @@
 	
 	@Override
 	public long getReuseCount() {
+		if (globalState.reuseCount == null) {
+			return 0;
+		}
 		return globalState.reuseCount.get();
 	}	
+	
+	@Override
+	public boolean isContinuous() {
+		return globalState.reuseCount == null;
+	}
+	
+	public void setContinuous() {
+		this.globalState.reuseCount = new AtomicLong();
+	}
 
 	@Override
 	public ClassLoader getVDBClassLoader() {
@@ -791,5 +806,13 @@
 		}
         LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID31105, warning.getMessage()));
     }
+    
+    public TupleSourceCache getTupleSourceCache() {
+		return tupleSourceCache;
+	}
+    
+    public void setTupleSourceCache(TupleSourceCache tupleSourceCache) {
+		this.tupleSourceCache = tupleSourceCache;
+	}
 
 }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -60,6 +60,32 @@
 		assertEquals(2, batch.getBeginRow());
 	}
 	
+	@Test public void testTruncate() throws Exception {
+		ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
+		x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+		List<ElementSymbol> schema = Arrays.asList(x);
+		TupleBuffer tb = BufferManagerFactory.getStandaloneBufferManager().createTupleBuffer(schema, "x", TupleSourceType.PROCESSOR); //$NON-NLS-1$
+		tb.setBatchSize(2);
+		for (int i = 0; i < 5; i++) {
+			tb.addTuple(Arrays.asList(1));
+		}
+		TupleBatch batch = tb.getBatch(1);
+		assertTrue(!batch.getTerminationFlag());
+		assertEquals(2, batch.getEndRow());
+		tb.close();
+		assertEquals(5, tb.getManagedRowCount());
+		tb.truncateTo(3);
+		assertEquals(3, tb.getManagedRowCount());
+		assertEquals(3, tb.getRowCount());
+		batch = tb.getBatch(3);
+		assertTrue(batch.getTerminationFlag());
+		tb.truncateTo(2);
+		assertEquals(2, tb.getManagedRowCount());
+		assertEquals(2, tb.getRowCount());
+		batch = tb.getBatch(2);
+		assertTrue(batch.getTerminationFlag());
+	}
+	
 	@Test public void testLobHandling() throws Exception {
 		ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
 		x.setType(DataTypeManager.DefaultDataClasses.CLOB);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -374,6 +374,7 @@
         agds.sleep = 500;
         agds.setUseIntCounter(true);
         RequestMessage reqMsg = exampleRequestMessage(sql);
+        reqMsg.setRowLimit(11);
         reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
         DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
         DQPWorkContext.getWorkContext().getSession().setUserName(userName);
@@ -393,6 +394,8 @@
         rm = message.get(500000, TimeUnit.MILLISECONDS);
         assertNull(rm.getException());
         assertEquals(5, rm.getResultsList().size());
+        assertEquals(7, rm.getFirstRow());
+        assertEquals(11, rm.getFinalRow());
     }
     
     @Test public void testSourceConcurrency() throws Exception {
@@ -603,6 +606,14 @@
         }
     }
     
+    @Test public void testXmlTableStreamingWithLimit() throws Exception {
+        String sql = "select * from xmltable('/a/b' passing xmlparse(document '<a x=''1''><b>foo</b><b>bar</b><b>zed</b></a>') columns y string path '.') as x limit 2"; //$NON-NLS-1$
+        
+        ResultsMessage rm = execute("A", 1, exampleRequestMessage(sql));
+        assertNull(rm.getException());
+        assertEquals(2, rm.getResultsList().size());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -345,10 +345,7 @@
 		helpGetProcessorPlan(preparedSql, values, new SessionAwareCache<PreparedPlan>());
     }
     
-    /**
-     * TODO: there may be other ways of handling this situation in the future
-     */
-    @Test public void testLimitNoCache() throws Exception {
+    @Test public void testLimit() throws Exception {
         // Create query 
         String preparedSql = "SELECT pm1.g1.e1, e2, pm1.g1.e3 as a, e4 as b FROM pm1.g1 WHERE pm1.g1.e2=?"; //$NON-NLS-1$
         
@@ -360,7 +357,7 @@
 
 		helpGetProcessorPlan(preparedSql, values, new DefaultCapabilitiesFinder(), RealMetadataFactory.example1Cached(), planCache, SESSION_ID, false, true, RealMetadataFactory.example1VDB());
 		//make sure the plan wasn't reused
-		assertEquals(0, planCache.getCacheHitCount());
+		assertEquals(1, planCache.getCacheHitCount());
     }
     
     @Test public void testUpdateProcedureCriteria() throws Exception {

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-07-12 14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2012-07-12 17:45:57 UTC (rev 4233)
@@ -49,6 +49,7 @@
 import org.teiid.query.optimizer.capabilities.SourceCapabilities;
 import org.teiid.query.processor.relational.RelationalNodeUtil;
 import org.teiid.query.sql.symbol.Expression;
+import org.teiid.translator.CacheDirective;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
 
@@ -179,6 +180,11 @@
 			public boolean copyLobs() {
 				return copyLobs;
 			}
+
+			@Override
+			public CacheDirective getCacheDirective() {
+				return null;
+			}
 			
 		};
     }



More information about the teiid-commits mailing list