Author: shawkins
Date: 2011-06-24 10:37:08 -0400 (Fri, 24 Jun 2011)
New Revision: 3270
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log:
TEIID-1657 changes to ensure that matview refresh is handled properly with distributed
caching enabled
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-06-23
19:56:53 UTC (rev 3269)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-06-24
14:37:08 UTC (rev 3270)
@@ -78,7 +78,7 @@
*/
public class TransformationMetadata extends BasicQueryMetadata implements Serializable {
- private final class LiveQueryNode extends QueryNode {
+ private static final class LiveQueryNode extends QueryNode {
Procedure p;
private LiveQueryNode(Procedure p) {
super(null);
@@ -89,7 +89,19 @@
return p.getQueryPlan();
}
}
+
+ private static final class LiveTableQueryNode extends QueryNode {
+ Table t;
+ private LiveTableQueryNode(Table t) {
+ super(null);
+ this.t = t;
+ }
+ public String getQuery() {
+ return t.getSelectTransformation();
+ }
+ }
+
private final class VirtualFileInputStreamFactory extends
InputStreamFactory {
private final VirtualFile f;
@@ -463,8 +475,7 @@
if (!tableRecord.isVirtual()) {
throw new
QueryMetadataException(QueryPlugin.Util.getString("TransformationMetadata.QueryPlan_could_not_be_found_for_physical_group__6")+tableRecord.getFullName());
//$NON-NLS-1$
}
- String transQuery = tableRecord.getSelectTransformation();
- QueryNode queryNode = new QueryNode(transQuery);
+ LiveTableQueryNode queryNode = new LiveTableQueryNode(tableRecord);
// get any bindings and add them onto the query node
List bindings = tableRecord.getBindings();
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-06-23
19:56:53 UTC (rev 3269)
+++
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-06-24
14:37:08 UTC (rev 3270)
@@ -315,7 +315,7 @@
String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
- boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
+ boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
if (invalidate) {
touchTable(context, matTableName, false);
}
@@ -325,7 +325,7 @@
}
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info,
null);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info,
null, false);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(),
REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -415,11 +415,13 @@
key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
MatTableEntry entry = this.tables.get(key);
- boolean firstload = !info.isValid();
+ boolean notValid = !info.isValid();
if (entry != null && entry.lastUpdate > info.getUpdateTime()
- && info.getState() != MatState.LOADING) {
- //remote load
- info.setState(MatState.NEEDS_LOADING, firstload?false:entry.valid, null);
+ && info.getState() != MatState.LOADING
+ //TODO: use extension metadata or a config parameter to make this skew
configurable
+ && !(!notValid && entry.valid && info.getState() ==
MatState.LOADED && entry.lastUpdate < info.getUpdateTime() + 30000)) {
+ //trigger a remote load due to the cache being more up to date than the local copy
+ info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
loadTime = entry.lastUpdate;
}
}
@@ -427,7 +429,7 @@
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
} else {
loadAsynch(context, group, tableName, globalStore, info, loadTime);
}
@@ -468,7 +470,7 @@
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ return loadGlobalTable(context, group, tableName, globalStore, info, loadTime,
true);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -477,7 +479,7 @@
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName,
- TempTableStore globalStore, MatTableInfo info, Long loadTime)
+ TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loading", tableName));
//$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
@@ -511,9 +513,11 @@
if (distributedCache != null) {
cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
context.getVdbVersion(), context.getConnectionID(), context.getUserName());
- CachedResults cr = this.distributedCache.get(cid);
- if (cr != null) {
- ts = cr.getResults().createIndexedTupleSource();
+ if (useCache) {
+ CachedResults cr = this.distributedCache.get(cid);
+ if (cr != null) {
+ ts = cr.getResults().createIndexedTupleSource();
+ }
}
}
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-06-23
19:56:53 UTC (rev 3269)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-06-24
14:37:08 UTC (rev 3270)
@@ -22,7 +22,9 @@
package org.teiid.jdbc;
import java.io.File;
+import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Properties;
import org.jboss.deployers.spi.DeploymentException;
@@ -37,6 +39,7 @@
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
import org.teiid.deployers.MetadataStoreGroup;
+import org.teiid.deployers.UDFMetaData;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -44,6 +47,7 @@
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.metadata.FunctionMethod;
import org.teiid.metadata.MetadataRepository;
import org.teiid.metadata.MetadataStore;
import org.teiid.metadata.Schema;
@@ -61,7 +65,7 @@
import org.teiid.transport.LocalServerConnection;
import org.teiid.transport.LogonImpl;
-@SuppressWarnings("nls")
+@SuppressWarnings({"nls", "serial"})
public class FakeServer extends ClientServiceRegistryImpl implements ConnectionProfile {
SessionServiceImpl sessionService = new SessionServiceImpl();
@@ -97,7 +101,12 @@
});
config.setResultsetCacheConfig(new CacheConfiguration(Policy.LRU, 60, 250,
"resultsetcache")); //$NON-NLS-1$
- this.dqp.setCacheFactory(new DefaultCacheFactory());
+ this.dqp.setCacheFactory(new DefaultCacheFactory() {
+ @Override
+ public boolean isReplicated() {
+ return true; //pretend to be replicated for matview tests
+ }
+ });
this.dqp.start(config);
this.sessionService.setDqp(this.dqp);
@@ -121,18 +130,25 @@
public void setUseCallingThread(boolean useCallingThread) {
this.useCallingThread = useCallingThread;
}
+
+ public void deployVDB(String vdbName, String vdbPath) throws Exception {
+ deployVDB(vdbName, vdbPath, null);
+ }
- public void deployVDB(String vdbName, String vdbPath) throws Exception {
-
+ public void deployVDB(String vdbName, String vdbPath, Map<String,
Collection<FunctionMethod>> udfs) throws Exception {
IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new
File(vdbPath).toURI().toURL());
MetadataStore metadata = imf.getMetadataStore(repo.getSystemStore().getDatatypes());
LinkedHashMap<String, Resource> entries = imf.getEntriesPlusVisibilities();
-
- deployVDB(vdbName, metadata, entries);
+ deployVDB(vdbName, metadata, entries, udfs);
}
+
+ public void deployVDB(String vdbName, MetadataStore metadata,
+ LinkedHashMap<String, Resource> entries) {
+ deployVDB(vdbName, metadata, entries, null);
+ }
public void deployVDB(String vdbName, MetadataStore metadata,
- LinkedHashMap<String, Resource> entries) {
+ LinkedHashMap<String, Resource> entries, Map<String,
Collection<FunctionMethod>> udfs) {
VDBMetaData vdbMetaData = new VDBMetaData();
vdbMetaData.setName(vdbName);
vdbMetaData.setStatus(VDB.Status.ACTIVE);
@@ -152,7 +168,14 @@
try {
MetadataStoreGroup stores = new MetadataStoreGroup();
stores.addStore(metadata);
- this.repo.addVDB(vdbMetaData, stores, entries, null, cmr);
+ UDFMetaData udfMetaData = null;
+ if (udfs != null) {
+ udfMetaData = new UDFMetaData();
+ for (Map.Entry<String, Collection<FunctionMethod>> entry :
udfs.entrySet()) {
+ udfMetaData.addFunctions(entry.getKey(), entry.getValue());
+ }
+ }
+ this.repo.addVDB(vdbMetaData, stores, entries, udfMetaData, cmr);
this.repo.finishDeployment(vdbName, 1);
} catch (DeploymentException e) {
throw new RuntimeException(e);
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-06-23
19:56:53 UTC (rev 3269)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-06-24
14:37:08 UTC (rev 3270)
@@ -28,15 +28,22 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.TeiidSQLException;
-import org.teiid.metadata.Table;
-import org.teiid.query.metadata.TransformationMetadata;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.metadata.FunctionMethod.PushDown;
@SuppressWarnings("nls")
public class TestMatViews {
@@ -44,13 +51,33 @@
private static final String MATVIEWS = "matviews";
private Connection conn;
private FakeServer server;
+
+ private static int count = 0;
+
+ public static int pause() throws InterruptedException {
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ while (count < 2) {
+ TestMatViews.class.wait();
+ }
+ }
+ return 1;
+ }
@Before public void setUp() throws Exception {
server = new FakeServer();
- server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() +
"/matviews.vdb");
+ HashMap<String, Collection<FunctionMethod>> udfs = new
HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause",
null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause",
null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER),
false, Determinism.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() +
"/matviews.vdb", udfs);
conn = server.createConnection("jdbc:teiid:matviews");
}
+ @After public void tearDown() throws Exception {
+ server.stop();
+ conn.close();
+ }
+
@Test public void testSystemMatViewsWithImplicitLoad() throws Exception {
Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from MatViews order by name");
@@ -80,6 +107,34 @@
@Test public void testSystemMatViewsWithExplicitRefresh() throws Exception {
Statement s = conn.createStatement();
+ ResultSet rs = s.executeQuery("select * from (call
refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name =
'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW',
false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name =
'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key1 = rs.getDouble(1);
+
+ //ensure that invalidate with distributed caching works
+ assertTrue(key1 != key);
+ }
+
+ @Test public void testSystemManViewsWithExplictRefreshAndInvalidate() throws Exception
{
+ Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from (call
refreshMatView('TEST.MATVIEW', false)) p");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
@@ -87,6 +142,42 @@
assertTrue(rs.next());
assertEquals("LOADED", rs.getString("loadstate"));
assertEquals(true, rs.getBoolean("valid"));
+
+ count = 0;
+ s.execute("alter view TEST.MATVIEW as select pause() as x");
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ Statement s1 = conn.createStatement();
+ ResultSet rs = s1.executeQuery("select * from (call
refreshMatView('TEST.MATVIEW', true)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ synchronized (TestMatViews.class) {
+ while (count < 1) {
+ TestMatViews.class.wait();
+ }
+ }
+ rs = s.executeQuery("select * from MatViews where name =
'MatView'");
+ assertTrue(rs.next());
+ assertEquals("NEEDS_LOADING", rs.getString("loadstate"));
+ assertEquals(false, rs.getBoolean("valid"));
+
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ }
+ t.join();
+
+ rs = s.executeQuery("select * from MatViews where name =
'MatView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
}
@Test(expected=TeiidSQLException.class) public void testSystemMatViewsInvalidView()
throws Exception {
@@ -99,14 +190,27 @@
s.execute("call refreshMatView('foo', false)");
}
+ @Test(expected=TeiidSQLException.class) public void
testSystemMatViewsWithRowRefreshNotAllowed() throws Exception {
+ Statement s = conn.createStatement();
+ s.execute("alter view test.randomview as select rand() as x, rand() as y");
+ ResultSet rs = s.executeQuery("select * from (call
refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name =
'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call
refreshMatViewRow('TEST.RANDOMVIEW', "+key+")) p");
+ }
+
@Test public void testSystemMatViewsWithRowRefresh() throws Exception {
- //TOOD: remove this. it's a workaround for TEIIDDES-549
- VDBMetaData vdb = server.getVDB(MATVIEWS);
- TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
- Table t = tm.getGroupID("TEST.RANDOMVIEW");
- t.setSelectTransformation("/*+ cache(updatable) */ " +
t.getSelectTransformation());
+ Statement s = conn.createStatement();
- Statement s = conn.createStatement();
+ s.execute("alter view test.randomview as /*+ cache(updatable) */ select rand() as
x, rand() as y");
//prior to load refresh of a single row returns -1
ResultSet rs = s.executeQuery("select * from (call
refreshMatViewRow('TEST.RANDOMVIEW', 0)) p");
assertTrue(rs.next());