[teiid-commits] teiid SVN: r2164 - in trunk/engine/src/main/java/org/teiid: query/processor and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri May 28 00:08:23 EDT 2010


Author: shawkins
Date: 2010-05-28 00:08:22 -0400 (Fri, 28 May 2010)
New Revision: 2164

Modified:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
   trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
   trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
Log:
TEIID-913 ensuring that lob references are still captured even if the batch contents are not added

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-05-28 04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-05-28 04:08:22 UTC (rev 2164)
@@ -42,9 +42,8 @@
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidException;
-import org.teiid.core.TeiidException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.dqp.DQPPlugin;
 import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
@@ -61,7 +60,6 @@
 import org.teiid.query.execution.QueryExecPlugin;
 import org.teiid.query.processor.BatchCollector;
 import org.teiid.query.processor.QueryProcessor;
-import org.teiid.query.processor.BatchCollector.BatchHandler;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.sql.lang.SPParameter;
 import org.teiid.query.sql.lang.StoredProcedure;
@@ -354,12 +352,29 @@
         	this.cid = cacheId;
         }
 		processor = request.processor;
-		collector = processor.createBatchCollector();
-		collector.setBatchHandler(new BatchHandler() {
-			public boolean batchProduced(TupleBatch batch) throws TeiidComponentException {
-			    return sendResultsIfNeeded(batch);
+		resultsBuffer = processor.createTupleBuffer();
+		collector = new BatchCollector(processor, resultsBuffer) {
+			protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException,TeiidProcessingException {
+				boolean added = false;
+				if (cid != null || resultsBuffer.isLobs()) {
+					super.flushBatchDirect(batch, add);
+					added = true;
+				}
+				doneProducingBatches = batch.getTerminationFlag();
+				if (doneProducingBatches && cid != null) {
+			    	boolean sessionScope = processor.getContext().isSessionFunctionEvaluated();
+	            	CachedResults cr = new CachedResults();
+	            	cr.setCommand(originalCommand);
+	                cr.setAnalysisRecord(analysisRecord);
+	                cr.setResults(resultsBuffer);
+	                dqpCore.getRsCache().put(cid, sessionScope, cr);
+			    }
+				add = sendResultsIfNeeded(batch);
+				if (!added) {
+					super.flushBatchDirect(batch, add);
+				}
 			}
-		});
+		};
 		resultsBuffer = collector.getTupleBuffer();
 		resultsBuffer.setForwardOnly(isForwardOnly());
 		analysisRecord = request.analysisRecord;
@@ -381,17 +396,6 @@
 	 * Send results if they have been requested.  This should only be called from the processing thread.
 	 */
 	protected boolean sendResultsIfNeeded(TupleBatch batch) throws TeiidComponentException {
-		if (batch != null) {
-			doneProducingBatches = batch.getTerminationFlag();
-			if (doneProducingBatches && cid != null) {
-		    	boolean sessionScope = this.processor.getContext().isSessionFunctionEvaluated();
-            	CachedResults cr = new CachedResults();
-            	cr.setCommand(originalCommand);
-                cr.setAnalysisRecord(analysisRecord);
-                cr.setResults(this.resultsBuffer);
-                dqpCore.getRsCache().put(cid, sessionScope, cr);
-		    }
-		}
 		ResultsMessage response = null;
 		ResultsReceiver<ResultsMessage> receiver = null;
 		boolean result = true;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2010-05-28 04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java	2010-05-28 04:08:22 UTC (rev 2164)
@@ -51,12 +51,7 @@
 	    List getOutputElements();
 	}
 	
-	public interface BatchHandler {
-		boolean batchProduced(TupleBatch batch) throws TeiidProcessingException, TeiidComponentException;
-	}
-
     private BatchProducer sourceNode;
-    private BatchHandler batchHandler;
 
     private boolean done = false;
     private TupleBuffer buffer;
@@ -91,19 +86,16 @@
      * Flush the batch by giving it to the buffer manager.
      */
     private void flushBatch(TupleBatch batch) throws TeiidComponentException, TeiidProcessingException {
-    	boolean add = true;
-		if (this.batchHandler != null && (batch.getRowCount() > 0 || batch.getTerminationFlag())) {
-        	add = this.batchHandler.batchProduced(batch);
-        }
-    	// Add batch
-        if(batch.getRowCount() > 0 || batch.getTerminationFlag()) {
-        	buffer.addTupleBatch(batch, add);
-        }
+    	if (batch.getRowCount() == 0 && !batch.getTerminationFlag()) {
+    		return;
+    	}
+    	flushBatchDirect(batch, true);
     }
     
-	public void setBatchHandler(BatchHandler batchHandler) {
-		this.batchHandler = batchHandler;
-	}
+    @SuppressWarnings("unused")
+	protected void flushBatchDirect(TupleBatch batch, boolean add) throws TeiidComponentException, TeiidProcessingException {
+    	buffer.addTupleBatch(batch, add);
+    }
     
     public int getRowCount() {
         return buffer.getRowCount();

Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2010-05-28 04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java	2010-05-28 04:08:22 UTC (rev 2164)
@@ -54,7 +54,7 @@
  */
 public abstract class ProcessorPlan implements Cloneable, BatchProducer {
 	
-    private List warnings = null;
+    private List<Exception> warnings = null;
     
     private CommandContext context;
 
@@ -77,18 +77,18 @@
      * the current warnings list.  The warnings are in order they were detected.
      * @return Current list of warnings, never null
      */
-    public List getAndClearWarnings() {
+    public List<Exception> getAndClearWarnings() {
         if (warnings == null) {
             return null;
         }
-        List copied = warnings;
+        List<Exception> copied = warnings;
         warnings = null;
         return copied;
     }
     
     protected void addWarning(TeiidException warning) {
         if (warnings == null) {
-            warnings = new ArrayList(1);
+            warnings = new ArrayList<Exception>(1);
         }
         warnings.add(warning);
     }

Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2010-05-28 04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java	2010-05-28 04:08:22 UTC (rev 2164)
@@ -27,6 +27,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.BufferManager.BufferReserveMode;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
@@ -223,8 +224,12 @@
         this.requestCanceled = true;
     }
     
+    public TupleBuffer createTupleBuffer() throws TeiidComponentException {
+    	return this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(), context.getConnectionID(), TupleSourceType.PROCESSOR);
+    }
+    
 	public BatchCollector createBatchCollector() throws TeiidComponentException {
-		return new BatchCollector(this, this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(), context.getConnectionID(), TupleSourceType.PROCESSOR));
+		return new BatchCollector(this, createTupleBuffer());
 	}
 	
 	public void setNonBlocking(boolean nonBlocking) {



More information about the teiid-commits mailing list