Author: shawkins
Date: 2011-02-24 17:00:56 -0500 (Thu, 24 Feb 2011)
New Revision: 2938
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Log:
TEIID-1482 removing unnecessary rebuffering
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -32,6 +32,8 @@
void remove();
+ void setPrefersMemory(boolean prefers);
+
}
ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache) throws
TeiidComponentException;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.TreeMap;
+import org.teiid.common.buffer.BatchManager.ManagedBatch;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.Streamable;
@@ -355,6 +356,9 @@
public void setPrefersMemory(boolean prefersMemory) {
this.prefersMemory = prefersMemory;
+ for (ManagedBatch batch : this.batches.values()) {
+ batch.setPrefersMemory(prefersMemory);
+ }
}
public boolean isPrefersMemory() {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -210,6 +210,12 @@
this.lobManager = new LobManager();
}
}
+
+ @Override
+ public void setPrefersMemory(boolean prefers) {
+ this.softCache = prefers;
+ //TODO: could recreate the reference
+ }
private void addToCache(boolean update) {
synchronized (activeBatches) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -41,6 +41,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
@@ -253,35 +254,32 @@
if (!doneProducingBatches) {
this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() +
this.processorTimeslice);
sendResultsIfNeeded(null);
- collector.collectTuples();
+ this.resultsBuffer = collector.collectTuples();
+ if (!doneProducingBatches) {
+ doneProducingBatches();
+ addToCache();
+ }
}
- if (doneProducingBatches) {
- if (this.transactionState == TransactionState.ACTIVE) {
- /*
- * TEIID-14 if we are done producing batches, then proactively close transactional
- * executions even ones that were intentionally kept alive. this may
- * break the read of a lob from a transactional source under a transaction
- * if the source does not support holding the clob open after commit
- */
- for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
- if (connectorRequest.isTransactional()) {
- connectorRequest.fullyCloseSource();
- }
- }
- this.transactionState = TransactionState.DONE;
- if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
- this.transactionService.commit(transactionContext);
- } else {
- suspend();
- }
+ if (this.transactionState == TransactionState.ACTIVE) {
+ /*
+ * TEIID-14 if we are done producing batches, then proactively close transactional
+ * executions even ones that were intentionally kept alive. this may
+ * break the read of a lob from a transactional source under a transaction
+ * if the source does not support holding the clob open after commit
+ */
+ for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+ if (connectorRequest.isTransactional()) {
+ connectorRequest.fullyCloseSource();
+ }
+ }
+ this.transactionState = TransactionState.DONE;
+ if (transactionContext.getTransactionType() == TransactionContext.Scope.REQUEST) {
+ this.transactionService.commit(transactionContext);
+ } else {
+ suspend();
}
- sendResultsIfNeeded(null);
- } else {
- moreWork(false); // If the timeslice expired, then the processor can probably produce
more batches.
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "############# PW EXITING on " +
requestID + " - reenqueueing for more processing ###########"); //$NON-NLS-1$
//$NON-NLS-2$
- }
}
+ sendResultsIfNeeded(null);
}
/**
@@ -381,13 +379,9 @@
this.cid = cacheId;
}
processor = request.processor;
- resultsBuffer = processor.createTupleBuffer();
- if (this.cid != null && originalCommand.getCacheHint() != null) {
- LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Using cache hint",
originalCommand.getCacheHint()); //$NON-NLS-1$
- resultsBuffer.setPrefersMemory(originalCommand.getCacheHint().getPrefersMemory());
- }
- collector = new BatchCollector(processor, resultsBuffer) {
+ collector = new BatchCollector(processor, processor.getBufferManager(),
this.request.context, isForwardOnly()) {
protected void flushBatchDirect(TupleBatch batch, boolean add) throws
TeiidComponentException,TeiidProcessingException {
+ resultsBuffer = getTupleBuffer();
boolean added = false;
if (cid != null) {
super.flushBatchDirect(batch, add);
@@ -396,28 +390,12 @@
if (batch.getTerminationFlag()) {
doneProducingBatches();
}
- if (doneProducingBatches && cid != null) {
- Determinism determinismLevel = processor.getContext().getDeterminismLevel();
- CachedResults cr = new CachedResults();
- cr.setCommand(originalCommand);
- cr.setAnalysisRecord(analysisRecord);
- cr.setResults(resultsBuffer);
-
- if (originalCommand.getCacheHint() != null &&
originalCommand.getCacheHint().getDeterminism() != null) {
- determinismLevel = originalCommand.getCacheHint().getDeterminism();
- LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified
the query determinism from ",processor.getContext().getDeterminismLevel(), " to
", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
- }
-
- if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <=
0) {
- LogManager.logInfo(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("RequestWorkItem.cache_nondeterministic",
originalCommand)); //$NON-NLS-1$
- }
- dqpCore.getRsCache().put(cid, determinismLevel, cr,
originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
- }
+ addToCache();
add = sendResultsIfNeeded(batch);
if (!added) {
super.flushBatchDirect(batch, add);
//restrict the buffer size for forward only results
- if (add
+ if (add && !processor.hasFinalBuffer()
&& !batch.getTerminationFlag()
&& this.getTupleBuffer().getManagedRowCount() >= 20 *
this.getTupleBuffer().getBatchSize()) {
//requestMore will trigger more processing
@@ -426,8 +404,11 @@
}
}
};
- resultsBuffer = collector.getTupleBuffer();
- resultsBuffer.setForwardOnly(isForwardOnly());
+ this.resultsBuffer = collector.getTupleBuffer();
+ if (this.resultsBuffer == null) {
+ //This is just a dummy result it will get replaced by collector source
+ resultsBuffer =
this.processor.getBufferManager().createTupleBuffer(this.originalCommand.getProjectedSymbols(),
this.request.context.getConnectionID(), TupleSourceType.FINAL);
+ }
analysisRecord = request.analysisRecord;
analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
transactionContext = request.transactionContext;
@@ -442,6 +423,30 @@
this.returnsUpdateCount = request.returnsUpdateCount;
request = null;
}
+
+ private void addToCache() {
+ if (!doneProducingBatches || cid == null) {
+ return;
+ }
+ Determinism determinismLevel = processor.getContext().getDeterminismLevel();
+ CachedResults cr = new CachedResults();
+ cr.setCommand(originalCommand);
+ cr.setAnalysisRecord(analysisRecord);
+ cr.setResults(resultsBuffer);
+ if (originalCommand.getCacheHint() != null) {
+ LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Using cache
hint", originalCommand.getCacheHint()); //$NON-NLS-1$
+ resultsBuffer.setPrefersMemory(originalCommand.getCacheHint().getPrefersMemory());
+ if (originalCommand.getCacheHint().getDeterminism() != null) {
+ determinismLevel = originalCommand.getCacheHint().getDeterminism();
+ LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified
the query determinism from ",processor.getContext().getDeterminismLevel(), " to
", determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
+ if (determinismLevel.compareTo(Determinism.SESSION_DETERMINISTIC) <= 0) {
+ LogManager.logInfo(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("RequestWorkItem.cache_nondeterministic",
originalCommand)); //$NON-NLS-1$
+ }
+ dqpCore.getRsCache().put(cid, determinismLevel, cr,
originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
+ }
/**
* Send results if they have been requested. This should only be called from the
processing thread.
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -22,7 +22,6 @@
package org.teiid.query.optimizer.relational.rules;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -40,14 +39,10 @@
import org.teiid.query.optimizer.relational.plantree.NodeConstants;
import org.teiid.query.optimizer.relational.plantree.NodeEditor;
import org.teiid.query.optimizer.relational.plantree.PlanNode;
-import org.teiid.query.resolver.util.ResolverUtil;
import org.teiid.query.sql.LanguageObject;
import org.teiid.query.sql.lang.JoinType;
import org.teiid.query.sql.symbol.AggregateSymbol;
-import org.teiid.query.sql.symbol.Constant;
-import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.GroupSymbol;
-import org.teiid.query.sql.util.SymbolMap;
import org.teiid.query.sql.visitor.GroupsUsedByElementsVisitor;
import org.teiid.query.util.CommandContext;
@@ -75,12 +70,12 @@
continue;
}
Set<GroupSymbol> groups =
GroupsUsedByElementsVisitor.getGroups((Collection<? extends
LanguageObject>)planNode.getProperty(NodeConstants.Info.OUTPUT_COLS));
- List<PlanNode> removed = removeJoin(groups, planNode,
planNode.getFirstChild(), metadata);
+ List<PlanNode> removed = removeJoin(groups, planNode,
planNode.getFirstChild());
if (removed != null) {
removedNodes.addAll(removed);
continue;
}
- removed = removeJoin(groups, planNode, planNode.getLastChild(), metadata);
+ removed = removeJoin(groups, planNode, planNode.getLastChild());
if (removed != null) {
removedNodes.addAll(removed);
}
@@ -94,7 +89,7 @@
* @throws TeiidComponentException
* @throws QueryMetadataException
*/
- private List<PlanNode> removeJoin(Set<GroupSymbol> groups, PlanNode
joinNode, PlanNode optionalNode, QueryMetadataInterface metadata) throws
QueryPlannerException, QueryMetadataException, TeiidComponentException {
+ private List<PlanNode> removeJoin(Set<GroupSymbol> groups, PlanNode
joinNode, PlanNode optionalNode) throws QueryPlannerException, QueryMetadataException,
TeiidComponentException {
if (!Collections.disjoint(optionalNode.getGroups(), groups)) {
return null;
}
@@ -110,16 +105,6 @@
joinNode.removeChild(optionalNode);
NodeEditor.removeChildNode(parentNode, joinNode);
- // correct the parent nodes that may be using optional elements
- /*for (GroupSymbol optionalGroup : optionalNode.getGroups()) {
- List<ElementSymbol> optionalElements =
ResolverUtil.resolveElementsInGroup(optionalGroup, metadata);
- List<Constant> replacements = new
ArrayList<Constant>(optionalElements.size());
- for (ElementSymbol elementSymbol : optionalElements) {
- replacements.add(new Constant(null, elementSymbol.getType()));
- }
- FrameUtil.convertFrame(parentNode, optionalGroup, null,
SymbolMap.createSymbolMap(optionalElements, replacements).asMap(), metadata);
- }*/
-
return NodeEditor.findAllNodes(optionalNode, NodeConstants.Types.JOIN);
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -26,11 +26,15 @@
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.util.Assertion;
+import org.teiid.query.util.CommandContext;
public class BatchCollector {
@@ -51,6 +55,17 @@
* @return List of SingleElementSymbol
*/
List getOutputElements();
+
+ /**
+ * return the final tuple buffer or null if not available
+ * @return
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
+ * @throws BlockedException
+ */
+ TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException;
+
+ boolean hasFinalBuffer();
}
public static class BatchProducerTupleSource implements TupleSource {
@@ -103,17 +118,33 @@
private boolean done = false;
private TupleBuffer buffer;
+ private boolean forwardOnly;
- public BatchCollector(BatchProducer sourceNode, TupleBuffer buffer) {
+ public BatchCollector(BatchProducer sourceNode, BufferManager bm, CommandContext
context, boolean fowardOnly) throws TeiidComponentException {
this.sourceNode = sourceNode;
- this.buffer = buffer;
+ if (!this.sourceNode.hasFinalBuffer()) {
+ this.buffer = bm.createTupleBuffer(sourceNode.getOutputElements(),
context.getConnectionID(), TupleSourceType.PROCESSOR);
+ this.buffer.setForwardOnly(fowardOnly);
+ }
}
public TupleBuffer collectTuples() throws TeiidComponentException,
TeiidProcessingException {
TupleBatch batch = null;
while(!done) {
- batch = sourceNode.nextBatch();
-
+ if (this.sourceNode.hasFinalBuffer()) {
+ if (this.buffer == null) {
+ TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer();
+ Assertion.isNotNull(finalBuffer);
+ this.buffer = finalBuffer;
+ }
+ if (this.buffer.isFinal()) {
+ this.buffer.setForwardOnly(forwardOnly);
+ done = true;
+ break;
+ }
+ }
+ batch = sourceNode.nextBatch();
+
flushBatch(batch);
// Check for termination condition
@@ -142,10 +173,15 @@
@SuppressWarnings("unused")
protected void flushBatchDirect(TupleBatch batch, boolean add) throws
TeiidComponentException, TeiidProcessingException {
- buffer.addTupleBatch(batch, add);
+ if (!this.sourceNode.hasFinalBuffer()) {
+ buffer.addTupleBatch(batch, add);
+ }
}
public int getRowCount() {
+ if (buffer == null) {
+ return 0;
+ }
return buffer.getRowCount();
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -31,9 +31,10 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.TeiidException;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.BatchCollector.BatchProducer;
import org.teiid.query.util.CommandContext;
@@ -159,5 +160,20 @@
props.addProperty(PROP_OUTPUT_COLS,
AnalysisRecord.getOutputColumnProperties(getOutputElements()));
return props;
}
+
+ /**
+ * return the final tuple buffer or null if not available
+ * @return
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
+ * @throws BlockedException
+ */
+ public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ return null;
+ }
+ public boolean hasFinalBuffer() {
+ return false;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -29,7 +29,6 @@
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
@@ -123,18 +122,7 @@
TupleBatch result = null;
try {
- // initialize if necessary
- if(!initialized) {
- reserved =
this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()),
BufferReserveMode.FORCE);
- this.processPlan.initialize(context, this.dataMgr, bufferMgr);
- initialized = true;
- }
- if (!open) {
- // Open the top node for reading
- processPlan.open();
- open = true;
- }
-
+ init();
long currentTime = System.currentTimeMillis();
Assertion.assertTrue(!processorClosed);
@@ -180,6 +168,21 @@
return result;
}
+ private void init() throws TeiidComponentException, TeiidProcessingException {
+ // initialize if necessary
+ if(!initialized) {
+ reserved =
this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()),
BufferReserveMode.FORCE);
+ this.processPlan.initialize(context, this.dataMgr, bufferMgr);
+ initialized = true;
+ }
+
+ if (!open) {
+ // Open the top node for reading
+ processPlan.open();
+ open = true;
+ }
+ }
+
/**
* Close processing and clean everything up. Should only be called by the same
thread that called process.
@@ -217,15 +220,46 @@
this.requestCanceled = true;
}
- public TupleBuffer createTupleBuffer() throws TeiidComponentException {
- return this.bufferMgr.createTupleBuffer(this.processPlan.getOutputElements(),
context.getConnectionID(), TupleSourceType.PROCESSOR);
- }
-
public BatchCollector createBatchCollector() throws TeiidComponentException {
- return new BatchCollector(this, createTupleBuffer());
+ return new BatchCollector(this, this.bufferMgr, this.context, false);
}
public void setNonBlocking(boolean nonBlocking) {
this.context.setNonBlocking(nonBlocking);
}
+
+ @Override
+ public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ while (true) {
+ long wait = DEFAULT_WAIT;
+ try {
+ init();
+ return this.processPlan.getFinalBuffer();
+ } catch (BlockedException e) {
+ if (!this.context.isNonBlocking()) {
+ throw e;
+ }
+ } catch (TeiidComponentException e) {
+ closeProcessing();
+ throw e;
+ } catch (TeiidProcessingException e) {
+ closeProcessing();
+ throw e;
+ }
+ try {
+ Thread.sleep(wait);
+ } catch (InterruptedException err) {
+ throw new TeiidComponentException(err);
+ }
+ }
+ }
+
+ @Override
+ public boolean hasFinalBuffer() {
+ return this.processPlan.hasFinalBuffer();
+ }
+
+ public BufferManager getBufferManager() {
+ return bufferMgr;
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -34,6 +34,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
@@ -597,4 +598,19 @@
return processingState;
}
+ public boolean hasFinalBuffer() {
+ return false;
+ }
+
+ /**
+ * return the final tuple buffer or null if not available
+ * @return
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
+ * @throws BlockedException
+ */
+ public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ return null;
+ }
+
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -29,6 +29,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.language.SQLConstants;
@@ -285,5 +286,16 @@
}
return false;
}
+
+
+ @Override
+ public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ return root.getFinalBuffer();
+ }
+
+ @Override
+ public boolean hasFinalBuffer() {
+ return root.hasFinalBuffer();
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -47,6 +47,7 @@
private int phase = SORT;
private TupleBuffer output;
private TupleSource outputTs;
+ private boolean usingOutput;
private static final int SORT = 2;
private static final int OUTPUT = 3;
@@ -61,6 +62,7 @@
phase = SORT;
output = null;
outputTs = null;
+ usingOutput = false;
}
public void setSortElements(List<OrderByItem> items) {
@@ -103,7 +105,7 @@
private TupleBatch outputPhase() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
if (!this.output.isFinal()) {
this.phase = SORT;
- } else {
+ } else if (!usingOutput) {
this.output.setForwardOnly(true);
}
List<?> tuple = null;
@@ -127,10 +129,10 @@
public void closeDirect() {
if(this.output != null) {
- this.output.remove();
- this.output = null;
- this.outputTs = null;
+ this.output.remove();
+ this.output = null;
}
+ this.outputTs = null;
}
protected void getNodeString(StringBuffer str) {
@@ -166,4 +168,23 @@
return props;
}
+ @Override
+ public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ if (this.output == null) {
+ sortPhase();
+ }
+ usingOutput = true;
+ TupleBuffer result = this.output;
+ if (this.output.isFinal()) {
+ this.output = null;
+ close();
+ }
+ return result;
+ }
+
+ @Override
+ public boolean hasFinalBuffer() {
+ return this.getElements().size() == this.getChildren()[0].getElements().size();
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -31,7 +31,6 @@
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.core.util.Assertion;
import org.teiid.query.processor.BatchCollector;
import org.teiid.query.processor.BatchIterator;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
@@ -163,7 +162,7 @@
throw new AssertionError("cannot buffer the source"); //$NON-NLS-1$
}
if (collector == null) {
- collector = new BatchCollector(source, createSourceTupleBuffer());
+ collector = new BatchCollector(source, source.getBufferManager(),
source.getContext(), false);
}
this.buffer = collector.collectTuples();
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -39,7 +39,7 @@
public class TestTupleBuffer {
- private final class FakeBatchManager implements BatchManager {
+ public static final class FakeBatchManager implements BatchManager {
@Override
public void remove() {
@@ -60,6 +60,11 @@
throws TeiidComponentException {
return batch;
}
+
+ @Override
+ public void setPrefersMemory(boolean prefers) {
+
+ }
};
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-02-24
17:57:38 UTC (rev 2937)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -21,8 +21,7 @@
*/
package org.teiid.dqp.internal.process;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -32,15 +31,11 @@
import java.util.List;
import org.junit.Test;
-import org.mockito.Mockito;
import org.teiid.cache.Cache;
import org.teiid.cache.DefaultCache;
-import org.teiid.common.buffer.BatchManager;
import org.teiid.common.buffer.BufferManager;
-import org.teiid.common.buffer.FileStore;
-import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.core.TeiidComponentException;
+import org.teiid.common.buffer.TestTupleBuffer.FakeBatchManager;
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.service.FakeBufferService;
import org.teiid.query.sql.lang.Query;
@@ -49,35 +44,6 @@
public class TestCachedResults {
- private final class FakeBatchManager implements BatchManager {
- @Override
- public void remove() {
-
- }
-
- @Override
- public ManagedBatch createManagedBatch(final TupleBatch batch, boolean softCache)
- throws TeiidComponentException {
- return new ManagedBatch() {
-
- @Override
- public void remove() {
-
- }
-
- @Override
- public TupleBatch getBatch(boolean cache, String[] types)
- throws TeiidComponentException {
- return batch;
- }
- };
- }
-
- @Override
- public FileStore createStorage(String prefix) {
- return Mockito.mock(FileStore.class);
- }
- }
@Test
public void testCaching() throws Exception {
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -222,6 +222,25 @@
}
}
+ @Test public void testBufferReuse() throws Exception {
+ //the sql should return 100 rows
+ String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B ORDER
BY A.IntKey"; //$NON-NLS-1$
+ String userName = "1"; //$NON-NLS-1$
+ String sessionid = "1"; //$NON-NLS-1$
+
+ RequestMessage reqMsg = exampleRequestMessage(sql);
+ reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
+ DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
+ DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+ ((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
+ Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertEquals(2, rm.getResults().length);
+ RequestWorkItem item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+ assertEquals(100, item.resultsBuffer.getRowCount());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-02-24
17:57:38 UTC (rev 2937)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-02-24
22:00:56 UTC (rev 2938)
@@ -233,6 +233,7 @@
if (context.getGlobalTableStore() == null) {
context.setGlobalTableStore(new TempTableStore("SYSTEM"));
}
+ context.setBufferManager(bufferMgr);
if (!(dataManager instanceof TempTableDataManager)) {
SessionAwareCache<CachedResults> cache = new
SessionAwareCache<CachedResults>();
cache.setBufferManager(bufferMgr);