[teiid-commits] teiid SVN: r2938 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Feb 24 17:00:57 EST 2011


Author: shawkins
Date: 2011-02-24 17:00:56 -0500 (Thu, 24 Feb 2011)
New Revision: 2938

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
   trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
   trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1482 removing unnecessary rebuffering

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -32,6 +32,8 @@
 		
 		void remove();
 		
+		void setPrefersMemory(boolean prefers);
+		
 	}
 	
 	ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache) throws TeiidComponentException;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.teiid.common.buffer.BatchManager.ManagedBatch;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.Streamable;
@@ -355,6 +356,9 @@
 	
 	public void setPrefersMemory(boolean prefersMemory) {
 		this.prefersMemory = prefersMemory;
+		for (ManagedBatch batch : this.batches.values()) {
+			batch.setPrefersMemory(prefersMemory);
+		}
 	}
 	
 	public boolean isPrefersMemory() {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -210,6 +210,12 @@
 				this.lobManager = new LobManager();
 			}
 		}
+		
+		@Override
+		public void setPrefersMemory(boolean prefers) {
+			this.softCache = prefers;
+			//TODO: could recreate the reference
+		}
 
 		private void addToCache(boolean update) {
 			synchronized (activeBatches) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -41,6 +41,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
@@ -253,35 +254,32 @@
 		if (!doneProducingBatches) {
 			this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() + this.processorTimeslice);
 			sendResultsIfNeeded(null);
-			collector.collectTuples();
+			this.resultsBuffer = collector.collectTuples();
+			if (!doneProducingBatches) {
+				doneProducingBatches();
+				addToCache();
+			}
 		}
-		if (doneProducingBatches) {
-			if (this.transactionState == TransactionState.ACTIVE) {
-				/*
-				 * TEIID-14 if we are done producing batches, then proactively close transactional 
-				 * executions even ones that were intentionally kept alive. this may 
-				 * break the read of a lob from a transactional source under a transaction 
-				 * if the source does not support holding the clob open after commit
-				 */
-	        	for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
-	        		if (connectorRequest.isTransactional()) {
-	        			connectorRequest.fullyCloseSource();
-	        		}
-	            }
-				this.transactionState = TransactionState.DONE;
-				if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
-					this.transactionService.commit(transactionContext);
-				} else {
-					suspend();
-				}
+		if (this.transactionState == TransactionState.ACTIVE) {
+			/*
+			 * TEIID-14 if we are done producing batches, then proactively close transactional 
+			 * executions even ones that were intentionally kept alive. this may 
+			 * break the read of a lob from a transactional source under a transaction 
+			 * if the source does not support holding the clob open after commit
+			 */
+        	for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+        		if (connectorRequest.isTransactional()) {
+        			connectorRequest.fullyCloseSource();
+        		}
+            }
+			this.transactionState = TransactionState.DONE;
+			if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+				this.transactionService.commit(transactionContext);
+			} else {
+				suspend();
 			}
-			sendResultsIfNeeded(null);
-		} else {
-			moreWork(false); // If the timeslice expired, then the processor can probably produce more batches.
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-				LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on " + requestID + " - reenqueueing for more processing ###########"); //$NON-NLS-1$ //$NON-NLS-2$
-			}
 		}
+		sendResultsIfNeeded(null);
 	}
 
 	/**
@@ -381,13 +379,9 @@
         	this.cid = cacheId;
         }
 		processor = request.processor;
-		resultsBuffer = processor.createTupleBuffer();
-		if (this.cid != null && originalCommand.getCacheHint() != null) {
-			LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Using cache hint", originalCommand.getCacheHint()); //$NON-NLS-1$
-			resultsBuffer.setPrefersMemory(originalCommand.getCacheHint().getPrefersMemory());
-		}
-		collector = new BatchCollector(processor, resultsBuffer) {
+		collector = new BatchCollector(processor, processor.getBufferManager(), this.request.context, isForwardOnly()) {
 			protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
+				resultsBuffer = getTupleBuffer();
 				boolean added = false;
 				if (cid != null) {
 					super.flushBatchDirect(batch, add);
@@ -396,28 +390,12 @@
 				if (batch.getTerminationFlag()) {
 					doneProducingBatches();
 				}
-				if (doneProducingBatches && cid != null) {
-			    	Determinism determinismLevel = processor.getContext().getDeterminismLevel();
-	            	CachedResults cr = new CachedResults();
-	            	cr.setCommand(originalCommand);
-	                cr.setAnalysisRecord(analysisRecord);
-	                cr.setResults(resultsBuffer);
-	                
-	                if (originalCommand.getCacheHint() != null && originalCommand.getCacheHint().getDeterminism() != null) {
-						determinismLevel = originalCommand.getCacheHint().getDeterminism();
-						LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",processor.getContext().getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
-					}		                
-	                
-	                if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <= 0) {
-	    				LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("RequestWorkItem.cache_nondeterministic", originalCommand)); //$NON-NLS-1$
-	    			}
-	                dqpCore.getRsCache().put(cid, determinismLevel, cr, originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
-			    }
+				addToCache();
 				add = sendResultsIfNeeded(batch);
 				if (!added) {
 					super.flushBatchDirect(batch, add);
 					//restrict the buffer size for forward only results
-					if (add 
+					if (add && !processor.hasFinalBuffer()
 							&& !batch.getTerminationFlag() 
 							&& this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
 						//requestMore will trigger more processing
@@ -426,8 +404,11 @@
 				}
 			}
 		};
-		resultsBuffer = collector.getTupleBuffer();
-		resultsBuffer.setForwardOnly(isForwardOnly());
+		this.resultsBuffer = collector.getTupleBuffer();
+		if (this.resultsBuffer == null) {
+			//This is just a dummy result; it will get replaced by the collector source
+	    	resultsBuffer = this.processor.getBufferManager().createTupleBuffer(this.originalCommand.getProjectedSymbols(), this.request.context.getConnectionID(), TupleSourceType.FINAL);
+		}
 		analysisRecord = request.analysisRecord;
 		analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
 		transactionContext = request.transactionContext;
@@ -442,6 +423,30 @@
 	    this.returnsUpdateCount = request.returnsUpdateCount;
 		request = null;
 	}
+	
+	private void addToCache() {
+		if (!doneProducingBatches || cid == null) {
+			return;
+		}
+    	Determinism determinismLevel = processor.getContext().getDeterminismLevel();
+    	CachedResults cr = new CachedResults();
+    	cr.setCommand(originalCommand);
+        cr.setAnalysisRecord(analysisRecord);
+        cr.setResults(resultsBuffer);
+        if (originalCommand.getCacheHint() != null) {
+        	LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Using cache hint", originalCommand.getCacheHint()); //$NON-NLS-1$
+			resultsBuffer.setPrefersMemory(originalCommand.getCacheHint().getPrefersMemory());
+        	if (originalCommand.getCacheHint().getDeterminism() != null) {
+				determinismLevel = originalCommand.getCacheHint().getDeterminism();
+				LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",processor.getContext().getDeterminismLevel(), " to ", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
+        	}		                
+        }            		
+        
+        if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <= 0) {
+			LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("RequestWorkItem.cache_nondeterministic", originalCommand)); //$NON-NLS-1$
+		}
+        dqpCore.getRsCache().put(cid, determinismLevel, cr, originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
+	}
 
 	/**
 	 * Send results if they have been requested.  This should only be called from the processing thread.

Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -22,7 +22,6 @@
 
 package org.teiid.query.optimizer.relational.rules;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -40,14 +39,10 @@
 import org.teiid.query.optimizer.relational.plantree.NodeConstants;
 import org.teiid.query.optimizer.relational.plantree.NodeEditor;
 import org.teiid.query.optimizer.relational.plantree.PlanNode;
-import org.teiid.query.resolver.util.ResolverUtil;
 import org.teiid.query.sql.LanguageObject;
 import org.teiid.query.sql.lang.JoinType;
 import org.teiid.query.sql.symbol.AggregateSymbol;
-import org.teiid.query.sql.symbol.Constant;
-import org.teiid.query.sql.symbol.ElementSymbol;
 import org.teiid.query.sql.symbol.GroupSymbol;
-import org.teiid.query.sql.util.SymbolMap;
 import org.teiid.query.sql.visitor.GroupsUsedByElementsVisitor;
 import org.teiid.query.util.CommandContext;
 
@@ -75,12 +70,12 @@
     			continue;
     		}
     		Set<GroupSymbol> groups = GroupsUsedByElementsVisitor.getGroups((Collection<? extends LanguageObject>)planNode.getProperty(NodeConstants.Info.OUTPUT_COLS));
-    		List<PlanNode> removed = removeJoin(groups, planNode, planNode.getFirstChild(), metadata);
+    		List<PlanNode> removed = removeJoin(groups, planNode, planNode.getFirstChild());
     		if (removed != null) {
     			removedNodes.addAll(removed);
     			continue;
     		}
-    		removed = removeJoin(groups, planNode, planNode.getLastChild(), metadata);
+    		removed = removeJoin(groups, planNode, planNode.getLastChild());
     		if (removed != null) {
     			removedNodes.addAll(removed);
     		}
@@ -94,7 +89,7 @@
      * @throws TeiidComponentException 
      * @throws QueryMetadataException 
      */ 
-    private List<PlanNode> removeJoin(Set<GroupSymbol> groups, PlanNode joinNode, PlanNode optionalNode, QueryMetadataInterface metadata) throws QueryPlannerException, QueryMetadataException, TeiidComponentException {
+    private List<PlanNode> removeJoin(Set<GroupSymbol> groups, PlanNode joinNode, PlanNode optionalNode) throws QueryPlannerException, QueryMetadataException, TeiidComponentException {
         if (!Collections.disjoint(optionalNode.getGroups(), groups)) {
         	return null;
         }
@@ -110,16 +105,6 @@
 		joinNode.removeChild(optionalNode);
 		NodeEditor.removeChildNode(parentNode, joinNode);
 
-		// correct the parent nodes that may be using optional elements
-		/*for (GroupSymbol optionalGroup : optionalNode.getGroups()) {
-			List<ElementSymbol> optionalElements = ResolverUtil.resolveElementsInGroup(optionalGroup, metadata);
-			List<Constant> replacements = new ArrayList<Constant>(optionalElements.size());
-			for (ElementSymbol elementSymbol : optionalElements) {
-				replacements.add(new Constant(null, elementSymbol.getType()));
-			}
-			FrameUtil.convertFrame(parentNode, optionalGroup, null, SymbolMap.createSymbolMap(optionalElements, replacements).asMap(), metadata);
-		}*/
-
 		return NodeEditor.findAllNodes(optionalNode, NodeConstants.Types.JOIN);
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -26,11 +26,15 @@
 
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.util.Assertion;
+import org.teiid.query.util.CommandContext;
 
 
 public class BatchCollector {
@@ -51,6 +55,17 @@
 	     * @return List of SingleElementSymbol
 	     */
 	    List getOutputElements();
+	    
+	    /**
+	     * return the final tuple buffer or null if not available
+	     * @return
+	     * @throws TeiidProcessingException 
+	     * @throws TeiidComponentException 
+	     * @throws BlockedException 
+	     */
+	    TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException;
+	    
+	    boolean hasFinalBuffer();
 	}
 	
 	public static class BatchProducerTupleSource implements TupleSource {
@@ -103,17 +118,33 @@
 
     private boolean done = false;
     private TupleBuffer buffer;
+    private boolean forwardOnly;
     
-    public BatchCollector(BatchProducer sourceNode, TupleBuffer buffer) {
+    public BatchCollector(BatchProducer sourceNode, BufferManager bm, CommandContext context, boolean fowardOnly) throws TeiidComponentException {
         this.sourceNode = sourceNode;
-        this.buffer = buffer;
+        if (!this.sourceNode.hasFinalBuffer()) {
+            this.buffer = bm.createTupleBuffer(sourceNode.getOutputElements(), context.getConnectionID(), TupleSourceType.PROCESSOR);
+            this.buffer.setForwardOnly(fowardOnly);
+        }
     }
 
     public TupleBuffer collectTuples() throws TeiidComponentException, TeiidProcessingException {
         TupleBatch batch = null;
     	while(!done) {
-            batch = sourceNode.nextBatch();
-
+    		if (this.sourceNode.hasFinalBuffer()) {
+	    		if (this.buffer == null) {
+	    			TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer();
+	    			Assertion.isNotNull(finalBuffer);
+					this.buffer = finalBuffer;
+	    		}
+	    		if (this.buffer.isFinal()) {
+					this.buffer.setForwardOnly(forwardOnly);
+					done = true;
+					break;
+				}
+    		}
+    		batch = sourceNode.nextBatch();
+            
             flushBatch(batch);
 
             // Check for termination condition
@@ -142,10 +173,15 @@
     
     @SuppressWarnings("unused")
 	protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException, TeiidProcessingException {
-    	buffer.addTupleBatch(batch, add);
+    	if (!this.sourceNode.hasFinalBuffer()) {
+    		buffer.addTupleBatch(batch, add);
+    	}
     }
     
     public int getRowCount() {
+    	if (buffer == null) {
+    		return 0;
+    	}
         return buffer.getRowCount();
     }
     

Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -31,9 +31,10 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidException;
 import org.teiid.query.analysis.AnalysisRecord;
 import org.teiid.query.processor.BatchCollector.BatchProducer;
 import org.teiid.query.util.CommandContext;
@@ -159,5 +160,20 @@
         props.addProperty(PROP_OUTPUT_COLS, AnalysisRecord.getOutputColumnProperties(getOutputElements()));
         return props;
     }
+ 
+    /**
+     * return the final tuple buffer or null if not available
+     * @return
+     * @throws TeiidProcessingException 
+     * @throws TeiidComponentException 
+     * @throws BlockedException 
+     */
+    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    	return null;
+    }
     
+    public boolean hasFinalBuffer() {
+    	return false;
+    }
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -29,7 +29,6 @@
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
@@ -123,18 +122,7 @@
 	    TupleBatch result = null;
 		
 	    try {
-	    	// initialize if necessary
-			if(!initialized) {
-				reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
-				this.processPlan.initialize(context, this.dataMgr, bufferMgr);
-				initialized = true;
-			} 
-			if (!open) {
-				// Open the top node for reading
-				processPlan.open();
-				open = true;
-			}
-	
+	    	init(); 
 			long currentTime = System.currentTimeMillis();
 			Assertion.assertTrue(!processorClosed);
 			
@@ -180,6 +168,21 @@
 		return result;
 	}
 
+	private void init() throws TeiidComponentException, TeiidProcessingException {
+		// initialize if necessary
+		if(!initialized) {
+			reserved = this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()), BufferReserveMode.FORCE);
+			this.processPlan.initialize(context, this.dataMgr, bufferMgr);
+			initialized = true;
+		}
+		
+		if (!open) {
+			// Open the top node for reading
+			processPlan.open();
+			open = true;
+		}
+	}
+
 	                   
     /**
      * Close processing and clean everything up.  Should only be called by the same thread that called process.
@@ -217,15 +220,46 @@
         this.requestCanceled = true;
     }
     
-    public TupleBuffer createTupleBuffer() throws TeiidComponentException {
-    	return this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(), context.getConnectionID(), TupleSourceType.PROCESSOR);
-    }
-    
 	public BatchCollector createBatchCollector() throws TeiidComponentException {
-		return new BatchCollector(this, createTupleBuffer());
+		return new BatchCollector(this, this.bufferMgr, this.context, false);
 	}
 	
 	public void setNonBlocking(boolean nonBlocking) {
 		this.context.setNonBlocking(nonBlocking);
 	}
+
+	@Override
+	public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+		while (true) {
+	    	long wait = DEFAULT_WAIT;
+	    	try {
+	    		init();
+	    		return this.processPlan.getFinalBuffer();
+	    	} catch (BlockedException e) {
+	    		if (!this.context.isNonBlocking()) {
+	    			throw e;
+	    		}
+	    	} catch (TeiidComponentException e) {
+	    		closeProcessing();
+	    		throw e;
+	    	} catch (TeiidProcessingException e) {
+	    		closeProcessing();
+	    		throw e;
+	    	}
+    		try {
+                Thread.sleep(wait);
+            } catch (InterruptedException err) {
+                throw new TeiidComponentException(err);
+            }
+	    }
+	}
+
+	@Override
+	public boolean hasFinalBuffer() {
+		return this.processPlan.hasFinalBuffer();
+	}
+	
+	public BufferManager getBufferManager() {
+		return bufferMgr;
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -34,6 +34,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.util.Assertion;
@@ -597,4 +598,19 @@
 		return processingState;
 	}
 	
+	public boolean hasFinalBuffer() {
+		return false;
+	}
+	
+	/**
+     * return the final tuple buffer or null if not available
+     * @return
+	 * @throws TeiidProcessingException 
+	 * @throws TeiidComponentException 
+	 * @throws BlockedException 
+     */
+	public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+		return null;
+	}
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -29,6 +29,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.language.SQLConstants;
@@ -285,5 +286,16 @@
 		}
 		return false;
     }
+    
+    
+    @Override
+    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    	return root.getFinalBuffer();
+    }
+    
+    @Override
+    public boolean hasFinalBuffer() {
+    	return root.hasFinalBuffer();
+    }
 	
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -47,6 +47,7 @@
     private int phase = SORT;
     private TupleBuffer output;
     private TupleSource outputTs;
+    private boolean usingOutput;
 
     private static final int SORT = 2;
     private static final int OUTPUT = 3;
@@ -61,6 +62,7 @@
         phase = SORT;
         output = null;
         outputTs = null;
+        usingOutput = false;
     }
 
 	public void setSortElements(List<OrderByItem> items) {
@@ -103,7 +105,7 @@
     private TupleBatch outputPhase() throws BlockedException, TeiidComponentException, TeiidProcessingException {
 		if (!this.output.isFinal()) {
 			this.phase = SORT;
-		} else {
+		} else if (!usingOutput) {
 			this.output.setForwardOnly(true);
 		}
 		List<?> tuple = null;
@@ -127,10 +129,10 @@
 
     public void closeDirect() {
         if(this.output != null) {
-            this.output.remove();
-            this.output = null;
-            this.outputTs = null;
+    		this.output.remove();
+        	this.output = null;
         }
+        this.outputTs = null;
     }
 
 	protected void getNodeString(StringBuffer str) {
@@ -166,4 +168,23 @@
         return props;
     }
     
+    @Override
+    public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException, TeiidProcessingException {
+    	if (this.output == null) {
+    		sortPhase();
+    	}
+    	usingOutput = true;
+    	TupleBuffer result = this.output;
+    	if (this.output.isFinal()) {
+        	this.output = null;
+    		close();
+    	}
+    	return result;
+    }
+    
+    @Override
+    public boolean hasFinalBuffer() {
+    	return this.getElements().size() == this.getChildren()[0].getElements().size();
+    }
+    
 }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -31,7 +31,6 @@
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.Assertion;
 import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.BatchIterator;
 import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
@@ -163,7 +162,7 @@
         		throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
         	}
         	if (collector == null) {
-                collector = new BatchCollector(source, createSourceTupleBuffer());
+                collector = new BatchCollector(source, source.getBufferManager(), source.getContext(), false);
             }
             this.buffer = collector.collectTuples();
         }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -39,7 +39,7 @@
 
 public class TestTupleBuffer {
 
-	private final class FakeBatchManager implements BatchManager {
+	public static final class FakeBatchManager implements BatchManager {
 		@Override
 		public void remove() {
 			
@@ -60,6 +60,11 @@
 						throws TeiidComponentException {
 					return batch;
 				}
+
+				@Override
+				public void setPrefersMemory(boolean prefers) {
+					
+				}
 			};
 		}
 

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -21,8 +21,7 @@
  */
 package org.teiid.dqp.internal.process;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -32,15 +31,11 @@
 import java.util.List;
 
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.teiid.cache.Cache;
 import org.teiid.cache.DefaultCache;
-import org.teiid.common.buffer.BatchManager;
 import org.teiid.common.buffer.BufferManager;
-import org.teiid.common.buffer.FileStore;
-import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.core.TeiidComponentException;
+import org.teiid.common.buffer.TestTupleBuffer.FakeBatchManager;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.dqp.service.FakeBufferService;
 import org.teiid.query.sql.lang.Query;
@@ -49,35 +44,6 @@
 
 public class TestCachedResults {
 
-	private final class FakeBatchManager implements BatchManager {
-		@Override
-		public void remove() {
-			
-		}
-
-		@Override
-		public ManagedBatch createManagedBatch(final TupleBatch batch, boolean softCache)
-				throws TeiidComponentException {
-			return new ManagedBatch() {
-				
-				@Override
-				public void remove() {
-					
-				}
-				
-				@Override
-				public TupleBatch getBatch(boolean cache, String[] types)
-						throws TeiidComponentException {
-					return batch;
-				}
-			};
-		}
-
-		@Override
-		public FileStore createStorage(String prefix) {
-			return Mockito.mock(FileStore.class);
-		}
-	}
 	
 	@Test
 	public void testCaching() throws Exception {

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -222,6 +222,25 @@
         }
     }
     
+    @Test public void testBufferReuse() throws Exception {
+    	//the sql should return 100 rows
+        String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B ORDER BY A.IntKey"; //$NON-NLS-1$
+        String userName = "1"; //$NON-NLS-1$
+        String sessionid = "1"; //$NON-NLS-1$
+        
+        RequestMessage reqMsg = exampleRequestMessage(sql);
+        reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
+        DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
+        DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+        ((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
+        Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+        ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+        assertNull(rm.getException());
+        assertEquals(2, rm.getResults().length);
+        RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+        assertEquals(100, item.resultsBuffer.getRowCount());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);

Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-02-24 17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-02-24 22:00:56 UTC (rev 2938)
@@ -233,6 +233,7 @@
         if (context.getGlobalTableStore() == null) {
         	context.setGlobalTableStore(new TempTableStore("SYSTEM"));
         }
+        context.setBufferManager(bufferMgr);
         if (!(dataManager instanceof TempTableDataManager)) {
     	    SessionAwareCache<CachedResults> cache = new SessionAwareCache<CachedResults>();
     	    cache.setBufferManager(bufferMgr);



More information about the teiid-commits mailing list