Author: shawkins
Date: 2012-07-30 12:04:34 -0400 (Mon, 30 Jul 2012)
New Revision: 4276
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
Modified:
trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
Log:
TEIID-1598 adding code support for the translator caching feature
Modified: trunk/api/src/main/java/org/teiid/translator/CacheDirective.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/CacheDirective.java 2012-07-30 13:07:44
UTC (rev 4275)
+++ trunk/api/src/main/java/org/teiid/translator/CacheDirective.java 2012-07-30 16:04:34
UTC (rev 4276)
@@ -40,6 +40,7 @@
private Boolean prefersMemory;
private Boolean updatable;
+ private Boolean readAll;
private Long ttl;
private Scope scope;
@@ -75,6 +76,10 @@
this.ttl = ttl;
}
+ /**
+ * Get whether the result is updatable and therefore sensitive to data changes.
+ * @return
+ */
public Boolean getUpdatable() {
return updatable;
}
@@ -91,6 +96,18 @@
this.scope = scope;
}
+ /**
+ * Whether the engine should read and cache the entire results.
+ * @return
+ */
+ public Boolean getReadAll() {
+ return readAll;
+ }
+
+ public void setReadAll(Boolean readAll) {
+ this.readAll = readAll;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj == this) {
@@ -100,7 +117,8 @@
return false;
}
CacheDirective other = (CacheDirective)obj;
- return EquivalenceUtil.areEqual(this.prefersMemory, other.prefersMemory)
+ return EquivalenceUtil.areEqual(this.prefersMemory, other.prefersMemory)
+ && EquivalenceUtil.areEqual(this.readAll, other.readAll)
&& EquivalenceUtil.areEqual(this.ttl, other.ttl)
&& EquivalenceUtil.areEqual(this.updatable, other.updatable)
&& EquivalenceUtil.areEqual(this.scope, other.scope);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWork.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -49,5 +49,7 @@
boolean copyLobs();
CacheDirective getCacheDirective() throws TranslatorException;
+
+ boolean areLobsUsableAfterClose();
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -359,15 +359,21 @@
// if we need to keep the execution alive, then we can not support implicit close.
response.setSupportsImplicitClose(!this.securityContext.keepExecutionAlive());
- response.setTransactional(this.securityContext.isTransactional());
response.setWarnings(this.securityContext.getWarnings());
- response.setSupportsCloseWithLobs(this.connector.areLobsUsableAfterClose());
+ if (this.securityContext.getCacheDirective() != null) {
+ response.setScope(this.securityContext.getCacheDirective().getScope());
+ }
if ( lastBatch ) {
response.setFinalRow(rowCount);
}
return response;
}
+
+ @Override
+ public boolean areLobsUsableAfterClose() {
+ return this.connector.areLobsUsableAfterClose();
+ }
public static AtomicResultsMessage createResultsMessage(List<?>[] batch) {
return new AtomicResultsMessage(batch);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2012-07-30
13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -91,6 +91,13 @@
return externalNames;
}
+ /**
+  * Records an additional accessed object id.
+  * The backing set is created on first use, so it may be null until
+  * something has been added.
+  * @param id the object id to add to the accessed set
+  */
+ public void addAccessedObject(Object id) {
+ if (this.objectsAccessed == null) {
+ this.objectsAccessed = new HashSet<Object>();
+ }
+ this.objectsAccessed.add(id);
+ }
+
public Set<Object> getObjectsAccessed() {
return objectsAccessed;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -40,7 +40,6 @@
import org.teiid.query.parser.QueryParser;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.resolver.QueryResolver;
-import org.teiid.query.sql.lang.CacheHint;
import org.teiid.query.sql.lang.Command;
@@ -50,7 +49,6 @@
private transient Command command;
private transient TupleBuffer results;
- private CacheHint hint;
private String uuid;
private boolean hasLobs;
private int rowLimit;
@@ -69,22 +67,15 @@
this.results = results;
this.uuid = results.getId();
this.hasLobs = results.isLobs();
- this.accessInfo.populate(plan.getContext(), true);
+ if (plan != null) {
+ this.accessInfo.populate(plan.getContext(), true);
+ }
}
public void setCommand(Command command) {
this.command = command;
- this.hint = command.getCacheHint();
}
- public void setHint(CacheHint hint) {
- this.hint = hint;
- }
-
- public CacheHint getHint() {
- return hint;
- }
-
public synchronized Command getCommand(String sql, QueryMetadataInterface metadata,
ParseInfo info) throws QueryParserException, QueryResolverException,
TeiidComponentException {
if (command == null) {
command = QueryParser.getQueryParser().parseCommand(sql, info);
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -0,0 +1,163 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.query.processor.RegisterRequestParameter;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.translator.CacheDirective;
+import org.teiid.translator.CacheDirective.Scope;
+
+/**
+ * A proxy {@link TupleSource} that caches a {@link DataTierTupleSource}
+ */
+final class CachingTupleSource extends
+ TupleSourceCache.BufferedTupleSource {
+ private final DataTierManagerImpl dataTierManagerImpl;
+ private final CacheID cid;
+ private final RegisterRequestParameter parameterObject;
+ private final CacheDirective cd;
+ private final Collection<GroupSymbol> accessedGroups;
+ private boolean cached = false;
+ private DataTierTupleSource dtts;
+
+ /**
+  * Creates a caching wrapper around the given source.
+  * The buffer and source are handed to the superclass; the source is also
+  * kept as {@code dtts} so its {@code scope} and {@code errored} state can be read.
+  * @param dataTierManagerImpl used to reach the result set cache
+  * @param tb the buffer that holds the tuples to be cached
+  * @param ts the wrapped source
+  * @param cid the cache key the results are stored under
+  * @param parameterObject supplies the row limit of the request
+  * @param cd the cache directive (updatable, prefersMemory, readAll, ttl)
+  * @param accessedGroups groups whose metadata ids are recorded as accessed objects, may be null
+  */
+ CachingTupleSource(DataTierManagerImpl dataTierManagerImpl, TupleBuffer tb,
DataTierTupleSource ts, CacheID cid,
+ RegisterRequestParameter parameterObject, CacheDirective cd,
+ Collection<GroupSymbol> accessedGroups) {
+ super(tb, ts);
+ this.dataTierManagerImpl = dataTierManagerImpl;
+ this.dtts = ts;
+ this.cid = cid;
+ this.parameterObject = parameterObject;
+ this.cd = cd;
+ this.accessedGroups = accessedGroups;
+ }
+
+ @Override
+ public List<?> nextTuple() throws TeiidComponentException,
+ TeiidProcessingException {
+ if (dtts.scope == Scope.NONE || tb == null) {
+ removeTupleBuffer();
+ return ts.nextTuple();
+ }
+ //TODO: the cache directive object needs synchronized for consistency
+ List<?> tuple = super.nextTuple();
+ if (tuple == null && !cached && !dtts.errored) {
+ synchronized (cd) {
+ if (dtts.scope == Scope.NONE) {
+ removeTupleBuffer();
+ return tuple;
+ }
+ cached = true;
+ CachedResults cr = new CachedResults();
+ cr.setResults(tb, null);
+ if (!Boolean.FALSE.equals(cd.getUpdatable())) {
+ if (accessedGroups != null) {
+ for (GroupSymbol gs : accessedGroups) {
+ cr.getAccessInfo().addAccessedObject(gs.getMetadataID());
+ }
+ }
+ } else {
+ cr.getAccessInfo().setSensitiveToMetadataChanges(false);
+ }
+ if (parameterObject.limit > 0 && parameterObject.limit == rowNumber)
{
+ cr.setRowLimit(rowNumber);
+ }
+ tb.setPrefersMemory(Boolean.TRUE.equals(cd.getPrefersMemory()));
+ Determinism determinismLevel = Determinism.SESSION_DETERMINISTIC;
+ if (dtts.scope != null) {
+ switch (dtts.scope) {
+ case VDB:
+ determinismLevel = Determinism.VDB_DETERMINISTIC;
+ case SESSION:
+ determinismLevel = Determinism.SESSION_DETERMINISTIC;
+ case USER:
+ determinismLevel = Determinism.USER_DETERMINISTIC;
+ }
+ }
+ this.dataTierManagerImpl.requestMgr.getRsCache().put(cid, determinismLevel, cr,
cd.getTtl());
+ }
+ }
+ return tuple;
+ }
+
+ /**
+  * Removes the tuple buffer, if there still is one, and clears the reference
+  * so that later calls are no-ops.
+  */
+ private void removeTupleBuffer() {
+ if (tb != null) {
+ tb.remove();
+ tb = null;
+ }
+ }
+
+ @Override
+ public void closeSource() {
+ try {
+ if (tb != null && !cached && !dtts.errored) {
+ boolean readAll = true;
+ synchronized (cd) {
+ readAll = !Boolean.FALSE.equals(cd.getReadAll());
+ }
+ if (readAll) {
+ //TODO that this is blocking, so it could be made faster in non-transactional
scenarios
+ //we should also shut off any warnings, since the plan isn't consuming these
tuples
+ //the approach would probably be to do more read-ahead
+ dtts.getAtomicRequestMessage().setSerial(true);
+ while (dtts.scope != Scope.NONE) {
+ try {
+ List<?> tuple = nextTuple();
+ if (tuple == null) {
+ break;
+ }
+ } catch (BlockedException e) {
+ //this is possible if were were already waiting for an asynch result
+ try {
+ Thread.sleep(50); //TODO: we could synch/notify in the DataTierTupleSource
+ } catch (InterruptedException e1) {
+ break;
+ }
+ } catch (TeiidException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e,
dtts.getAtomicRequestMessage().getAtomicRequestID(), "Not using full results due to
error."); //$NON-NLS-1$
+ break;
+ }
+ }
+ }
+ }
+ } finally {
+ if (!cached) {
+ removeTupleBuffer();
+ }
+ ts.closeSource();
+ }
+ }
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachingTupleSource.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -41,7 +41,9 @@
import org.teiid.client.RequestMessage;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.CoreConstants;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
@@ -149,7 +151,7 @@
}
// Resources
- private DQPCore requestMgr;
+ DQPCore requestMgr;
private BufferManager bufferManager;
private EventDistributor eventDistributor;
private boolean detectChangeEvents;
@@ -172,7 +174,7 @@
return eventDistributor;
}
- public TupleSource registerRequest(CommandContext context, Command command, String
modelName, RegisterRequestParameter parameterObject) throws TeiidComponentException,
TeiidProcessingException {
+ public TupleSource registerRequest(CommandContext context, Command command, String
modelName, final RegisterRequestParameter parameterObject) throws TeiidComponentException,
TeiidProcessingException {
RequestWorkItem workItem =
requestMgr.getRequestWorkItem((RequestID)context.getProcessorID());
if(CoreConstants.SYSTEM_MODEL.equals(modelName) ||
CoreConstants.SYSTEM_ADMIN_MODEL.equals(modelName)) {
@@ -215,11 +217,11 @@
parameterObject.doNotCache = true;
} else {
String cmdString = command.toString();
- if (cmdString.length() < 200000) { //TODO: this check won't be needed if
keys aren't exclusively held in memory
+ if (cmdString.length() < 100000) { //TODO: this check won't be needed if
keys aren't exclusively held in memory
cid = new CacheID(workItem.getDqpWorkContext(), ParseInfo.DEFAULT_INSTANCE,
cmdString);
cid.setParameters(cv.parameters);
CachedResults cr = workItem.getRsCache().get(cid);
- if (cr != null) {
+ if (cr != null && (cr.getRowLimit() == 0 || (parameterObject.limit > 0
&& cr.getRowLimit() >= parameterObject.limit))) {
parameterObject.doNotCache = true;
LogManager.logDetail(LogConstants.CTX_DQP, "Using cache entry for",
cid); //$NON-NLS-1$
work.close();
@@ -235,7 +237,12 @@
}
}
work.setRequestWorkItem(workItem);
- return new DataTierTupleSource(aqr, workItem, work, this,
parameterObject.limit);
+ DataTierTupleSource dtts = new DataTierTupleSource(aqr, workItem, work, this,
parameterObject.limit);
+ if (cid != null) {
+ TupleBuffer tb =
getBufferManager().createTupleBuffer(aqr.getCommand().getProjectedSymbols(),
aqr.getCommandContext().getConnectionId(), TupleSourceType.PROCESSOR);
+ return new CachingTupleSource(this, tb, dtts, cid, parameterObject, cd,
accessedGroups);
+ }
+ return dtts;
}
/**
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -68,6 +68,7 @@
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
+import org.teiid.translator.CacheDirective.Scope;
/**
@@ -110,6 +111,9 @@
private volatile FutureWork<AtomicResultsMessage> futureResult;
private volatile boolean running;
+ boolean errored;
+ Scope scope; //this is to avoid synchronization
+
public DataTierTupleSource(AtomicRequestMessage aqr, RequestWorkItem workItem,
ConnectorWork cwi, DataTierManagerImpl dtm, int limit) {
this.aqr = aqr;
this.workItem = workItem;
@@ -158,7 +162,7 @@
if (value == result &&
!DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[i])) {
convertToRuntimeType[i] = false;
} else {
- if (isLob[i] && !cwi.copyLobs() && !arm.supportsCloseWithLobs()
&& DataTypeManager.isLOB(value.getClass())) {
+ if (isLob[i] && !cwi.copyLobs() && !cwi.areLobsUsableAfterClose()
&& DataTypeManager.isLOB(value.getClass())) {
explicitClose = true;
}
row.set(i, result);
@@ -224,8 +228,13 @@
public List<?> nextTuple() throws TeiidComponentException,
TeiidProcessingException {
while (true) {
if (arm == null) {
+ if (isDone()) {
+ //sanity check
+ return null; //TODO: could throw an illegal state exception
+ }
boolean partial = false;
AtomicResultsMessage results = null;
+ boolean dna = false;
try {
if (futureResult != null || !aqr.isSerial()) {
results = asynchGet();
@@ -248,9 +257,11 @@
}
}
} catch (TranslatorException e) {
- results = exceptionOccurred(e, true);
+ errored = true;
+ results = exceptionOccurred(e);
partial = true;
} catch (DataNotAvailableException e) {
+ dna = true;
if (e.getRetryDelay() >= 0) {
workItem.scheduleWork(new Runnable() {
@Override
@@ -262,7 +273,11 @@
continue;
}
throw BlockedException.block(aqr.getAtomicRequestID(), "Blocking on
DataNotAvailableException", aqr.getAtomicRequestID()); //$NON-NLS-1$
- }
+ } finally {
+ if (!dna && results == null) {
+ errored = true;
+ }
+ }
receiveResults(results, partial);
}
if (index < arm.getResults().length) {
@@ -412,16 +427,14 @@
}
}
- AtomicResultsMessage exceptionOccurred(TranslatorException exception, boolean
removeState) throws TeiidComponentException, TeiidProcessingException {
- if (removeState) {
- fullyCloseSource();
- }
+ AtomicResultsMessage exceptionOccurred(TranslatorException exception) throws
TeiidComponentException, TeiidProcessingException {
if(workItem.requestMsg.supportsPartialResults()) {
AtomicResultsMessage emptyResults = new AtomicResultsMessage(new List[0]);
emptyResults.setWarnings(Arrays.asList((Exception)exception));
emptyResults.setFinalRow(this.rowsProcessed);
return emptyResults;
}
+ fullyCloseSource();
if (exception.getCause() instanceof TeiidComponentException) {
throw (TeiidComponentException)exception.getCause();
}
@@ -433,6 +446,7 @@
void receiveResults(AtomicResultsMessage response, boolean partial) {
this.arm = response;
+ this.scope = response.getScope();
explicitClose |= !arm.supportsImplicitClose();
rowsProcessed += response.getResults().length;
index = 0;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -566,7 +566,7 @@
if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
//requestMore will trigger more processing
throw BlockedException.block(requestID, "Blocking due to full results
TupleBuffer", //$NON-NLS-1$
- this.getTupleBuffer().getId(), "rows",
this.getTupleBuffer().getManagedRowCount(), "batch size",
this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$
+ this.getTupleBuffer(), "rows",
this.getTupleBuffer().getManagedRowCount(), "batch size",
this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Exceeding buffer limit
since there are pending active plans."); //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/TupleSourceCache.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -86,9 +86,9 @@
}
public abstract static class BufferedTupleSource implements TupleSource {
- private int rowNumber = 1;
- private TupleBuffer tb;
- private TupleSource ts;
+ int rowNumber = 1;
+ TupleBuffer tb;
+ TupleSource ts;
protected BufferedTupleSource(TupleBuffer tb, TupleSource ts) {
this.tb = tb;
Modified: trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java 2012-07-30
13:07:44 UTC (rev 4275)
+++ trunk/engine/src/main/java/org/teiid/dqp/message/AtomicResultsMessage.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -24,10 +24,12 @@
import java.util.List;
+import org.teiid.translator.CacheDirective.Scope;
+
public class AtomicResultsMessage {
- private List[] results;
+ private List<?>[] results;
// Final row index in complete result set, if known
private int finalRow = -1;
@@ -35,17 +37,15 @@
// by default we support implicit close.
private boolean supportsImplicitClose = true;
- private boolean supportsCloseWithLobs;
-
- private boolean isTransactional;
-
private List<Exception> warnings;
+
+ private Scope scope;
// to honor the externalizable contract
public AtomicResultsMessage() {
}
- public AtomicResultsMessage(List[] results) {
+ public AtomicResultsMessage(List<?>[] results) {
this.results = results;
}
@@ -53,14 +53,6 @@
return this.supportsImplicitClose;
}
- public boolean supportsCloseWithLobs() {
- return supportsCloseWithLobs;
- }
-
- public void setSupportsCloseWithLobs(boolean supportsCloseWithLobs) {
- this.supportsCloseWithLobs = supportsCloseWithLobs;
- }
-
public void setSupportsImplicitClose(boolean supportsImplicitClose) {
this.supportsImplicitClose = supportsImplicitClose;
}
@@ -77,14 +69,6 @@
return results;
}
- public boolean isTransactional() {
- return isTransactional;
- }
-
- public void setTransactional(boolean isTransactional) {
- this.isTransactional = isTransactional;
- }
-
public void setWarnings(List<Exception> warnings) {
this.warnings = warnings;
}
@@ -92,4 +76,12 @@
public List<Exception> getWarnings() {
return warnings;
}
+
+ public void setScope(Scope scope) {
+ this.scope = scope;
+ }
+
+ public Scope getScope() {
+ return scope;
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -144,8 +144,6 @@
/**
* Create an index of the smaller size
- *
- * TODO: reuse existing temp table indexes
*/
public void createIndex(SourceState state, boolean sorted) throws
TeiidComponentException, TeiidProcessingException {
int[] expressionIndexes = state.getExpressionIndexes();
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/MergeJoinStrategy.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -224,8 +224,8 @@
outerState = this.leftSource;
innerState = this.rightSource;
outerMatched = false;
-
this.leftSource.getIterator().setPosition(this.leftSource.getMaxProbeMatch());
-
this.rightSource.getIterator().setPosition(this.rightSource.getMaxProbeMatch());
+ this.leftSource.setMaxProbePosition();
+ this.rightSource.setMaxProbePosition();
this.mergeState = MergeState.SCAN;
this.matchState = MatchState.MATCH_LEFT;
break;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -210,7 +210,7 @@
private TupleBuffer createTupleBuffer() throws TeiidComponentException {
TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName,
TupleSourceType.PROCESSOR);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer
", tb.getId()); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer
", tb); //$NON-NLS-1$
}
tb.setForwardOnly(true);
return tb;
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -238,5 +238,10 @@
this.currentTuple = null;
this.maxProbeMatch = 1;
}
+
+ /**
+  * Moves the iterator to the position returned by {@link #getMaxProbeMatch()}
+  * and clears the current tuple.
+  * @throws TeiidComponentException declared for the iterator access; see getIterator
+  */
+ public void setMaxProbePosition() throws TeiidComponentException {
+ this.getIterator().setPosition(this.getMaxProbeMatch());
+ this.currentTuple = null;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -269,7 +269,6 @@
TupleBuffer tb = bc.collectTuples();
CachedResults cr = new CachedResults();
cr.setResults(tb, qp.getProcessorPlan());
- cr.setHint(hint);
if (hint != null && hint.getDeterminism() != null) {
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the
query determinism from ",determinismLevel, " to ", hint.getDeterminism()
}); //$NON-NLS-1$ //$NON-NLS-2$
determinismLevel = hint.getDeterminism();
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-07-30
13:07:44 UTC (rev 4275)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -29,11 +29,15 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.RequestMessage;
import org.teiid.client.SourceWarning;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.TupleSource;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.InputStreamFactory;
import org.teiid.core.types.InputStreamFactory.StorageMode;
@@ -46,20 +50,20 @@
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.capabilities.DefaultCapabilitiesFinder;
import org.teiid.query.parser.QueryParser;
+import org.teiid.query.processor.RegisterRequestParameter;
import org.teiid.query.resolver.QueryResolver;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.query.util.CommandContext;
+import org.teiid.translator.CacheDirective;
@SuppressWarnings("nls")
public class TestDataTierManager {
+ private VDBMetaData vdb = RealMetadataFactory.exampleBQTVDB();
private DQPCore rm;
private DataTierManagerImpl dtm;
private CommandContext context;
- private AtomicRequestMessage request;
- private Command command;
- private DataTierTupleSource info;
private AutoGenDataService connectorManager = new AutoGenDataService();
private RequestWorkItem workItem;
private int limit = -1;
@@ -75,50 +79,60 @@
return command;
}
- private void helpSetup(int nodeId) throws Exception {
- helpSetup("SELECT * FROM BQT1.SmallA", nodeId); //$NON-NLS-1$
+ private DataTierTupleSource helpSetup(int nodeId) throws Exception {
+ return helpSetup("SELECT * FROM BQT1.SmallA", nodeId); //$NON-NLS-1$
}
- private void helpSetup(String sql, int nodeId) throws Exception {
- QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
- DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata,
RealMetadataFactory.exampleBQTVDB());
-
- rm = new DQPCore();
- rm.setTransactionService(new FakeTransactionService());
- rm.setBufferManager(new FakeBufferService().getBufferManager());
- rm.setResultsetCache(new SessionAwareCache<CachedResults>(new
DefaultCacheFactory(), SessionAwareCache.Type.RESULTSET, new CacheConfiguration()));
- rm.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>(new
DefaultCacheFactory(), SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration()));
- rm.start(new DQPConfiguration());
- FakeBufferService bs = new FakeBufferService();
+ private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
+ helpSetupDataTierManager();
+ AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+ return new DataTierTupleSource(request, workItem,
connectorManager.registerRequest(request), dtm, limit);
+ }
- ConnectorManagerRepository repo =
Mockito.mock(ConnectorManagerRepository.class);
-
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(connectorManager);
+ private AtomicRequestMessage helpSetupRequest(String sql, int nodeId) throws Exception
{
+ QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
+ DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata,
vdb);
+ Command command = helpGetCommand(sql, metadata);
- dtm = new DataTierManagerImpl(rm,bs.getBufferManager(), true);
- command = helpGetCommand(sql, metadata);
-
RequestMessage original = new RequestMessage();
original.setExecutionId(1);
original.setPartialResults(true);
RequestID requestID = workContext.getRequestID(original.getExecutionId());
context = new CommandContext();
+ context.setSession(workContext.getSession());
context.setProcessorID(requestID);
context.setVdbName("test"); //$NON-NLS-1$
context.setVdbVersion(1);
- context.setQueryProcessorFactory(new
QueryProcessorFactoryImpl(bs.getBufferManager(), dtm, new DefaultCapabilitiesFinder(),
null, metadata));
+ context.setQueryProcessorFactory(new
QueryProcessorFactoryImpl(dtm.getBufferManager(), dtm, new DefaultCapabilitiesFinder(),
null, metadata));
workItem = TestDQPCoreRequestHandling.addRequest(rm, original, requestID, null,
workContext);
- request = new AtomicRequestMessage(original, workContext, nodeId);
+ AtomicRequestMessage request = new AtomicRequestMessage(original, workContext,
nodeId);
request.setCommand(command);
request.setConnectorName("FakeConnectorID"); //$NON-NLS-1$
- info = new DataTierTupleSource(request, workItem,
connectorManager.registerRequest(request), dtm, limit);
- }
+ return request;
+ }
+
+ private void helpSetupDataTierManager() {
+ FakeBufferService bs = new FakeBufferService();
+ rm = new DQPCore();
+ rm.setTransactionService(new FakeTransactionService());
+ rm.setBufferManager(bs.getBufferManager());
+ rm.setResultsetCache(new SessionAwareCache<CachedResults>(new
DefaultCacheFactory(), SessionAwareCache.Type.RESULTSET, new CacheConfiguration()));
+ rm.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>(new
DefaultCacheFactory(), SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration()));
+ rm.start(new DQPConfiguration());
+
+ ConnectorManagerRepository repo =
Mockito.mock(ConnectorManagerRepository.class);
+
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(connectorManager);
+ vdb.addAttchment(ConnectorManagerRepository.class, repo);
+
+ dtm = new DataTierManagerImpl(rm,bs.getBufferManager(), true);
+ }
@Test public void testCopyLobs() throws Exception {
connectorManager.copyLobs = true;
- helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
+ DataTierTupleSource info = helpSetup("SELECT cast(stringkey as clob) from
bqt1.smalla", 1);
for (int i = 0; i < 10;) {
try {
List<?> tuple = info.nextTuple();
@@ -130,7 +144,7 @@
}
}
connectorManager.copyLobs = false;
- helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
+ info = helpSetup("SELECT cast(stringkey as clob) from bqt1.smalla", 1);
for (int i = 0; i < 10;) {
try {
List<?> tuple = info.nextTuple();
@@ -144,60 +158,58 @@
}
@Test public void testDataTierTupleSource() throws Exception {
- helpSetup(1);
- for (int i = 0; i < 10;) {
- try {
- info.nextTuple();
- i++;
- } catch (BlockedException e) {
- Thread.sleep(50);
- }
- }
- assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+ DataTierTupleSource info = helpSetup(1);
+ assertEquals(10, pullTuples(info, 10));
+
assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
assertNull(info.nextTuple());
info.closeSource();
- assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+
assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
}
@Test public void testDataTierTupleSourceWarnings() throws Exception {
- helpSetup(1);
+ DataTierTupleSource info = helpSetup(1);
connectorManager.addWarning = true;
- for (int i = 0; i < 10;) {
- try {
- info.nextTuple();
- i++;
- } catch (BlockedException e) {
- Thread.sleep(50);
- }
- }
- assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+ assertEquals(10, pullTuples(info, 10));
+
assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
assertNull(info.nextTuple());
assertEquals(1, workItem.getWarnings().size());
SourceWarning warning = (SourceWarning) workItem.getWarnings().get(0);
assertFalse(warning.isPartialResultsError());
info.closeSource();
- assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
+
assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
}
@Test public void testDataTierTupleSourceLimit() throws Exception {
limit = 1;
- helpSetup(1);
- for (int i = 0; i < 1;) {
+ DataTierTupleSource info = helpSetup(1);
+ assertEquals(1, pullTuples(info, 1));
+
assertNotNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
+ assertNull(info.nextTuple());
+ info.closeSource();
+
assertNull(workItem.getConnectorRequest(info.getAtomicRequestMessage().getAtomicRequestID()));
+ }
+
+ private int pullTuples(TupleSource info, int limit)
+ throws TeiidComponentException, TeiidProcessingException,
+ InterruptedException {
+ int i = 0;
+ while (true) {
try {
- info.nextTuple();
- i++;
+ if (info.nextTuple() == null) {
+ break;
+ }
+ if (++i == limit) {
+ break;
+ }
} catch (BlockedException e) {
Thread.sleep(50);
}
}
- assertNotNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
- assertNull(info.nextTuple());
- info.closeSource();
- assertNull(workItem.getConnectorRequest(request.getAtomicRequestID()));
- }
+ return i;
+ }
@Test public void testPartialResults() throws Exception {
- helpSetup(1);
+ DataTierTupleSource info = helpSetup(1);
connectorManager.throwExceptionOnExecute = true;
for (int i = 0; i < 10; i++) {
try {
@@ -214,7 +226,7 @@
@Test public void testNoRowsException() throws Exception {
this.connectorManager.setRows(0);
- helpSetup(3);
+ DataTierTupleSource info = helpSetup(3);
while (true) {
try {
assertNull(info.nextTuple());
@@ -228,7 +240,7 @@
@Test public void testAsynch() throws Exception {
this.connectorManager.dataNotAvailable = 10;
this.connectorManager.setRows(0);
- helpSetup(3);
+ DataTierTupleSource info = helpSetup(3);
boolean blocked = false;
while (true) {
try {
@@ -242,4 +254,41 @@
assertTrue(blocked);
}
+ @Test public void testCaching() throws Exception {
+ assertEquals(0, connectorManager.getExecuteCount().get());
+
+ CacheDirective cd = new CacheDirective();
+ this.connectorManager.cacheDirective = cd;
+ helpSetupDataTierManager();
+ Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ RegisterRequestParameter rrp = new RegisterRequestParameter();
+ rrp.connectorBindingId = "x";
+ TupleSource ts = dtm.registerRequest(context, command, "foo", rrp);
+ assertTrue(ts instanceof CachingTupleSource);
+ assertEquals(10, pullTuples(ts, -1));
+ assertEquals(1, connectorManager.getExecuteCount().get());
+ assertFalse(rrp.doNotCache);
+
+ //same session, should be cached
+ command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ rrp = new RegisterRequestParameter();
+ rrp.connectorBindingId = "x";
+ ts = dtm.registerRequest(context, command, "foo", rrp);
+ assertFalse(ts instanceof CachingTupleSource);
+ assertEquals(10, pullTuples(ts, -1));
+ assertEquals(1, connectorManager.getExecuteCount().get());
+ assertTrue(rrp.doNotCache);
+
+ //switch sessions
+ command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ this.context.getSession().setSessionId("different");
+ rrp = new RegisterRequestParameter();
+ rrp.connectorBindingId = "x";
+ ts = dtm.registerRequest(context, command, "foo", rrp);
+ assertTrue(ts instanceof CachingTupleSource);
+ assertEquals(10, pullTuples(ts, -1));
+ assertEquals(2, connectorManager.getExecuteCount().get());
+ assertFalse(rrp.doNotCache);
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-07-30
13:07:44 UTC (rev 4275)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2012-07-30
16:04:34 UTC (rev 4276)
@@ -72,6 +72,7 @@
private boolean useIntCounter;
public boolean addWarning;
public boolean copyLobs;
+ public CacheDirective cacheDirective;
public AutoGenDataService() {
super("FakeConnector","FakeConnector"); //$NON-NLS-1$
//$NON-NLS-2$
@@ -120,6 +121,11 @@
}
@Override
+ public boolean areLobsUsableAfterClose() {
+ return false;
+ }
+
+ @Override
public void setRequestWorkItem(RequestWorkItem item) {
this.item = item;
}
@@ -183,7 +189,7 @@
@Override
public CacheDirective getCacheDirective() {
- return null;
+ return cacheDirective;
}
};