Author: shawkins
Date: 2010-05-28 00:08:22 -0400 (Fri, 28 May 2010)
New Revision: 2164
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
Log:
TEIID-913 ensuring that lob references are still captured even if the batch contents
are not added
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-05-28
04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-05-28
04:08:22 UTC (rev 2164)
@@ -42,9 +42,8 @@
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidException;
-import org.teiid.core.TeiidException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.DQPPlugin;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
@@ -61,7 +60,6 @@
import org.teiid.query.execution.QueryExecPlugin;
import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.QueryProcessor;
-import org.teiid.query.processor.BatchCollector.BatchHandler;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.SPParameter;
import org.teiid.query.sql.lang.StoredProcedure;
@@ -354,12 +352,29 @@
this.cid = cacheId;
}
processor = request.processor;
- collector = processor.createBatchCollector();
- collector.setBatchHandler(new BatchHandler() {
- public boolean batchProduced(TupleBatch batch) throws TeiidComponentException {
- return sendResultsIfNeeded(batch);
+ resultsBuffer = processor.createTupleBuffer();
+ collector = new BatchCollector(processor, resultsBuffer) {
+ protected void flushBatchDirect(TupleBatch batch, boolean add) throws
TeiidComponentException,TeiidProcessingException {
+ boolean added = false;
+ if (cid != null || resultsBuffer.isLobs()) {
+ super.flushBatchDirect(batch, add);
+ added = true;
+ }
+ doneProducingBatches = batch.getTerminationFlag();
+ if (doneProducingBatches && cid != null) {
+ boolean sessionScope = processor.getContext().isSessionFunctionEvaluated();
+ CachedResults cr = new CachedResults();
+ cr.setCommand(originalCommand);
+ cr.setAnalysisRecord(analysisRecord);
+ cr.setResults(resultsBuffer);
+ dqpCore.getRsCache().put(cid, sessionScope, cr);
+ }
+ add = sendResultsIfNeeded(batch);
+ if (!added) {
+ super.flushBatchDirect(batch, add);
+ }
}
- });
+ };
resultsBuffer = collector.getTupleBuffer();
resultsBuffer.setForwardOnly(isForwardOnly());
analysisRecord = request.analysisRecord;
@@ -381,17 +396,6 @@
* Send results if they have been requested. This should only be called from the
processing thread.
*/
protected boolean sendResultsIfNeeded(TupleBatch batch) throws TeiidComponentException
{
- if (batch != null) {
- doneProducingBatches = batch.getTerminationFlag();
- if (doneProducingBatches && cid != null) {
- boolean sessionScope = this.processor.getContext().isSessionFunctionEvaluated();
- CachedResults cr = new CachedResults();
- cr.setCommand(originalCommand);
- cr.setAnalysisRecord(analysisRecord);
- cr.setResults(this.resultsBuffer);
- dqpCore.getRsCache().put(cid, sessionScope, cr);
- }
- }
ResultsMessage response = null;
ResultsReceiver<ResultsMessage> receiver = null;
boolean result = true;
Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2010-05-28
04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2010-05-28
04:08:22 UTC (rev 2164)
@@ -51,12 +51,7 @@
List getOutputElements();
}
- public interface BatchHandler {
- boolean batchProduced(TupleBatch batch) throws TeiidProcessingException,
TeiidComponentException;
- }
-
private BatchProducer sourceNode;
- private BatchHandler batchHandler;
private boolean done = false;
private TupleBuffer buffer;
@@ -91,19 +86,16 @@
* Flush the batch by giving it to the buffer manager.
*/
private void flushBatch(TupleBatch batch) throws TeiidComponentException,
TeiidProcessingException {
- boolean add = true;
- if (this.batchHandler != null && (batch.getRowCount() > 0 ||
batch.getTerminationFlag())) {
- add = this.batchHandler.batchProduced(batch);
- }
- // Add batch
- if(batch.getRowCount() > 0 || batch.getTerminationFlag()) {
- buffer.addTupleBatch(batch, add);
- }
+ if (batch.getRowCount() == 0 && !batch.getTerminationFlag()) {
+ return;
+ }
+ flushBatchDirect(batch, true);
}
- public void setBatchHandler(BatchHandler batchHandler) {
- this.batchHandler = batchHandler;
- }
+ @SuppressWarnings("unused")
+ protected void flushBatchDirect(TupleBatch batch, boolean add) throws
TeiidComponentException, TeiidProcessingException {
+ buffer.addTupleBatch(batch, add);
+ }
public int getRowCount() {
return buffer.getRowCount();
Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2010-05-28
04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2010-05-28
04:08:22 UTC (rev 2164)
@@ -54,7 +54,7 @@
*/
public abstract class ProcessorPlan implements Cloneable, BatchProducer {
- private List warnings = null;
+ private List<Exception> warnings = null;
private CommandContext context;
@@ -77,18 +77,18 @@
* the current warnings list. The warnings are in order they were detected.
* @return Current list of warnings, never null
*/
- public List getAndClearWarnings() {
+ public List<Exception> getAndClearWarnings() {
if (warnings == null) {
return null;
}
- List copied = warnings;
+ List<Exception> copied = warnings;
warnings = null;
return copied;
}
protected void addWarning(TeiidException warning) {
if (warnings == null) {
- warnings = new ArrayList(1);
+ warnings = new ArrayList<Exception>(1);
}
warnings.add(warning);
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-05-28
04:06:56 UTC (rev 2163)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2010-05-28
04:08:22 UTC (rev 2164)
@@ -27,6 +27,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
@@ -223,8 +224,12 @@
this.requestCanceled = true;
}
+ public TupleBuffer createTupleBuffer() throws TeiidComponentException {
+ return this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(),
context.getConnectionID(), TupleSourceType.PROCESSOR);
+ }
+
public BatchCollector createBatchCollector() throws TeiidComponentException {
- return new BatchCollector(this,
this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(),
context.getConnectionID(), TupleSourceType.PROCESSOR));
+ return new BatchCollector(this, createTupleBuffer());
}
public void setNonBlocking(boolean nonBlocking) {