Author: shawkins
Date: 2012-07-12 13:45:57 -0400 (Thu, 12 Jul 2012)
New Revision: 4233
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
Modified:
trunk/api/src/main/java/org/teiid/CommandContext.java
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java
trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-2098 TEIID-1598 fixing several issues with maxRows and further refining connector
caching logic
Modified: trunk/api/src/main/java/org/teiid/CommandContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/CommandContext.java 2012-07-12 14:02:50 UTC (rev
4232)
+++ trunk/api/src/main/java/org/teiid/CommandContext.java 2012-07-12 17:45:57 UTC (rev
4233)
@@ -149,6 +149,7 @@
/**
* Get the number of times this command has been reused. Useful
* in continuous executions.
+ * @see #isContinuous()
* @return
*/
long getReuseCount();
@@ -166,4 +167,10 @@
*/
void addWarning(Exception ex);
+ /**
+ *
+ * @return true if this is a continuous query
+ */
+ boolean isContinuous();
+
}
Modified:
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java
===================================================================
---
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/api/src/main/java/org/teiid/translator/BaseDelegatingExecutionFactory.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -433,4 +433,10 @@
public boolean supportsOnlyLiteralComparison() {
return delegate.supportsOnlyLiteralComparison();
}
+ @Override
+ public CacheDirective getCacheDirective(Command command,
+ ExecutionContext executionContext, RuntimeMetadata metadata)
+ throws TranslatorException {
+ return delegate.getCacheDirective(command, executionContext, metadata);
+ }
}
Modified: trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/CacheDirective.java 2012-07-12 14:02:50
UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/CacheDirective.java 2012-07-12 17:45:57
UTC (rev 4233)
@@ -30,8 +30,9 @@
public class CacheDirective implements Serializable {
public enum Scope {
+ NONE,
+ SESSION,
USER,
- SESSION,
VDB
}
@@ -58,10 +59,18 @@
this.prefersMemory = prefersMemory;
}
+ /**
+ * Get the time to live in milliseconds
+ * @return
+ */
public Long getTtl() {
return ttl;
}
+ /**
+ * Set the time to live in milliseconds
+ * @param ttl
+ */
public void setTtl(Long ttl) {
this.ttl = ttl;
}
Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java 2012-07-12 14:02:50
UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionContext.java 2012-07-12 17:45:57
UTC (rev 4233)
@@ -201,4 +201,10 @@
* @return
*/
CommandContext getCommandContext();
+
+ /**
+ * Get the {@link CacheDirective}
+ * @return
+ */
+ CacheDirective getCacheDirective();
}
Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2012-07-12 14:02:50
UTC (rev 4232)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2012-07-12 17:45:57
UTC (rev 4233)
@@ -46,6 +46,7 @@
import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.MetadataFactory;
import org.teiid.metadata.RuntimeMetadata;
+import org.teiid.translator.CacheDirective.Scope;
import org.teiid.translator.TypeFacility.RUNTIME_CODES;
import org.teiid.translator.TypeFacility.RUNTIME_NAMES;
@@ -986,4 +987,18 @@
return true;
}
+ /**
+ * Get the {@link CacheDirective} to control command caching.
+ * <p>Use {@link Scope#NONE} to indicate to the engine that no caching should be
performed by the engine.</p>
+ * <p>If cache parameters on the {@link CacheDirective} will be changed by the
{@link Execution}, then
+ * a new instance of a {@link CacheDirective} should be set each time.</p>
+ * @param command
+ * @param executionContext
+ * @param metadata
+ * @throws TranslatorException
+ */
+ public CacheDirective getCacheDirective(Command command, ExecutionContext
executionContext, RuntimeMetadata metadata) throws TranslatorException {
+ return null;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -22,6 +22,7 @@
package org.teiid.common.buffer;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -156,6 +157,9 @@
for (Long batch : this.batches.values()) {
this.manager.remove(batch);
}
+ if (this.lobManager != null) {
+ this.lobManager.remove();
+ }
this.batches.clear();
}
@@ -236,9 +240,6 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing
TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
}
- if (this.lobManager != null) {
- this.lobManager.remove();
- }
this.batchBuffer = null;
purge();
this.manager.remove();
@@ -368,5 +369,40 @@
}
return this.lobManager.getLobCount();
}
+
+ public void truncateTo(int rowLimit) throws TeiidComponentException {
+ if (rowCount <= rowLimit) {
+ return;
+ }
+ //TODO this could be more efficient with handling the last batch
+ TupleBatch last = this.getBatch(rowLimit);
+ TupleBatch tb = last;
+ if (this.batchBuffer != null) {
+ this.batchBuffer.clear();
+ }
+ int begin = tb.getBeginRow();
+ do {
+ if (tb == null) {
+ tb = this.getBatch(begin);
+ }
+ Long id = this.batches.remove(begin);
+ if (id != null) {
+ this.manager.remove(id);
+ }
+ if (this.lobManager != null) {
+ for (List<?> tuple : tb.getTuples()) {
+ this.lobManager.updateReferences(tuple, ReferenceMode.REMOVE);
+ }
+ }
+ begin = tb.getEndRow() + 1;
+ tb = null;
+ } while (begin <= rowCount);
+ rowCount = last.getBeginRow() - 1;
+ Iterator<List<?>> iter = last.getTuples().iterator();
+ while (rowCount < rowLimit) {
+ addTuple(iter.next());
+ }
+ saveBatch(false);
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -25,6 +25,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.translator.CacheDirective;
import org.teiid.translator.TranslatorException;
@@ -46,5 +47,7 @@
boolean isDataAvailable();
boolean copyLobs();
+
+ CacheDirective getCacheDirective() throws TranslatorException;
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -40,7 +40,6 @@
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.CommandLogMessage.Event;
-import org.teiid.metadata.RuntimeMetadata;
import org.teiid.query.QueryPlugin;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataAdapter;
@@ -49,6 +48,7 @@
import org.teiid.query.sql.lang.QueryCommand;
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.resource.spi.WrappedConnection;
+import org.teiid.translator.CacheDirective;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.Execution;
import org.teiid.translator.ExecutionFactory;
@@ -65,7 +65,7 @@
private ConnectorManager manager;
private AtomicRequestMessage requestMsg;
private ExecutionFactory<Object, Object> connector;
- private QueryMetadataInterface queryMetadata;
+ private RuntimeMetadataImpl queryMetadata;
/* Created on new request */
private Object connection;
@@ -81,6 +81,7 @@
private boolean error;
private AtomicBoolean isCancelled = new AtomicBoolean();
+ private org.teiid.language.Command translatedCommand;
ConnectorWorkItem(AtomicRequestMessage message, ConnectorManager manager) {
this.id = message.getAtomicRequestID();
@@ -99,9 +100,13 @@
this.connector = manager.getExecutionFactory();
VDBMetaData vdb = requestMsg.getWorkContext().getVDB();
- this.queryMetadata = vdb.getAttachment(QueryMetadataInterface.class);
- this.queryMetadata = new TempMetadataAdapter(this.queryMetadata, new
TempMetadataStore());
+ QueryMetadataInterface qmi = vdb.getAttachment(QueryMetadataInterface.class);
+ qmi = new TempMetadataAdapter(qmi, new TempMetadataStore());
+ this.queryMetadata = new RuntimeMetadataImpl(qmi);
this.securityContext.setTransactional(requestMsg.isTransactional());
+ LanguageBridgeFactory factory = new LanguageBridgeFactory(this.queryMetadata);
+ factory.setConvertIn(!this.connector.supportsInCriteria());
+ translatedCommand = factory.translate(message.getCommand());
}
@Override
@@ -154,12 +159,14 @@
} catch (Throwable e) {
LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
} finally {
- try {
- this.connector.closeConnection(connection, connectionFactory);
- } catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+ if (this.connector.isSourceRequired() && this.connection != null) {
+ try {
+ this.connector.closeConnection(connection, connectionFactory);
+ } catch (Throwable e) {
+ LogManager.logError(LogConstants.CTX_CONNECTOR, e, e.getMessage());
+ }
+ LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Closed connection"}); //$NON-NLS-1$
}
- LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Closed connection"}); //$NON-NLS-1$
}
}
@@ -217,16 +224,12 @@
if (command instanceof StoredProcedure) {
this.expectedColumns =
((StoredProcedure)command).getResultSetColumns().size();
}
- LanguageBridgeFactory factory = new LanguageBridgeFactory(queryMetadata);
- factory.setConvertIn(!this.connector.supportsInCriteria());
- org.teiid.language.Command translatedCommand = factory.translate(command);
Execution exec =
this.requestMsg.getCommandContext().getReusableExecution(this.securityContext.getPartIdentifier());
if (exec != null) {
((ReusableExecution)exec).reset(translatedCommand, this.securityContext,
connection);
} else {
- RuntimeMetadata rmd = new RuntimeMetadataImpl(queryMetadata);
- exec = connector.createExecution(translatedCommand, this.securityContext, rmd,
(unwrapped == null) ? this.connection:unwrapped);
+ exec = connector.createExecution(translatedCommand, this.securityContext,
queryMetadata, (unwrapped == null) ? this.connection:unwrapped);
if (exec instanceof ReusableExecution<?>) {
this.requestMsg.getCommandContext().putReusableExecution(this.securityContext.getPartIdentifier(),
(ReusableExecution<?>) exec);
}
@@ -388,5 +391,12 @@
public boolean copyLobs() {
return this.connector.isCopyLobs();
}
+
+ @Override
+ public CacheDirective getCacheDirective() throws TranslatorException {
+ CacheDirective cd = connector.getCacheDirective(this.translatedCommand,
this.securityContext, this.queryMetadata);
+ this.securityContext.setCacheDirective(cd);
+ return cd;
+ }
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ExecutionContextImpl.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -38,6 +38,7 @@
import org.teiid.dqp.internal.process.RequestWorkItem;
import org.teiid.dqp.message.RequestID;
import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
import org.teiid.translator.ExecutionContext;
@@ -63,6 +64,7 @@
private String generalHint;
private String hint;
private CommandContext commandContext;
+ private CacheDirective cacheDirective;
public ExecutionContextImpl(String vdbName, int vdbVersion, Serializable
executionPayload,
String originalConnectionID, String connectorName, long requestId, String
partId, String execCount) {
@@ -269,4 +271,13 @@
public int getVirtualDatabaseVersion() {
return getVdbVersion();
}
+
+ @Override
+ public CacheDirective getCacheDirective() {
+ return cacheDirective;
+ }
+
+ public void setCacheDirective(CacheDirective directive) {
+ this.cacheDirective = directive;
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -139,6 +139,10 @@
}
}
+ public LanguageBridgeFactory(RuntimeMetadataImpl metadata) {
+ this.metadataFactory = metadata;
+ }
+
public void setConvertIn(boolean convertIn) {
this.convertIn = convertIn;
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -56,9 +56,12 @@
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
+import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.dqp.internal.process.TupleSourceCache.CachableVisitor;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.events.EventDistributor;
+import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.metadata.*;
@@ -67,6 +70,7 @@
import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.parser.ParseInfo;
import org.teiid.query.processor.CollectionTupleSource;
import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.processor.RegisterRequestParameter;
@@ -75,12 +79,16 @@
import org.teiid.query.sql.lang.SourceHint;
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.lang.UnaryFromClause;
+import org.teiid.query.sql.navigator.PreOrPostOrderNavigator;
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.visitor.GroupCollectorVisitor;
import org.teiid.query.tempdata.GlobalTableStore;
import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.CacheDirective.Scope;
/**
* Full {@link ProcessorDataManager} implementation that
@@ -181,13 +189,51 @@
if (parameterObject.limit > 0) {
aqr.setFetchSize(Math.min(parameterObject.limit, aqr.getFetchSize()));
}
+ Collection<GroupSymbol> accessedGroups = null;
if (context.getDataObjects() != null) {
- for (GroupSymbol gs : GroupCollectorVisitor.getGroupsIgnoreInlineViews(command,
false)) {
+ accessedGroups = GroupCollectorVisitor.getGroupsIgnoreInlineViews(command, false);
+ for (GroupSymbol gs : accessedGroups) {
context.accessedDataObject(gs.getMetadataID());
}
}
ConnectorManagerRepository cmr =
workItem.getDqpWorkContext().getVDB().getAttachment(ConnectorManagerRepository.class);
- ConnectorWork work =
cmr.getConnectorManager(aqr.getConnectorName()).registerRequest(aqr);
+ ConnectorManager connectorManager = cmr.getConnectorManager(aqr.getConnectorName());
+ ConnectorWork work = connectorManager.registerRequest(aqr);
+ CacheID cid = null;
+ CacheDirective cd = null;
+ if (workItem.getRsCache() != null && command.areResultsCachable()) {
+ CachableVisitor cv = new CachableVisitor();
+ PreOrPostOrderNavigator.doVisit(command, cv, PreOrPostOrderNavigator.PRE_ORDER,
true);
+ if (cv.cacheable) {
+ try {
+ cd = work.getCacheDirective();
+ } catch (TranslatorException e) {
+ throw new TeiidProcessingException(QueryPlugin.Event.TEIID30504, e,
aqr.getConnectorName() + ": " + e.getMessage()); //$NON-NLS-1$
+ }
+ if (cd != null) {
+ if (cd.getScope() == Scope.NONE) {
+ parameterObject.doNotCache = true;
+ } else {
+ String cmdString = command.toString();
+ if (cmdString.length() < 200000) { //TODO: this check won't be needed if
keys aren't exclusively held in memory
+ cid = new CacheID(workItem.getDqpWorkContext(), ParseInfo.DEFAULT_INSTANCE,
cmdString);
+ cid.setParameters(cv.parameters);
+ CachedResults cr = workItem.getRsCache().get(cid);
+ if (cr != null) {
+ parameterObject.doNotCache = true;
+ LogManager.logDetail(LogConstants.CTX_DQP, "Using cache entry for",
cid); //$NON-NLS-1$
+ work.close();
+ return cr.getResults().createIndexedTupleSource();
+ }
+ }
+ }
+ } else {
+ LogManager.logTrace(LogConstants.CTX_DQP, aqr.getAtomicRequestID(), "no cache
directive"); //$NON-NLS-1$
+ }
+ } else {
+ LogManager.logTrace(LogConstants.CTX_DQP, aqr.getAtomicRequestID(), "command not
cachable"); //$NON-NLS-1$
+ }
+ }
work.setRequestWorkItem(workItem);
return new DataTierTupleSource(aqr, workItem, work, this,
parameterObject.limit);
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -128,7 +128,8 @@
* @throws TeiidProcessingException
* @see org.teiid.dqp.internal.process.Request#generatePlan()
*/
- protected void generatePlan() throws TeiidComponentException,
TeiidProcessingException {
+ @Override
+ protected void generatePlan(boolean addLimit) throws TeiidComponentException,
TeiidProcessingException {
String sqlQuery = requestMsg.getCommands()[0];
CacheID id = new CacheID(this.workContext, Request.createParseInfo(this.requestMsg),
sqlQuery);
prepPlan = prepPlanCache.get(id);
@@ -151,21 +152,19 @@
//if prepared plan does not exist, create one
prepPlan = new PreparedPlan();
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Query does not
exist in cache: ", sqlQuery}); //$NON-NLS-1$
- super.generatePlan();
- if (!this.addedLimit) { //TODO: this is a little problematic
- prepPlan.setCommand(this.userCommand);
- // Defect 13751: Clone the plan in its current state (i.e. before processing)
so that it can be used for later queries
- prepPlan.setPlan(processPlan.clone(), this.context);
- prepPlan.setAnalysisRecord(analysisRecord);
-
- Determinism determinismLevel = this.context.getDeterminismLevel();
- if (userCommand.getCacheHint() != null &&
userCommand.getCacheHint().getDeterminism() != null) {
- LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified
the query determinism from ",this.context.getDeterminismLevel(), " to ",
determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
- determinismLevel = userCommand.getCacheHint().getDeterminism();
- }
-
- this.prepPlanCache.put(id, determinismLevel, prepPlan,
userCommand.getCacheHint() != null?userCommand.getCacheHint().getTtl():null);
- }
+ super.generatePlan(false);
+ prepPlan.setCommand(this.userCommand);
+ // Defect 13751: Clone the plan in its current state (i.e. before processing) so
that it can be used for later queries
+ prepPlan.setPlan(processPlan.clone(), this.context);
+ prepPlan.setAnalysisRecord(analysisRecord);
+
+ Determinism determinismLevel = this.context.getDeterminismLevel();
+ if (userCommand.getCacheHint() != null &&
userCommand.getCacheHint().getDeterminism() != null) {
+ LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified
the query determinism from ",this.context.getDeterminismLevel(), " to ",
determinismLevel }); //$NON-NLS-1$ //$NON-NLS-2$
+ determinismLevel = userCommand.getCacheHint().getDeterminism();
+ }
+
+ this.prepPlanCache.put(id, determinismLevel, prepPlan,
userCommand.getCacheHint() != null?userCommand.getCacheHint().getTtl():null);
}
if (requestMsg.isBatchedUpdate()) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -373,7 +373,7 @@
* @throws TeiidComponentException
* @throws TeiidProcessingException
*/
- protected void generatePlan() throws TeiidComponentException,
TeiidProcessingException {
+ protected void generatePlan(boolean addLimit) throws TeiidComponentException,
TeiidProcessingException {
Command command = parseCommand();
List<Reference> references =
ReferenceCollectorVisitor.getReferences(command);
@@ -404,7 +404,7 @@
* Adds a row limit to a query if Statement.setMaxRows has been called and the
command
* doesn't already have a limit clause.
*/
- if (requestMsg.getRowLimit() > 0 && command instanceof QueryCommand)
{
+ if (addLimit && requestMsg.getRowLimit() > 0 && command
instanceof QueryCommand) {
QueryCommand query = (QueryCommand)command;
if (query.getLimit() == null) {
query.setLimit(new Limit(null, new Constant(new
Integer(requestMsg.getRowLimit()), DataTypeManager.DefaultDataClasses.INTEGER)));
@@ -447,7 +447,7 @@
initMetadata();
- generatePlan();
+ generatePlan(true);
postProcessXML();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -581,6 +581,9 @@
}
}
};
+ if (!request.addedLimit && this.requestMsg.getRowLimit() > 0) {
+ this.collector.setRowLimit(this.requestMsg.getRowLimit());
+ }
this.resultsBuffer = collector.getTupleBuffer();
if (this.resultsBuffer == null) {
//This is just a dummy result it will get replaced by collector source
@@ -632,6 +635,10 @@
}
dqpCore.getRsCache().put(cid, determinismLevel, cr,
originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
}
+
+ public SessionAwareCache<CachedResults> getRsCache() {
+ return dqpCore.getRsCache();
+ }
/**
* Send results if they have been requested. This should only be called from the
processing thread.
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -38,7 +38,6 @@
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.common.buffer.TupleBufferCache;
-import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.Assertion;
import org.teiid.core.util.EquivalenceUtil;
import org.teiid.core.util.HashCodeUtil;
@@ -261,7 +260,7 @@
*/
public boolean setParameters(List<?> parameters) {
if (parameters != null && !parameters.isEmpty()) {
- this.parameters = new ArrayList<Serializable>();
+ this.parameters = new ArrayList<Serializable>(parameters.size());
for (Object obj:parameters) {
if (obj == null) {
this.parameters.add(null);
@@ -270,11 +269,6 @@
if (!(obj instanceof Serializable)) {
return false;
}
-
- Class<?> type = DataTypeManager.determineDataTypeClass(obj);
- if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
- return false;
- }
this.parameters.add((Serializable)obj);
}
}
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.query.processor.ProcessorDataManager;
+import org.teiid.query.processor.RegisterRequestParameter;
+import org.teiid.query.sql.LanguageVisitor;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.DependentSetCriteria;
+import org.teiid.query.sql.symbol.Constant;
+import org.teiid.query.util.CommandContext;
+
+public class TupleSourceCache {
+
+ final static class CachableVisitor extends LanguageVisitor {
+ boolean cacheable = true;
+ List<Object> parameters;
+
+ @Override
+ public void visit(Constant c) {
+ if (c.isMultiValued()) {
+ notCachable();
+ } else if (DataTypeManager.isLOB(c.getType())) {
+ if (parameters == null) {
+ parameters = new ArrayList<Object>();
+ }
+ parameters.add(c.getValue());
+ }
+ }
+
+ private void notCachable() {
+ cacheable = false;
+ setAbort(true);
+ }
+
+ @Override
+ public void visit(DependentSetCriteria obj) {
+ notCachable();
+ }
+ }
+
+ private static class SharedState {
+ TupleBuffer tb;
+ TupleSource ts;
+ int id;
+ int expectedReaders;
+
+ private void remove() {
+ ts.closeSource();
+ tb.remove();
+ tb = null;
+ ts = null;
+ }
+ }
+
+ public abstract static class BufferedTupleSource implements TupleSource {
+ private int rowNumber = 1;
+ private TupleBuffer tb;
+ private TupleSource ts;
+
+ protected BufferedTupleSource(TupleBuffer tb, TupleSource ts) {
+ this.tb = tb;
+ this.ts = ts;
+ }
+
+ @Override
+ public List<?> nextTuple() throws TeiidComponentException,
+ TeiidProcessingException {
+ if (rowNumber <= tb.getRowCount()) {
+ return tb.getBatch(rowNumber).getTuple(rowNumber++);
+ }
+ if (tb.isFinal()) {
+ return null;
+ }
+ List<?> row = ts.nextTuple();
+ if (row == null) {
+ tb.setFinal(true);
+ } else {
+ tb.addTuple(row);
+ rowNumber++;
+ }
+ return row;
+ }
+
+ }
+
+ private class SharedTupleSource extends BufferedTupleSource {
+ private SharedState state;
+
+ public SharedTupleSource(SharedState state) {
+ super(state.tb, state.ts);
+ this.state = state;
+ }
+
+ @Override
+ public void closeSource() {
+ if (--state.expectedReaders == 0 && sharedStates != null &&
sharedStates.containsKey(state.id)) {
+ state.remove();
+ sharedStates.remove(state.id);
+ }
+ }
+ }
+
+ private Map<Integer, SharedState> sharedStates;
+
+ public void close() {
+ if (sharedStates != null) {
+ for (SharedState ss : sharedStates.values()) {
+ ss.remove();
+ }
+ sharedStates = null;
+ }
+ }
+
+ public TupleSource getSharedTupleSource(CommandContext context, Command command,
String modelName, RegisterRequestParameter parameterObject, BufferManager bufferMgr,
ProcessorDataManager pdm) throws TeiidComponentException, TeiidProcessingException {
+ if (sharedStates == null) {
+ sharedStates = new HashMap<Integer, SharedState>();
+ }
+ SharedState state = sharedStates.get(parameterObject.info.id);
+ if (state == null) {
+ state = new SharedState();
+ state.expectedReaders = parameterObject.info.sharingCount;
+ RegisterRequestParameter param = new
RegisterRequestParameter(parameterObject.connectorBindingId, 0, -1);
+ state.ts = pdm.registerRequest(context, command, modelName, param);
+ if (param.doNotCache) {
+ return state.ts;
+ }
+ state.tb = bufferMgr.createTupleBuffer(command.getProjectedSymbols(),
context.getConnectionId(), TupleSourceType.PROCESSOR);
+ state.id = parameterObject.info.id;
+ sharedStates.put(parameterObject.info.id, state);
+ }
+ return new SharedTupleSource(state);
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2012-07-12 14:02:50 UTC
(rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/eval/Evaluator.java 2012-07-12 17:45:57 UTC
(rev 4233)
@@ -229,13 +229,13 @@
public Boolean evaluate(CompoundCriteria criteria, List<?> tuple)
throws ExpressionEvaluationException, BlockedException, TeiidComponentException {
- List subCrits = criteria.getCriteria();
- Iterator subCritIter = subCrits.iterator();
+ List<Criteria> subCrits = criteria.getCriteria();
+ Iterator<Criteria> subCritIter = subCrits.iterator();
boolean and = criteria.getOperator() == CompoundCriteria.AND;
Boolean result = and?Boolean.TRUE:Boolean.FALSE;
while(subCritIter.hasNext()) {
- Criteria subCrit = (Criteria) subCritIter.next();
+ Criteria subCrit = subCritIter.next();
Boolean value = evaluateTVL(subCrit, tuple);
if (value == null) {
result = null;
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryPlannerException;
@@ -80,7 +81,7 @@
//state for detecting and reusing source queries
private Map<Command, AccessNode> sharedCommands = new HashMap<Command,
AccessNode>();
- private int sharedId;
+ private static AtomicInteger sharedId = new AtomicInteger();
public static class SharedStateKey {
int id;
@@ -114,7 +115,6 @@
return processPlan;
} finally {
sharedCommands.clear();
- sharedId = 0;
}
}
@@ -538,7 +538,7 @@
} else {
if (other.info == null) {
other.info = new RegisterRequestParameter.SharedAccessInfo();
- other.info.id = sharedId++;
+ other.info.id = sharedId.getAndIncrement();
}
other.info.sharingCount++;
aNode.info = other.info;
Modified: trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java 2012-07-12 14:02:50
UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/parser/ParseInfo.java 2012-07-12 17:45:57
UTC (rev 4233)
@@ -33,6 +33,11 @@
private static final boolean ANSI_QUOTED_DEFAULT =
PropertiesUtils.getBooleanProperty(System.getProperties(),
"org.teiid.ansiQuotedIdentifiers", true); //$NON-NLS-1$
public int referenceCount = 0;
+
+ public static final ParseInfo DEFAULT_INSTANCE = new ParseInfo();
+ static {
+ DEFAULT_INSTANCE.ansiQuotedIdentifiers = true;
+ }
// treat a double quoted variable as variable instead of string
public boolean ansiQuotedIdentifiers=ANSI_QUOTED_DEFAULT;
Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchCollector.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -58,14 +58,17 @@
/**
* return the final tuple buffer or null if not available
+ * @param maxRows
* @return
* @throws TeiidProcessingException
* @throws TeiidComponentException
* @throws BlockedException
*/
- TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException;
+ TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException;
boolean hasFinalBuffer();
+
+ void close() throws TeiidComponentException;
}
public static class BatchProducerTupleSource implements TupleSource {
@@ -119,11 +122,14 @@
private boolean done = false;
private TupleBuffer buffer;
private boolean forwardOnly;
+ private int rowLimit = -1; //-1 means no_limit
+ private boolean hasFinalBuffer;
public BatchCollector(BatchProducer sourceNode, BufferManager bm, CommandContext
context, boolean forwardOnly) throws TeiidComponentException {
this.sourceNode = sourceNode;
this.forwardOnly = forwardOnly;
- if (!this.sourceNode.hasFinalBuffer()) {
+ this.hasFinalBuffer = this.sourceNode.hasFinalBuffer();
+ if (!this.hasFinalBuffer) {
this.buffer = bm.createTupleBuffer(sourceNode.getOutputElements(),
context.getConnectionId(), TupleSourceType.PROCESSOR);
this.buffer.setForwardOnly(forwardOnly);
}
@@ -132,9 +138,9 @@
public TupleBuffer collectTuples() throws TeiidComponentException,
TeiidProcessingException {
TupleBatch batch = null;
while(!done) {
- if (this.sourceNode.hasFinalBuffer()) {
+ if (this.hasFinalBuffer) {
if (this.buffer == null) {
- TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer();
+ TupleBuffer finalBuffer = this.sourceNode.getFinalBuffer(rowLimit);
Assertion.isNotNull(finalBuffer);
this.buffer = finalBuffer;
}
@@ -145,6 +151,17 @@
}
}
batch = sourceNode.nextBatch();
+
+ if (rowLimit > 0 && rowLimit <= batch.getEndRow()) {
+ if (!done) {
+ this.sourceNode.close();
+ }
+ if (rowLimit < batch.getEndRow()) {
+ List<List<?>> tuples = batch.getTuples().subList(0, rowLimit -
batch.getBeginRow() + 1);
+ batch = new TupleBatch(batch.getBeginRow(), tuples);
+ }
+ batch.setTerminationFlag(true);
+ }
flushBatch(batch);
@@ -176,7 +193,7 @@
@SuppressWarnings("unused")
protected void flushBatchDirect(TupleBatch batch, boolean add) throws
TeiidComponentException, TeiidProcessingException {
- if (!this.sourceNode.hasFinalBuffer()) {
+ if (!this.hasFinalBuffer) {
buffer.addTupleBatch(batch, add);
}
}
@@ -188,4 +205,8 @@
return buffer.getRowCount();
}
+ public void setRowLimit(int rowLimit) {
+ this.rowLimit = rowLimit;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -149,7 +149,7 @@
* @throws TeiidComponentException
* @throws BlockedException
*/
- public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
return null;
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -22,35 +22,30 @@
package org.teiid.query.processor;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.TupleSource;
import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.Assertion;
-import org.teiid.events.EventDistributor;
+import org.teiid.dqp.internal.process.TupleSourceCache;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
import org.teiid.query.processor.BatchCollector.BatchProducer;
-import org.teiid.query.sql.lang.Command;
import org.teiid.query.util.CommandContext;
/**
* Driver for plan processing.
*/
-public class QueryProcessor implements BatchProducer, ProcessorDataManager {
+public class QueryProcessor implements BatchProducer {
public static class ExpiredTimeSliceException extends TeiidRuntimeException {
private static final long serialVersionUID = 4585044674826578060L;
@@ -62,56 +57,6 @@
QueryProcessor createQueryProcessor(String query, String recursionGroup, CommandContext
commandContext, Object... params) throws TeiidProcessingException,
TeiidComponentException;
}
- private class SharedState {
- TupleBuffer tb;
- TupleSource ts;
- int id;
- int expectedReaders;
-
- private void remove() {
- ts.closeSource();
- tb.remove();
- tb = null;
- ts = null;
- }
- }
-
- private final class BufferedTupleSource implements TupleSource {
- private int rowNumber = 1;
- private SharedState state;
-
- private BufferedTupleSource(SharedState state) {
- this.state = state;
- }
-
- @Override
- public List<?> nextTuple() throws TeiidComponentException,
- TeiidProcessingException {
- if (rowNumber <= state.tb.getRowCount()) {
- return state.tb.getBatch(rowNumber).getTuple(rowNumber++);
- }
- if (state.tb.isFinal()) {
- return null;
- }
- List<?> row = state.ts.nextTuple();
- if (row == null) {
- state.tb.setFinal(true);
- } else {
- this.state.tb.addTuple(row);
- rowNumber++;
- }
- return row;
- }
-
- @Override
- public void closeSource() {
- if (--state.expectedReaders == 0 && sharedStates != null &&
sharedStates.containsKey(state.id)) {
- state.remove();
- sharedStates.remove(state.id);
- }
- }
- }
-
private CommandContext context;
private ProcessorDataManager dataMgr;
private BufferManager bufferMgr;
@@ -123,11 +68,10 @@
private volatile boolean requestCanceled;
private static final int DEFAULT_WAIT = 50;
private boolean processorClosed;
+
private boolean continuous;
private int rowOffset = 1;
- Map<Integer, SharedState> sharedStates;
-
/**
* Construct a processor with all necessary information to process.
* @param plan The plan to process
@@ -138,6 +82,7 @@
*/
public QueryProcessor(ProcessorPlan plan, CommandContext context, BufferManager
bufferMgr, final ProcessorDataManager dataMgr) {
this.context = context;
+ this.context.setTupleSourceCache(new TupleSourceCache());
this.dataMgr = dataMgr;
this.processPlan = plan;
this.bufferMgr = bufferMgr;
@@ -203,6 +148,7 @@
if (result.getTerminationFlag()) {
result.setTerminationFlag(false);
+ this.context.getTupleSourceCache().close();
this.processPlan.close();
this.processPlan.reset();
this.context.incrementReuseCount();
@@ -245,7 +191,7 @@
// initialize if necessary
if(!initialized) {
reserved =
this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()),
BufferReserveMode.FORCE);
- this.processPlan.initialize(context, this, bufferMgr);
+ this.processPlan.initialize(context, dataMgr, bufferMgr);
initialized = true;
}
@@ -266,12 +212,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "QueryProcessor: closing
processor"); //$NON-NLS-1$
}
- if (sharedStates != null) {
- for (SharedState ss : sharedStates.values()) {
- ss.remove();
- }
- sharedStates = null;
- }
+ this.context.getTupleSourceCache().close();
this.bufferMgr.releaseBuffers(reserved);
reserved = 0;
processorClosed = true;
@@ -309,12 +250,12 @@
}
@Override
- public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
while (true) {
long wait = DEFAULT_WAIT;
try {
init();
- return this.processPlan.getFinalBuffer();
+ return this.processPlan.getFinalBuffer(maxRows);
} catch (BlockedException e) {
if (!this.context.isNonBlocking()) {
throw e;
@@ -345,41 +286,14 @@
public void setContinuous(boolean continuous) {
this.continuous = continuous;
+ if (this.continuous) {
+ this.context.setContinuous();
+ }
}
-
- @Override
- public Object lookupCodeValue(CommandContext ctx, String codeTableName,
- String returnElementName, String keyElementName, Object keyValue)
- throws BlockedException, TeiidComponentException,
- TeiidProcessingException {
- return dataMgr.lookupCodeValue(ctx, codeTableName, returnElementName, keyElementName,
keyValue);
- }
@Override
- public EventDistributor getEventDistributor() {
- return dataMgr.getEventDistributor();
+ public void close() throws TeiidComponentException {
+ closeProcessing();
}
- @Override
- public TupleSource registerRequest(CommandContext ctx, Command command,
- String modelName, RegisterRequestParameter parameterObject)
- throws TeiidComponentException, TeiidProcessingException {
- if (parameterObject.info == null) {
- return dataMgr.registerRequest(ctx, command, modelName, parameterObject);
- }
- //begin handling of shared commands
- if (sharedStates == null) {
- sharedStates = new HashMap<Integer, SharedState>();
- }
- SharedState state = sharedStates.get(parameterObject.info.id);
- if (state == null) {
- state = new SharedState();
- state.expectedReaders = parameterObject.info.sharingCount;
- state.tb =
QueryProcessor.this.bufferMgr.createTupleBuffer(command.getProjectedSymbols(),
ctx.getConnectionId(), TupleSourceType.PROCESSOR);
- state.ts = dataMgr.registerRequest(ctx, command, modelName, new
RegisterRequestParameter(parameterObject.connectorBindingId, 0, -1));
- state.id = parameterObject.info.id;
- sharedStates.put(parameterObject.info.id, state);
- }
- return new BufferedTupleSource(state);
- }
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/RegisterRequestParameter.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -33,6 +33,7 @@
public int nodeID = 0;
public int limit = -1;
public SharedAccessInfo info;
+ public boolean doNotCache;
public RegisterRequestParameter(String connectorBindingId, int nodeID,
int limit) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -104,7 +104,8 @@
}
if (this.rowProcessor == null) {
rowProcedure.reset();
- this.rowProcessor = new QueryProcessor(rowProcedure, getContext(), this.bufferMgr,
this.dataMgr);
+ CommandContext context = getContext().clone();
+ this.rowProcessor = new QueryProcessor(rowProcedure, context, this.bufferMgr,
this.dataMgr);
for (Map.Entry<ElementSymbol, Expression> entry : this.params.entrySet()) {
Integer index = (Integer)this.lookupMap.get(entry.getValue());
if (index != null) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/AccessNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -411,22 +411,22 @@
public Object clone(){
AccessNode clonedNode = new AccessNode();
- this.copy(this, clonedNode);
+ this.copyTo(clonedNode);
return clonedNode;
}
- protected void copy(AccessNode source, AccessNode target){
- super.copy(source, target);
- target.modelName = source.modelName;
- target.modelId = source.modelId;
- target.connectorBindingId = source.connectorBindingId;
- target.shouldEvaluate = source.shouldEvaluate;
- if (!source.shouldEvaluate) {
- target.projection = source.projection;
- target.originalSelect = source.originalSelect;
+ protected void copyTo(AccessNode target){
+ super.copyTo(target);
+ target.modelName = modelName;
+ target.modelId = modelId;
+ target.connectorBindingId = connectorBindingId;
+ target.shouldEvaluate = shouldEvaluate;
+ if (!shouldEvaluate) {
+ target.projection = projection;
+ target.originalSelect = originalSelect;
}
- target.command = source.command;
- target.info = source.info;
+ target.command = command;
+ target.info = info;
}
public PlanNode getDescriptionProperties() {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/ArrayTableNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -82,7 +82,7 @@
@Override
public ArrayTableNode clone() {
ArrayTableNode clone = new ArrayTableNode(getID());
- this.copy(this, clone);
+ this.copyTo(clone);
clone.setTable(table);
return clone;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/BatchedUpdateNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -176,7 +176,7 @@
*/
public Object clone() {
BatchedUpdateNode clonedNode = new BatchedUpdateNode(getID(), updateCommands,
contexts, shouldEvaluate, modelName);
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
return clonedNode;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentAccessNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -105,7 +105,7 @@
clonedNode.maxSetSize = this.maxSetSize;
clonedNode.maxPredicates = this.maxPredicates;
clonedNode.pushdown = this.pushdown;
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
return clonedNode;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureAccessNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -59,7 +59,7 @@
DependentProcedureAccessNode copy = new DependentProcedureAccessNode(getID(),
inputCriteria,
inputReferences,
inputDefaults);
- copy(this, copy);
+ copyTo(copy);
return copy;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentProcedureExecutionNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -58,7 +58,7 @@
DependentProcedureExecutionNode copy = new
DependentProcedureExecutionNode(getID(), (Criteria)inputCriteria.clone(),
inputReferences,
inputDefaults);
- copy(this, copy);
+ copyTo(copy);
return copy;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -47,12 +48,18 @@
ValueIteratorSource {
private TupleBuffer buffer;
+ private List<? extends Expression> schema;
private Map<Expression, Set<Object>> cachedSets;
private boolean unused; //TODO: use this value instead of the context
private boolean distinct;
+
+ public DependentValueSource(TupleBuffer tb) {
+ this(tb, tb.getSchema());
+ }
- public DependentValueSource(TupleBuffer tupleSourceID) {
- this.buffer = tupleSourceID;
+ public DependentValueSource(TupleBuffer tb, List<? extends Expression> schema)
{
+ this.buffer = tb;
+ this.schema = schema;
}
public TupleBuffer getTupleBuffer() {
@@ -67,7 +74,7 @@
IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
- index = buffer.getSchema().indexOf(valueExpression);
+ index = schema.indexOf(valueExpression);
Assertion.assertTrue(index != -1);
}
return new TupleSourceValueIterator(its, index);
@@ -85,10 +92,10 @@
IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
- index = buffer.getSchema().indexOf(valueExpression);
+ index = schema.indexOf(valueExpression);
}
Assertion.assertTrue(index != -1);
- Class<?> type = ((Expression)buffer.getSchema().get(index)).getType();
+ Class<?> type = ((Expression)schema.get(index)).getType();
if (!DataTypeManager.isHashable(type)) {
result = new TreeSet<Object>(Constant.COMPARATOR);
} else {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -449,7 +449,7 @@
public Object clone(){
GroupingNode clonedNode = new GroupingNode(super.getID());
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
clonedNode.removeDuplicates = removeDuplicates;
clonedNode.outputMapping = outputMapping;
clonedNode.orderBy = orderBy;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/InsertPlanExecutionNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -92,13 +92,13 @@
public Object clone(){
InsertPlanExecutionNode clonedNode = new InsertPlanExecutionNode(super.getID(),
this.metadata);
- copy(this, clonedNode);
+ copyTo(clonedNode);
return clonedNode;
}
- protected void copy(InsertPlanExecutionNode source, InsertPlanExecutionNode target) {
- target.references = source.references;
- super.copy(source, target);
+ protected void copyTo(InsertPlanExecutionNode target) {
+ target.references = references;
+ super.copyTo(target);
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -154,7 +154,7 @@
*/
public Object clone() {
JoinNode clonedNode = new JoinNode(super.getID());
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
clonedNode.joinType = this.joinType;
clonedNode.joinStrategy = this.joinStrategy.clone();
@@ -186,7 +186,8 @@
this.joinStrategy.loadLeft();
if (isDependent()) {
TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
- dvs = new DependentValueSource(buffer);
+ //the tuplebuffer may be from a lower node, so pass in the schema
+ dvs = new DependentValueSource(buffer,
this.joinStrategy.leftSource.getSource().getElements());
dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/LimitNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -30,6 +30,7 @@
import org.teiid.client.plan.PlanNode;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.eval.Evaluator;
@@ -168,7 +169,7 @@
public Object clone() {
LimitNode node = new LimitNode(getID(), limitExpr, offsetExpr);
node.implicit = this.implicit;
- copy(this, node);
+ copyTo(node);
node.rowCounter = this.rowCounter;
return node;
}
@@ -188,5 +189,24 @@
public int getOffset() {
return offset;
}
+
+ @Override
+ public boolean hasFinalBuffer() {
+ //TODO: support offset
+ return offsetExpr == null && this.getChildren()[0].hasFinalBuffer();
+ }
+
+ @Override
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ if (maxRows >= 0) {
+ if (limit >= 0) {
+ maxRows = Math.min(maxRows, limit);
+ }
+ } else {
+ maxRows = limit;
+ }
+ return this.getChildren()[0].getFinalBuffer(maxRows);
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -49,7 +49,7 @@
public Object clone(){
NullNode clonedNode = new NullNode(super.getID());
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
return clonedNode;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/PlanExecutionNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -164,14 +164,13 @@
public Object clone(){
PlanExecutionNode clonedNode = new PlanExecutionNode();
- copy(this, clonedNode);
+ copyTo(clonedNode);
return clonedNode;
}
- protected void copy(PlanExecutionNode source,
- PlanExecutionNode target) {
- target.setProcessorPlan(source.plan.clone());
- super.copy(source, target);
+ protected void copyTo(PlanExecutionNode target) {
+ target.setProcessorPlan(plan.clone());
+ super.copyTo(target);
}
public PlanNode getDescriptionProperties() {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -231,7 +231,7 @@
public Object clone(){
ProjectIntoNode clonedNode = new ProjectIntoNode();
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
clonedNode.intoGroup = intoGroup;
clonedNode.intoElements = intoElements;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -27,24 +27,22 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.client.plan.PlanNode;
import org.teiid.common.buffer.BlockedException;
-import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.analysis.AnalysisRecord;
-import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.sql.LanguageObject;
import org.teiid.query.sql.symbol.AliasSymbol;
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.util.SymbolMap;
-import org.teiid.query.util.CommandContext;
public class ProjectNode extends SubqueryAwareRelationalNode {
@@ -52,7 +50,7 @@
private List<? extends Expression> selectSymbols;
// Derived element lookup map
- private Map elementMap;
+ private Map<Expression, Integer> elementMap;
private boolean needsProject = true;
private List<Expression> expressions;
private int[] projectionIndexes;
@@ -86,45 +84,37 @@
public void setSelectSymbols(List<? extends Expression> symbols) {
this.selectSymbols = symbols;
- }
-
- @Override
- public void initialize(CommandContext context, BufferManager bufferManager,
- ProcessorDataManager dataMgr) {
- super.initialize(context, bufferManager, dataMgr);
-
- // Do this lazily as the node may be reset and re-used and this info doesn't
change
- if(elementMap != null) {
- return;
- }
- this.projectionIndexes = new int[this.selectSymbols.size()];
+ elementMap = Collections.emptyMap();
+ this.projectionIndexes = new int[this.selectSymbols.size()];
Arrays.fill(this.projectionIndexes, -1);
this.expressions = new ArrayList<Expression>(this.selectSymbols.size());
for (Expression ses : this.selectSymbols) {
this.expressions.add(SymbolMap.getExpression(ses));
}
- //in the case of select with no from, there is no child node
- //simply return at this point
- if(this.getChildren()[0] == null){
- elementMap = new HashMap();
- return;
- }
-
+ }
+
+ @Override
+ public void addChild(RelationalNode child) {
+ super.addChild(child);
+ init();
+ }
+
+ void init() {
+ List<? extends Expression> childElements = getChildren()[0].getElements();
// Create element lookup map for evaluating project expressions
- List childElements = this.getChildren()[0].getElements();
this.elementMap = createLookupMap(childElements);
// Check whether project needed at all - this occurs if:
// 1. outputMap == null (see previous block)
// 2. project elements are either elements or aggregate symbols (no processing
required)
// 3. order of input values == order of output values
- needsProject = childElements.size() != getElements().size();
+ needsProject = childElements.size() != selectSymbols.size();
for(int i=0; i<selectSymbols.size(); i++) {
Expression symbol = selectSymbols.get(i);
if(symbol instanceof AliasSymbol) {
- Integer index = (Integer) elementMap.get(symbol);
+ Integer index = elementMap.get(symbol);
if(index != null && index.intValue() == i) {
projectionIndexes[i] = index;
continue;
@@ -132,7 +122,7 @@
symbol = ((AliasSymbol)symbol).getSymbol();
}
- Integer index = (Integer) elementMap.get(symbol);
+ Integer index = elementMap.get(symbol);
if(index == null || index.intValue() != i) {
// input / output element order is not the same
needsProject = true;
@@ -140,7 +130,7 @@
projectionIndexes[i] = index;
}
}
- }
+ }
public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
@@ -208,13 +198,17 @@
public Object clone(){
ProjectNode clonedNode = new ProjectNode();
- this.copy(this, clonedNode);
+ this.copyTo(clonedNode);
return clonedNode;
}
- protected void copy(ProjectNode source, ProjectNode target){
- super.copy(source, target);
+ protected void copyTo(ProjectNode target){
+ super.copyTo(target);
target.selectSymbols = this.selectSymbols;
+ target.needsProject = needsProject;
+ target.elementMap = elementMap;
+ target.expressions = expressions;
+ target.projectionIndexes = projectionIndexes;
}
public PlanNode getDescriptionProperties() {
@@ -228,4 +222,15 @@
return this.selectSymbols;
}
+ @Override
+ public boolean hasFinalBuffer() {
+ return !needsProject && this.getChildren()[0].hasFinalBuffer();
+ }
+
+ @Override
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ return this.getChildren()[0].getFinalBuffer(maxRows);
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -509,13 +509,13 @@
*/
public abstract Object clone();
- protected void copy(RelationalNode source, RelationalNode target){
- target.data = source.data;
+ protected void copyTo(RelationalNode target){
+ target.data = this.data;
- target.children = new RelationalNode[source.children.length];
- for(int i=0; i<source.children.length; i++) {
- if(source.children[i] != null) {
- target.children[i] = (RelationalNode)source.children[i].clone();
+ target.children = new RelationalNode[this.children.length];
+ for(int i=0; i<this.children.length; i++) {
+ if(this.children[i] != null) {
+ target.children[i] = (RelationalNode)this.children[i].clone();
target.children[i].setParent(target);
} else {
break;
@@ -619,7 +619,7 @@
* @throws TeiidComponentException
* @throws BlockedException
*/
- public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
return null;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -266,8 +266,8 @@
}
@Override
- public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
- return root.getFinalBuffer();
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
+ return root.getFinalBuffer(maxRows);
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SelectNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -130,16 +130,16 @@
public Object clone(){
SelectNode clonedNode = new SelectNode();
- this.copy(this, clonedNode);
+ this.copyTo(clonedNode);
return clonedNode;
}
- protected void copy(SelectNode source, SelectNode target){
- super.copy(source, target);
+ protected void copyTo(SelectNode target){
+ super.copyTo(target);
target.criteria = criteria;
- target.elementMap = source.elementMap;
- target.projectionIndexes = source.projectionIndexes;
- target.projectedExpressions = source.projectedExpressions;
+ target.elementMap = elementMap;
+ target.projectionIndexes = projectionIndexes;
+ target.projectedExpressions = projectedExpressions;
}
public PlanNode getDescriptionProperties() {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -49,6 +49,8 @@
private TupleBuffer output;
private TupleSource outputTs;
private boolean usingOutput;
+
+ private int rowLimit = -1;
private static final int SORT = 2;
private static final int OUTPUT = 3;
@@ -64,6 +66,7 @@
output = null;
outputTs = null;
usingOutput = false;
+ rowLimit = -1;
}
public void setSortElements(List<OrderByItem> items) {
@@ -100,6 +103,12 @@
if (this.outputTs == null) {
this.outputTs = this.output.createIndexedTupleSource();
}
+ if (rowLimit >= 0) {
+ this.output.truncateTo(rowLimit);
+ if (!this.output.isFinal() && this.output.getRowCount() == rowLimit) {
+ this.output.close();
+ }
+ }
this.phase = OUTPUT;
}
@@ -149,15 +158,15 @@
}
}
- protected void copy(SortNode source, SortNode target){
- super.copy(source, target);
- target.items = source.items;
- target.mode = source.mode;
+ protected void copyTo(SortNode target){
+ super.copyTo(target);
+ target.items = items;
+ target.mode = mode;
}
public Object clone(){
SortNode clonedNode = new SortNode(super.getID());
- this.copy(this, clonedNode);
+ this.copyTo(clonedNode);
return clonedNode;
}
@@ -175,14 +184,16 @@
}
@Override
- public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
+ public TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
+ this.rowLimit = maxRows;
+ //TODO: push limiting into the sort logic
if (this.output == null) {
sortPhase();
}
usingOutput = true;
TupleBuffer result = this.output;
if (this.output.isFinal()) {
- this.output = null;
+ this.output = null;
close();
}
return result;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/TextTableNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -162,7 +162,7 @@
@Override
public TextTableNode clone() {
TextTableNode clone = new TextTableNode(getID());
- this.copy(this, clone);
+ this.copyTo(clone);
clone.setTable(table);
return clone;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/UnionAllNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -147,7 +147,7 @@
public Object clone(){
UnionAllNode clonedNode = new UnionAllNode(super.getID());
- super.copy(this, clonedNode);
+ super.copyTo(clonedNode);
return clonedNode;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/WindowFunctionProjectNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -145,7 +145,7 @@
public Object clone(){
WindowFunctionProjectNode clonedNode = new WindowFunctionProjectNode();
- this.copy(this, clonedNode);
+ this.copyTo(clonedNode);
clonedNode.windows = windows;
clonedNode.expressionIndexes = expressionIndexes;
clonedNode.passThrough = passThrough;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -80,7 +80,7 @@
typeMapping.put(DataTypeManager.DefaultDataClasses.DOUBLE, BuiltInAtomicType.DOUBLE);
}
- private static RuntimeException EARLY_TERMINATION = new RuntimeException();
+ private static TeiidRuntimeException EARLY_TERMINATION = new TeiidRuntimeException();
private XMLTable table;
private List<XMLColumn> projectedColumns;
@@ -102,6 +102,8 @@
private int outputRow = 1;
private boolean usingOutput;
+ private int rowLimit = -1;
+
public XMLTableNode(int nodeID) {
super(nodeID);
}
@@ -132,6 +134,7 @@
this.buffer = null;
this.state = State.BUILDING;
this.asynchException = null;
+ this.rowLimit = -1;
}
public void setTable(XMLTable table) {
@@ -145,7 +148,7 @@
@Override
public XMLTableNode clone() {
XMLTableNode clone = new XMLTableNode(getID());
- this.copy(this, clone);
+ this.copyTo(clone);
clone.setTable(table);
clone.setProjectedColumns(projectedColumns);
return clone;
@@ -215,21 +218,23 @@
public void run() {
try {
XQueryEvaluator.evaluateXQuery(table.getXQueryExpression(), contextItem,
parameters, XMLTableNode.this, getContext());
- synchronized (XMLTableNode.this) {
- if (buffer != null) {
- buffer.close();
- }
- }
} catch (TeiidException e) {
asynchException = new TeiidRuntimeException(e);
} catch (TeiidRuntimeException e) {
- asynchException = e;
- } catch (RuntimeException e) {
if (e != EARLY_TERMINATION) {
- asynchException = new TeiidRuntimeException(e);
+ asynchException = e;
}
+ } catch (RuntimeException e) {
+ asynchException = new TeiidRuntimeException(e);
} finally {
synchronized (XMLTableNode.this) {
+ if (buffer != null) {
+ try {
+ buffer.close();
+ } catch (TeiidComponentException e) {
+ asynchException = new TeiidRuntimeException(e);
+ }
+ }
state = State.DONE;
XMLTableNode.this.notifyAll();
}
@@ -308,13 +313,14 @@
}
@Override
- public TupleBuffer getFinalBuffer() throws BlockedException,
+ public synchronized TupleBuffer getFinalBuffer(int maxRows) throws BlockedException,
TeiidComponentException, TeiidProcessingException {
+ this.rowLimit = maxRows;
evaluate(true);
usingOutput = true;
TupleBuffer finalBuffer = this.buffer;
if (!this.table.getXQueryExpression().isStreaming()) {
- close();
+ close();
}
return finalBuffer;
}
@@ -329,6 +335,9 @@
rowCount++;
try {
this.buffer.addTuple(processRow());
+ if (this.buffer.getRowCount() == rowLimit) {
+ throw EARLY_TERMINATION;
+ }
if (state == State.BUILDING && hasNextBatch()) {
this.state = State.AVAILABLE;
this.notifyAll();
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -90,9 +90,7 @@
private boolean executed;
private boolean doneLoading;
- public RelationalPlanExecutor (ResultSetInfo resultInfo, CommandContext context,
ProcessorDataManager dataMgr, BufferManager bufferMgr)
- throws TeiidComponentException{
-
+ public RelationalPlanExecutor (ResultSetInfo resultInfo, CommandContext context,
ProcessorDataManager dataMgr, BufferManager bufferMgr) {
this.resultInfo = resultInfo;
this.bufferMgr = bufferMgr;
this.dataManager = dataMgr;
Modified: trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -1161,9 +1161,13 @@
constantParts = new String[] {"X'", obj.getValue().toString(),
"'"}; //$NON-NLS-1$ //$NON-NLS-2$
}
if (constantParts == null) {
- String strValue = obj.getValue().toString();
- strValue = escapeStringValue(strValue, "'"); //$NON-NLS-1$
- constantParts = new String[] {"'", strValue,
"'"}; //$NON-NLS-1$ //$NON-NLS-2$
+ if (DataTypeManager.isLOB(type)) {
+ constantParts = new String[] {"?"}; //$NON-NLS-1$
+ } else {
+ String strValue = obj.getValue().toString();
+ strValue = escapeStringValue(strValue, "'");
//$NON-NLS-1$
+ constantParts = new String[] {"'", strValue,
"'"}; //$NON-NLS-1$ //$NON-NLS-2$
+ }
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -49,6 +49,7 @@
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.internal.process.TupleSourceCache;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.events.EventDistributor;
import org.teiid.language.SQLConstants;
@@ -124,6 +125,13 @@
RegisterRequestParameter parameterObject)
throws TeiidComponentException, TeiidProcessingException {
+ if (parameterObject.info != null) {
+ TupleSourceCache tsc = context.getTupleSourceCache();
+ if (tsc != null) {
+ return tsc.getSharedTupleSource(context, command, modelName, parameterObject,
bufferManager, this);
+ }
+ }
+
TempTableStore tempTableStore = context.getTempTableStore();
if(tempTableStore != null) {
TupleSource result = registerRequest(context, modelName, command);
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -45,6 +45,7 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.internal.process.TupleSourceCache;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.TransactionContext;
@@ -141,7 +142,7 @@
Set<CommandListener> commandListeners = null;
private LRUCache<String, DecimalFormat> decimalFormatCache;
private LRUCache<String, SimpleDateFormat> dateFormatCache;
- private AtomicLong reuseCount = new AtomicLong();
+ private AtomicLong reuseCount = null;
private ClassLoader classLoader;
private List<Exception> warnings = null;
@@ -155,6 +156,7 @@
private boolean nonBlocking;
private HashSet<Object> planningObjects;
private HashSet<Object> dataObjects = this.globalState.dataObjects;
+ private TupleSourceCache tupleSourceCache;
/**
* Construct a new context.
@@ -228,6 +230,7 @@
clone.recursionStack = new LinkedList<String>(this.recursionStack);
}
clone.setNonBlocking(this.nonBlocking);
+ clone.tupleSourceCache = this.tupleSourceCache;
return clone;
}
@@ -749,8 +752,20 @@
@Override
public long getReuseCount() {
+ if (globalState.reuseCount == null) {
+ return 0;
+ }
return globalState.reuseCount.get();
}
+
+ @Override
+ public boolean isContinuous() {
+ return globalState.reuseCount == null;
+ }
+
+ public void setContinuous() {
+ this.globalState.reuseCount = new AtomicLong();
+ }
@Override
public ClassLoader getVDBClassLoader() {
@@ -791,5 +806,13 @@
}
LogManager.logInfo(LogConstants.CTX_DQP,
QueryPlugin.Util.gs(QueryPlugin.Event.TEIID31105, warning.getMessage()));
}
+
+ public TupleSourceCache getTupleSourceCache() {
+ return tupleSourceCache;
+ }
+
+ public void setTupleSourceCache(TupleSourceCache tupleSourceCache) {
+ this.tupleSourceCache = tupleSourceCache;
+ }
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -60,6 +60,32 @@
assertEquals(2, batch.getBeginRow());
}
+ @Test public void testTruncate() throws Exception {
+ ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
+ x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+ List<ElementSymbol> schema = Arrays.asList(x);
+ TupleBuffer tb =
BufferManagerFactory.getStandaloneBufferManager().createTupleBuffer(schema, "x",
TupleSourceType.PROCESSOR); //$NON-NLS-1$
+ tb.setBatchSize(2);
+ for (int i = 0; i < 5; i++) {
+ tb.addTuple(Arrays.asList(1));
+ }
+ TupleBatch batch = tb.getBatch(1);
+ assertTrue(!batch.getTerminationFlag());
+ assertEquals(2, batch.getEndRow());
+ tb.close();
+ assertEquals(5, tb.getManagedRowCount());
+ tb.truncateTo(3);
+ assertEquals(3, tb.getManagedRowCount());
+ assertEquals(3, tb.getRowCount());
+ batch = tb.getBatch(3);
+ assertTrue(batch.getTerminationFlag());
+ tb.truncateTo(2);
+ assertEquals(2, tb.getManagedRowCount());
+ assertEquals(2, tb.getRowCount());
+ batch = tb.getBatch(2);
+ assertTrue(batch.getTerminationFlag());
+ }
+
@Test public void testLobHandling() throws Exception {
ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
x.setType(DataTypeManager.DefaultDataClasses.CLOB);
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -374,6 +374,7 @@
agds.sleep = 500;
agds.setUseIntCounter(true);
RequestMessage reqMsg = exampleRequestMessage(sql);
+ reqMsg.setRowLimit(11);
reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
DQPWorkContext.getWorkContext().getSession().setUserName(userName);
@@ -393,6 +394,8 @@
rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
assertEquals(5, rm.getResultsList().size());
+ assertEquals(7, rm.getFirstRow());
+ assertEquals(11, rm.getFinalRow());
}
@Test public void testSourceConcurrency() throws Exception {
@@ -603,6 +606,14 @@
}
}
+ @Test public void testXmlTableStreamingWithLimit() throws Exception {
+ String sql = "select * from xmltable('/a/b' passing
xmlparse(document '<a
x=''1''><b>foo</b><b>bar</b><b>zed</b></a>')
columns y string path '.') as x limit 2"; //$NON-NLS-1$
+
+ ResultsMessage rm = execute("A", 1, exampleRequestMessage(sql));
+ assertNull(rm.getException());
+ assertEquals(2, rm.getResultsList().size());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2012-07-12
14:02:50 UTC (rev 4232)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -345,10 +345,7 @@
helpGetProcessorPlan(preparedSql, values, new
SessionAwareCache<PreparedPlan>());
}
- /**
- * TODO: there may be other ways of handling this situation in the future
- */
- @Test public void testLimitNoCache() throws Exception {
+ @Test public void testLimit() throws Exception {
// Create query
String preparedSql = "SELECT pm1.g1.e1, e2, pm1.g1.e3 as a, e4 as b FROM
pm1.g1 WHERE pm1.g1.e2=?"; //$NON-NLS-1$
@@ -360,7 +357,7 @@
helpGetProcessorPlan(preparedSql, values, new DefaultCapabilitiesFinder(),
RealMetadataFactory.example1Cached(), planCache, SESSION_ID, false, true,
RealMetadataFactory.example1VDB());
//make sure the plan wasn't reused
- assertEquals(0, planCache.getCacheHitCount());
+ assertEquals(1, planCache.getCacheHitCount());
}
@Test public void testUpdateProcedureCriteria() throws Exception {
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-07-12
14:02:50 UTC (rev 4232)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-07-12
17:45:57 UTC (rev 4233)
@@ -49,6 +49,7 @@
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.query.processor.relational.RelationalNodeUtil;
import org.teiid.query.sql.symbol.Expression;
+import org.teiid.translator.CacheDirective;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
@@ -179,6 +180,11 @@
public boolean copyLobs() {
return copyLobs;
}
+
+ @Override
+ public CacheDirective getCacheDirective() {
+ return null;
+ }
};
}