Author: shawkins
Date: 2010-09-22 12:46:21 -0400 (Wed, 22 Sep 2010)
New Revision: 2594
Modified:
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
branches/7.1.x/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml
branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java
branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheFactory.java
branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCacheFactory.java
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
Log:
TEIID-1212 adding replication support for internal materialization
Modified:
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
===================================================================
---
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -89,4 +89,12 @@
public void setCacheManager(String mgrName) {
this.cacheManagerName = mgrName;
}
+
+ @Override
+ public boolean isReplicated() {
+ if (delegate == null) {
+ return false;
+ }
+ return delegate.isReplicated();
+ }
}
Modified:
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
---
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -113,4 +113,9 @@
public void stop() {
destroy();
}
+
+ @Override
+ public boolean isReplicated() {
+ return true;
+ }
}
Modified:
branches/7.1.x/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml
===================================================================
---
branches/7.1.x/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/documentation/caching-guide/src/main/docbook/en-US/content/matviews.xml 2010-09-22
16:46:21 UTC (rev 2594)
@@ -122,7 +122,8 @@
</para>
<section>
<title>TTL Snapshot Refresh</title>
- <para>The <link linkend="cache-hint">cache hint</link>
may be used to automatically trigger a full snapshot refresh after a specified time to
live.
+ <para>The <link linkend="cache-hint">cache hint</link>
may be used to automatically trigger a full snapshot refresh after a specified time to
live (ttl).
+ The ttl starts from the time the table is finished loading.
The refresh is equivalent to <code>CALL SYSADMIN.refreshMatView('view
name', false)</code>, but performed asynchronously so that user queries do not
block on the load.</para>
<example>
<title>Auto-refresh Transformation Query</title>
@@ -178,5 +179,16 @@
</itemizedlist>
</para>
</section>
+ <section>
+ <title>Clustering Considerations</title>
+ <para>Each member in a cluster maintains its own copy of each materialized table
and associated indexes.
+ With cache clustering enabled, an additional snapshot copy of the table is maintained
for loading by other members.
+ An attempt is made to ensure each member receives the same full refresh events as the
others.
+ Full consistency for updatable materialized views, however, is not guaranteed.
+ Periodic full refreshes of updatable materialized view tables help ensure consistency
among members.
+ <note><para>Loads of materialized tables are not coordinated across the
cluster. It is possible for the same ttl expiration to trigger a load at each
member.</para></note>
+ In many clustered scenarios, using external materialization is advantageous to fully
control the loading of the tables and to have materialized data that is durable.
+ </para>
+ </section>
</section>
</chapter>
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java 2010-09-21 17:39:11 UTC
(rev 2593)
+++ branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java 2010-09-22 16:46:21 UTC
(rev 2594)
@@ -29,10 +29,12 @@
*/
public interface Cache<K, V> {
- public enum Type {SESSION("Session"), //$NON-NLS-1$
- RESULTSET("ResultSet"), //$NON-NLS-1$
- RESULTSET_BATCHES(RESULTSET, "batches"), //$NON-NLS-1$
- PREPAREDPLAN("PreparaedPlan"); //$NON-NLS-1$
+ public enum Type {
+ MATTABLES("MatTables"), //$NON-NLS-1$
+ MATTABLEUPDATES("MatTableUpdates"), //$NON-NLS-1$
+ RESULTSET("ResultSet"), //$NON-NLS-1$
+ RESULTSET_BATCHES(RESULTSET, "batches"), //$NON-NLS-1$
+ PREPAREDPLAN("PreparaedPlan"); //$NON-NLS-1$
private String location;
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheFactory.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheFactory.java 2010-09-21
17:39:11 UTC (rev 2593)
+++ branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheFactory.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -36,4 +36,10 @@
* Destroy the cache factory and any caches underneath.
*/
void destroy();
+
+ /**
+ * Return true if replicated caches are created by this factory
+ * @return
+ */
+ boolean isReplicated();
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCacheFactory.java
===================================================================
---
branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCacheFactory.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCacheFactory.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -49,9 +49,13 @@
@Override
public <K, V> Cache<K, V> get(Type type, CacheConfiguration config) {
if (!destroyed) {
- Cache node = cacheRoot.addChild(type.location());
- return node;
+ return cacheRoot.addChild(type.location());
}
throw new TeiidRuntimeException("Cache system has been shutdown");
//$NON-NLS-1$
}
+
+ @Override
+ public boolean isReplicated() {
+ return false;
+ }
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -700,8 +700,13 @@
this.processWorkerPool = new
ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
+ SessionAwareCache<CachedResults> matTables = null;
+ if (cacheFactory.isReplicated()) {
+ matTables = new SessionAwareCache<CachedResults>();
+ }
+
dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,
- this.bufferService), this.bufferManager,
this.processWorkerPool, this.rsCache);
+ this.bufferService), this.bufferManager,
this.processWorkerPool, this.rsCache, matTables, this.cacheFactory);
}
public void setBufferService(BufferService service) {
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -182,8 +182,9 @@
private void clearCache(Cache<CacheID, T> cache, String vdbName, int version) {
Set<CacheID> keys = cache.keys();
+ VDBKey vdbKey = new VDBKey(vdbName, version);
for (CacheID key:keys) {
- if (key.vdbInfo.getName().equalsIgnoreCase(vdbName) &&
key.vdbInfo.getVersion() == version) {
+ if (key.vdbInfo.equals(vdbKey)) {
cache.remove(key);
}
}
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -22,6 +22,7 @@
package org.teiid.query.tempdata;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -36,6 +37,10 @@
import org.teiid.api.exception.query.QueryProcessingException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.cache.Cache;
+import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBuffer;
@@ -44,6 +49,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.HashCodeUtil;
import org.teiid.core.util.StringUtil;
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.SessionAwareCache;
@@ -54,6 +60,7 @@
import org.teiid.query.QueryPlugin;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.eval.Evaluator;
+import org.teiid.query.function.metadata.FunctionMethod;
import org.teiid.query.mapping.relational.QueryNode;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataID;
@@ -87,6 +94,7 @@
import org.teiid.query.tempdata.TempTableStore.MatState;
import org.teiid.query.tempdata.TempTableStore.MatTableInfo;
import org.teiid.query.util.CommandContext;
+import org.teiid.vdb.runtime.VDBKey;
/**
* This proxy ProcessorDataManager is used to handle temporary tables.
@@ -104,13 +112,49 @@
private BufferManager bufferManager;
private SessionAwareCache<CachedResults> cache;
private Executor executor;
+
+ private static class MatTableKey implements Serializable {
+ private static final long serialVersionUID = 5481692896572663992L;
+ String name;
+ VDBKey vdb;
+
+ @Override
+ public int hashCode() {
+ return HashCodeUtil.hashCode(name.hashCode(), vdb);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof MatTableKey)) {
+ return false;
+ }
+ MatTableKey other = (MatTableKey)obj;
+ return this.name.equals(other.name) && this.vdb.equals(other.vdb);
+ }
+ }
+
+ private static class MatTableEntry implements Serializable {
+ private static final long serialVersionUID = 8559613701442751579L;
+ transient long lastUpdate = System.currentTimeMillis();
+ }
+
+ private Cache<MatTableKey, MatTableEntry> tables;
+ private SessionAwareCache<CachedResults> distributedCache;
public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager,
- Executor executor, SessionAwareCache<CachedResults> cache){
+ Executor executor, SessionAwareCache<CachedResults> cache,
SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
this.executor = executor;
this.cache = cache;
+ this.distributedCache = distibutedCache;
+ if (distibutedCache != null) {
+ CacheConfiguration cc = new CacheConfiguration(Policy.LRU, -1, -1);
+ tables = cacheFactory.get(Cache.Type.MATTABLES, cc);
+ }
}
public TupleSource registerRequest(
@@ -259,7 +303,7 @@
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
- MatState oldState = info.setState(MatState.NEEDS_LOADING,
invalidate?Boolean.FALSE:null);
+ MatState oldState = info.setState(MatState.NEEDS_LOADING,
invalidate?Boolean.FALSE:null, null);
if (oldState == MatState.LOADING) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
@@ -268,7 +312,7 @@
Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group,
matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info,
null);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(),
REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -305,6 +349,7 @@
tuple = Arrays.asList(key.getValue());
}
List<?> result = tempTable.updateTuple(tuple, delete);
+ //TODO: maintain a table log and distribute the events
return CollectionTupleSource.createUpdateCountTupleSource(result != null ? 1 : 0);
}
return null;
@@ -340,20 +385,27 @@
if (group.isGlobalTable()) {
final TempTableStore globalStore = context.getGlobalTableStore();
final MatTableInfo info = globalStore.getMatTableInfo(tableName);
+ Long loadTime = null;
+ if (this.distributedCache != null) {
+ MatTableKey key = new MatTableKey();
+ key.name = tableName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+
+ MatTableEntry entry = this.tables.get(key);
+ if (entry != null && entry.lastUpdate > info.getUpdateTime()
+ && info.getState() != MatState.LOADING) {
+ //remote load
+ info.setState(MatState.NEEDS_LOADING, null, null);
+ loadTime = entry.lastUpdate;
+ }
+ }
boolean load = info.shouldLoad();
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info);
+ loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
} else {
- Callable<Integer> toCall = new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info);
- }
- };
- FutureTask<Integer> task = new FutureTask<Integer>(toCall);
- executor.execute(task);
+ loadAsynch(context, group, tableName, globalStore, info, loadTime);
}
}
table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
@@ -377,9 +429,23 @@
return table.createTupleSource(query.getProjectedSymbols(), query.getCriteria(),
query.getOrderBy());
}
+ private void loadAsynch(final CommandContext context,
+ final GroupSymbol group, final String tableName,
+ final TempTableStore globalStore, final MatTableInfo info,
+ final Long loadTime) {
+ Callable<Integer> toCall = new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ }
+ };
+ FutureTask<Integer> task = new FutureTask<Integer>(toCall);
+ executor.execute(task);
+ }
+
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName,
- TempTableStore globalStore, MatTableInfo info)
+ TempTableStore globalStore, MatTableInfo info, Long loadTime)
throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loading", tableName));
//$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
@@ -408,11 +474,42 @@
int rowCount = -1;
try {
String fullName = metadata.getFullName(group.getMetadataID());
- //TODO: order by primary key nulls first - then have an insert ordered optimization
- String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
- QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName,
context);
- qp.setNonBlocking(true);
- TupleSource ts = new BatchCollector.BatchProducerTupleSource(qp);
+ TupleSource ts = null;
+ CacheID cid = null;
+ if (distributedCache != null) {
+ cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
+ context.getVdbVersion(), context.getConnectionID(), context.getUserName());
+ CachedResults cr = this.distributedCache.get(cid);
+ if (cr != null) {
+ ts = cr.getResults().createIndexedTupleSource();
+ }
+ }
+
+ if (ts == null) {
+ //TODO: coordinate a distributed load
+ //TODO: order by primary key nulls first - then have an insert ordered optimization
+ String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
+ QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName,
context);
+ qp.setNonBlocking(true);
+
+ if (distributedCache != null) {
+ CachedResults cr = new CachedResults();
+ BatchCollector bc = qp.createBatchCollector();
+ TupleBuffer tb = bc.collectTuples();
+ cr.setResults(tb);
+ MatTableKey key = new MatTableKey();
+ key.name = fullName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+ MatTableEntry matTableEntry = new MatTableEntry();
+ matTableEntry.lastUpdate = 0;
+ tables.put(key, matTableEntry, null);
+ this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr,
info.getTtl());
+ ts = tb.createIndexedTupleSource();
+ } else {
+ ts = new BatchCollector.BatchProducerTupleSource(qp);
+ }
+ }
+
//TODO: if this insert fails, it's unnecessary to do the undo processing
table.insert(ts, table.getColumns());
rowCount = table.getRowCount();
@@ -434,10 +531,10 @@
throw e;
} finally {
if (rowCount == -1) {
- info.setState(MatState.FAILED_LOAD, null);
+ info.setState(MatState.FAILED_LOAD, null, null);
} else {
globalStore.swapTempTable(tableName, table);
- info.setState(MatState.LOADED, true);
+ info.setState(MatState.LOADED, true, loadTime);
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loaded", tableName, rowCount));
//$NON-NLS-1$
}
}
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
---
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -84,16 +84,19 @@
}
}
- public synchronized MatState setState(MatState state, Boolean valid) {
+ public synchronized MatState setState(MatState state, Boolean valid, Long timestamp) {
MatState oldState = this.state;
if (valid != null) {
this.valid = valid;
}
setState(state);
+ if (timestamp != null) {
+ this.updateTime = timestamp;
+ }
notifyAll();
return oldState;
}
-
+
private void setState(MatState state) {
this.state = state;
this.updateTime = System.currentTimeMillis();
@@ -115,6 +118,10 @@
return valid;
}
+ public synchronized long getTtl() {
+ return ttl;
+ }
+
}
private ConcurrentHashMap<String, MatTableInfo> matTables = new
ConcurrentHashMap<String, MatTableInfo>();
Modified:
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
---
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -31,6 +31,7 @@
import org.junit.Before;
import org.junit.Test;
+import org.teiid.cache.DefaultCacheFactory;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.TeiidProcessingException;
@@ -73,7 +74,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(hdm, bm, executor, cache);
+ dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new
DefaultCacheFactory());
}
private void execute(String sql, List<?>... expectedResults) throws Exception {
Modified:
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
---
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -254,7 +254,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(dataManager, bufferMgr, executor,
cache);
+ dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache,
null, null);
}
if (context.getQueryProcessorFactory() == null) {
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr,
dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));
Modified:
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
---
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2010-09-21
17:39:11 UTC (rev 2593)
+++
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2010-09-22
16:46:21 UTC (rev 2594)
@@ -75,7 +75,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(fdm, bm, executor, cache);
+ dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null);
}
@Test public void testInsertWithQueryExpression() throws Exception {