[teiid-commits] teiid SVN: r3484 - in branches/7.1.x: engine/src/main/java/org/teiid/query/tempdata and 2 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Sep 13 19:20:28 EDT 2011
Author: loleary
Date: 2011-09-13 19:20:28 -0400 (Tue, 13 Sep 2011)
New Revision: 3484
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log:
TEIID-1657 changes to ensure that matview refresh is handled properly with distributed caching enabled.
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -80,6 +80,18 @@
*/
public class TransformationMetadata extends BasicQueryMetadata implements Serializable {
+ private static final class LiveTableQueryNode extends QueryNode {
+ Table t;
+ private LiveTableQueryNode(Table t) {
+ super(t.getFullName(), null);
+ this.t = t;
+ }
+
+ public String getQuery() {
+ return t.getSelectTransformation();
+ }
+ }
+
private final class VirtualFileInputStreamFactory extends
InputStreamFactory {
private final VirtualFile f;
@@ -480,8 +492,7 @@
if (!tableRecord.isVirtual()) {
throw new QueryMetadataException(QueryPlugin.Util.getString("TransformationMetadata.QueryPlan_could_not_be_found_for_physical_group__6")+tableRecord.getFullName()); //$NON-NLS-1$
}
- String transQuery = tableRecord.getSelectTransformation();
- QueryNode queryNode = new QueryNode(tableRecord.getFullName(), transQuery);
+ LiveTableQueryNode queryNode = new LiveTableQueryNode(tableRecord);
// get any bindings and add them onto the query node
List bindings = tableRecord.getBindings();
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -303,7 +303,7 @@
String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
- boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
+ boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
if (invalidate) {
touchTable(context, matTableName, false);
}
@@ -316,7 +316,7 @@
Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group, matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -396,18 +396,19 @@
key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
MatTableEntry entry = this.tables.get(key);
- boolean firstload = !info.isValid();
- if (entry != null && entry.lastUpdate > info.getUpdateTime() && info.getState() != MatState.LOADING) {
- //remote load
- info.setState(MatState.NEEDS_LOADING, firstload?false:entry.valid, null);
+ boolean notValid = !info.isValid();
+ if (entry != null && entry.lastUpdate > info.getUpdateTime()
+ && info.getState() != MatState.LOADING) {
+ //trigger a remote load due to the cache being more up to date than the local copy
+ info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
loadTime = entry.lastUpdate;
- }
+ }
}
boolean load = info.shouldLoad();
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
} else {
loadAsynch(context, group, tableName, globalStore, info, loadTime);
}
@@ -440,7 +441,7 @@
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,7 +450,7 @@
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName,
- TempTableStore globalStore, MatTableInfo info, Long loadTime)
+ TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
@@ -483,9 +484,11 @@
if (distributedCache != null) {
cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
context.getVdbVersion(), context.getConnectionID(), context.getUserName());
- CachedResults cr = this.distributedCache.get(cid);
- if (cr != null) {
- ts = cr.getResults().createIndexedTupleSource();
+ if (useCache) {
+ CachedResults cr = this.distributedCache.get(cid);
+ if (cr != null) {
+ ts = cr.getResults().createIndexedTupleSource();
+ }
}
}
Modified: branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -22,6 +22,9 @@
package org.teiid.jdbc;
import java.io.File;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Properties;
import org.jboss.deployers.spi.DeploymentException;
@@ -31,11 +34,12 @@
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.DefaultCacheFactory;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
import org.teiid.deployers.MetadataStoreGroup;
+import org.teiid.deployers.UDFMetaData;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -48,6 +52,8 @@
import org.teiid.metadata.index.IndexMetadataFactory;
import org.teiid.metadata.index.VDBMetadataFactory;
import org.teiid.query.function.SystemFunctionManager;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.metadata.TransformationMetadata.Resource;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.services.SessionServiceImpl;
@@ -66,6 +72,10 @@
private ConnectorManagerRepository cmr;
public FakeServer() {
+ this(new DQPConfiguration());
+ }
+
+ public FakeServer(DQPConfiguration config) {
this.logon = new LogonImpl(sessionService, null);
this.repo.setSystemStore(VDBMetadataFactory.getSystem());
@@ -86,9 +96,13 @@
}
});
- DQPConfiguration config = new DQPConfiguration();
config.setResultsetCacheConfig(new CacheConfiguration(Policy.LRU, 60, 250, "resultsetcache")); //$NON-NLS-1$
- this.dqp.setCacheFactory(new DefaultCacheFactory());
+ this.dqp.setCacheFactory(new DefaultCacheFactory() {
+ @Override
+ public boolean isReplicated() {
+ return true; //pretend to be replicated for matview tests
+ }
+ });
this.dqp.start(config);
this.sessionService.setDqp(this.dqp);
@@ -96,12 +110,29 @@
registerClientService(DQP.class, dqp, null);
}
+ public void stop() {
+ this.dqp.stop();
+ }
+
public void deployVDB(String vdbName, String vdbPath) throws Exception {
-
+ deployVDB(vdbName, vdbPath, null);
+ }
+
+ public void deployVDB(String vdbName, String vdbPath, Map<String, Collection<FunctionMethod>> udfs) throws Exception {
IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new File(vdbPath).toURI().toURL());
MetadataStore metadata = imf.getMetadataStore(repo.getSystemStore().getDatatypes());
-
- VDBMetaData vdbMetaData = new VDBMetaData();
+ LinkedHashMap<String, Resource> entries = imf.getEntriesPlusVisibilities();
+ deployVDB(vdbName, metadata, entries, udfs);
+ }
+
+ public void deployVDB(String vdbName, MetadataStore metadata,
+ LinkedHashMap<String, Resource> entries) {
+ deployVDB(vdbName, metadata, entries, null);
+ }
+
+ public void deployVDB(String vdbName, MetadataStore metadata,
+ LinkedHashMap<String, Resource> entries, Map<String, Collection<FunctionMethod>> udfs) {
+ VDBMetaData vdbMetaData = new VDBMetaData();
vdbMetaData.setName(vdbName);
vdbMetaData.setStatus(VDB.Status.ACTIVE);
@@ -120,10 +151,17 @@
try {
MetadataStoreGroup stores = new MetadataStoreGroup();
stores.addStore(metadata);
- this.repo.addVDB(vdbMetaData, stores, imf.getEntriesPlusVisibilities(), null, cmr);
+ UDFMetaData udfMetaData = null;
+ if (udfs != null) {
+ udfMetaData = new UDFMetaData();
+ for (Map.Entry<String, Collection<FunctionMethod>> entry : udfs.entrySet()) {
+ udfMetaData.addFunctions(entry.getValue());
+ }
+ }
+ this.repo.addVDB(vdbMetaData, stores, entries, udfMetaData, cmr);
} catch (DeploymentException e) {
throw new RuntimeException(e);
- }
+ }
}
public void removeVDB(String vdbName) {
Modified: branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
===================================================================
--- branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -22,20 +22,30 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.TeiidSQLException;
import org.teiid.metadata.Table;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.function.metadata.FunctionParameter;
import org.teiid.query.metadata.TransformationMetadata;
@SuppressWarnings("nls")
@@ -44,13 +54,32 @@
private static final String MATVIEWS = "matviews";
private Connection conn;
private FakeServer server;
+
+ private static int count = 0;
+
+ public static int pause() throws InterruptedException {
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ while (count < 2) {
+ TestMatViews.class.wait();
+ }
+ }
+ return 1;
+ }
@Before public void setUp() throws Exception {
server = new FakeServer();
- server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb");
+ HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, FunctionMethod.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, FunctionMethod.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
conn = server.createConnection("jdbc:teiid:matviews");
}
+ @After public void tearDown() throws Exception {
+ server.stop();
+ }
+
@Test public void testSystemMatViewsWithImplicitLoad() throws Exception {
Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from MatViews order by name");
@@ -80,6 +109,34 @@
@Test public void testSystemMatViewsWithExplicitRefresh() throws Exception {
Statement s = conn.createStatement();
+ ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key1 = rs.getDouble(1);
+
+ //ensure that invalidate with distributed caching works
+ assertTrue(key1 != key);
+ }
+
+ @Test public void testSystemManViewsWithExplictRefreshAndInvalidate() throws Exception {
+ Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', false)) p");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
@@ -87,6 +144,47 @@
assertTrue(rs.next());
assertEquals("LOADED", rs.getString("loadstate"));
assertEquals(true, rs.getBoolean("valid"));
+
+ count = 0;
+
+ VDBMetaData vdb = server.getVDB(MATVIEWS);
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ Table tbl = tm.getGroupID("TEST.MATVIEW");
+ tbl.setSelectTransformation("select pause() as x");
+
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ Statement s1 = conn.createStatement();
+ ResultSet rs = s1.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', true)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ synchronized (TestMatViews.class) {
+ while (count < 1) {
+ TestMatViews.class.wait();
+ }
+ }
+ rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+ assertTrue(rs.next());
+ assertEquals("NEEDS_LOADING", rs.getString("loadstate"));
+ assertEquals(false, rs.getBoolean("valid"));
+
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ }
+ t.join();
+
+ rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
}
@Test(expected=TeiidSQLException.class) public void testSystemMatViewsInvalidView() throws Exception {
@@ -99,14 +197,34 @@
s.execute("call refreshMatView('foo', false)");
}
+ @Test(expected=TeiidSQLException.class) public void testSystemMatViewsWithRowRefreshNotAllowed() throws Exception {
+ Statement s = conn.createStatement();
+ VDBMetaData vdb = server.getVDB(MATVIEWS);
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ Table t = tm.getGroupID("TEST.RANDOMVIEW");
+ t.setSelectTransformation("select rand() as x, rand() as y");
+ ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', "+key+")) p");
+ }
+
@Test public void testSystemMatViewsWithRowRefresh() throws Exception {
- //TOOD: remove this. it's a workaround for TEIIDDES-549
+ Statement s = conn.createStatement();
+
VDBMetaData vdb = server.getVDB(MATVIEWS);
TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
Table t = tm.getGroupID("TEST.RANDOMVIEW");
t.setSelectTransformation("/*+ cache(updatable) */ " + t.getSelectTransformation());
- Statement s = conn.createStatement();
//prior to load refresh of a single row returns -1
ResultSet rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', 0)) p");
assertTrue(rs.next());
More information about the teiid-commits
mailing list