[teiid-commits] teiid SVN: r3276 - in branches/7.1.1.CP3: engine/src/main/java/org/teiid/query/tempdata and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Jun 27 17:44:19 EDT 2011


Author: mdrillin
Date: 2011-06-27 17:44:19 -0400 (Mon, 27 Jun 2011)
New Revision: 3276

Modified:
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
   branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log:
TEIID-1657: changes to ensure that materialized view (matview) refresh is handled properly when distributed caching is enabled.

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-06-27 18:58:27 UTC (rev 3275)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-06-27 21:44:19 UTC (rev 3276)
@@ -80,6 +80,18 @@
  */
 public class TransformationMetadata extends BasicQueryMetadata implements Serializable {
 	
+	private static final class LiveTableQueryNode extends QueryNode {
+		Table t;
+		private LiveTableQueryNode(Table t) {
+			super(t.getFullName(), null);
+			this.t = t;
+		}
+
+		public String getQuery() {
+			return t.getSelectTransformation();
+		}
+	}
+	
 	private final class VirtualFileInputStreamFactory extends
 			InputStreamFactory {
 		private final VirtualFile f;
@@ -480,8 +492,7 @@
         if (!tableRecord.isVirtual()) {
             throw new QueryMetadataException(QueryPlugin.Util.getString("TransformationMetadata.QueryPlan_could_not_be_found_for_physical_group__6")+tableRecord.getFullName()); //$NON-NLS-1$
         }
-        String transQuery = tableRecord.getSelectTransformation();
-        QueryNode queryNode = new QueryNode(tableRecord.getFullName(), transQuery);
+        LiveTableQueryNode queryNode = new LiveTableQueryNode(tableRecord);
 
         // get any bindings and add them onto the query node
         List bindings = tableRecord.getBindings();

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-06-27 18:58:27 UTC (rev 3275)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-06-27 21:44:19 UTC (rev 3276)
@@ -303,7 +303,7 @@
 			String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
 			LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
 			MatTableInfo info = globalStore.getMatTableInfo(matTableName);
-			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
+			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
 			if (invalidate) {
 				touchTable(context, matTableName, false);
 			}
@@ -316,7 +316,7 @@
 			Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group, matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
 			GroupSymbol matTable = new GroupSymbol(matTableName);
 			matTable.setMetadataID(matTableId);
-			int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null);
+			int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
 			return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
 		} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
 			Object groupID = validateMatView(metadata, proc);
@@ -396,18 +396,19 @@
 				key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
 				
 				MatTableEntry entry = this.tables.get(key);
-				boolean firstload = !info.isValid();
-				if (entry != null && entry.lastUpdate > info.getUpdateTime() && info.getState() != MatState.LOADING) {
-					//remote load
-					info.setState(MatState.NEEDS_LOADING, firstload?false:entry.valid, null);
+				boolean notValid = !info.isValid();
+ 				if (entry != null && entry.lastUpdate > info.getUpdateTime() 
+						&& info.getState() != MatState.LOADING) {
+					//trigger a remote load due to the cache being more up to date than the local copy
+					info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
 					loadTime = entry.lastUpdate;
-				}
+ 				}
 			}
 			boolean load = info.shouldLoad();
 			if (load) {
 				if (!info.isValid()) {
 					//blocking load
-					loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+					loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
 				} else {
 					loadAsynch(context, group, tableName, globalStore, info, loadTime);
 				}
@@ -440,7 +441,7 @@
 		Callable<Integer> toCall = new Callable<Integer>() {
 			@Override
 			public Integer call() throws Exception {
-				return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+				return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
 			}
 		};
 		FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,7 +450,7 @@
 
 	private int loadGlobalTable(CommandContext context,
 			GroupSymbol group, final String tableName,
-			TempTableStore globalStore, MatTableInfo info, Long loadTime)
+			TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
 			throws TeiidComponentException, TeiidProcessingException {
 		LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
 		QueryMetadataInterface metadata = context.getMetadata();
@@ -483,9 +484,11 @@
 			if (distributedCache != null) {
 				cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(), 
 						context.getVdbVersion(), context.getConnectionID(), context.getUserName());
-				CachedResults cr = this.distributedCache.get(cid);
-				if (cr != null) {
-					ts = cr.getResults().createIndexedTupleSource();
+				if (useCache) {
+					CachedResults cr = this.distributedCache.get(cid);
+					if (cr != null) {
+						ts = cr.getResults().createIndexedTupleSource();
+					}
 				}
 			}
 			

Modified: branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-06-27 18:58:27 UTC (rev 3275)
+++ branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-06-27 21:44:19 UTC (rev 3276)
@@ -22,6 +22,9 @@
 package org.teiid.jdbc;
 
 import java.io.File;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import org.jboss.deployers.spi.DeploymentException;
@@ -31,11 +34,12 @@
 import org.teiid.adminapi.impl.ModelMetaData;
 import org.teiid.adminapi.impl.VDBMetaData;
 import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.cache.DefaultCacheFactory;
-import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.client.DQP;
 import org.teiid.client.security.ILogon;
 import org.teiid.deployers.MetadataStoreGroup;
+import org.teiid.deployers.UDFMetaData;
 import org.teiid.deployers.VDBRepository;
 import org.teiid.dqp.internal.datamgr.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -48,6 +52,8 @@
 import org.teiid.metadata.index.IndexMetadataFactory;
 import org.teiid.metadata.index.VDBMetadataFactory;
 import org.teiid.query.function.SystemFunctionManager;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.metadata.TransformationMetadata.Resource;
 import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
 import org.teiid.query.optimizer.capabilities.SourceCapabilities;
 import org.teiid.services.SessionServiceImpl;
@@ -66,6 +72,10 @@
 	private ConnectorManagerRepository cmr;
 	
 	public FakeServer() {
+		this(new DQPConfiguration());
+	}
+	
+	public FakeServer(DQPConfiguration config) {
 		this.logon = new LogonImpl(sessionService, null);
 		
 		this.repo.setSystemStore(VDBMetadataFactory.getSystem());
@@ -86,9 +96,13 @@
         	}
         });
         
-        DQPConfiguration config = new DQPConfiguration();
         config.setResultsetCacheConfig(new CacheConfiguration(Policy.LRU, 60, 250, "resultsetcache")); //$NON-NLS-1$
-        this.dqp.setCacheFactory(new DefaultCacheFactory());
+        this.dqp.setCacheFactory(new DefaultCacheFactory() {
+        	@Override
+        	public boolean isReplicated() {
+        		return true; //pretend to be replicated for matview tests
+        	}
+        });
         this.dqp.start(config);
         this.sessionService.setDqp(this.dqp);
         
@@ -96,12 +110,29 @@
         registerClientService(DQP.class, dqp, null);
 	}
 	
+	public void stop() {
+		this.dqp.stop();
+	}
+	
 	public void deployVDB(String vdbName, String vdbPath) throws Exception {
-		
+		deployVDB(vdbName, vdbPath, null);
+	}
+	
+	public void deployVDB(String vdbName, String vdbPath, Map<String, Collection<FunctionMethod>> udfs) throws Exception {
 		IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new File(vdbPath).toURI().toURL());
 		MetadataStore metadata = imf.getMetadataStore(repo.getSystemStore().getDatatypes());
-		
-        VDBMetaData vdbMetaData = new VDBMetaData();
+		LinkedHashMap<String, Resource> entries = imf.getEntriesPlusVisibilities();
+        deployVDB(vdbName, metadata, entries, udfs);		
+	}
+	
+	public void deployVDB(String vdbName, MetadataStore metadata,
+			LinkedHashMap<String, Resource> entries) {
+		deployVDB(vdbName, metadata, entries, null);
+	}
+
+	public void deployVDB(String vdbName, MetadataStore metadata,
+			LinkedHashMap<String, Resource> entries, Map<String, Collection<FunctionMethod>> udfs) {
+		VDBMetaData vdbMetaData = new VDBMetaData();
         vdbMetaData.setName(vdbName);
         vdbMetaData.setStatus(VDB.Status.ACTIVE);
         
@@ -120,10 +151,17 @@
         try {
         	MetadataStoreGroup stores = new MetadataStoreGroup();
         	stores.addStore(metadata);
-			this.repo.addVDB(vdbMetaData, stores, imf.getEntriesPlusVisibilities(), null, cmr);
+        	UDFMetaData udfMetaData = null;
+        	if (udfs != null) {
+        		udfMetaData = new UDFMetaData();
+        		for (Map.Entry<String, Collection<FunctionMethod>> entry : udfs.entrySet()) {
+        			udfMetaData.addFunctions(entry.getValue());
+        		}
+        	}
+			this.repo.addVDB(vdbMetaData, stores, entries, udfMetaData, cmr);
 		} catch (DeploymentException e) {
 			throw new RuntimeException(e);
-		}		
+		}
 	}
 	
 	public void removeVDB(String vdbName) {

Modified: branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
===================================================================
--- branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java	2011-06-27 18:58:27 UTC (rev 3275)
+++ branches/7.1.1.CP3/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java	2011-06-27 21:44:19 UTC (rev 3276)
@@ -22,20 +22,30 @@
 
 package org.teiid.systemmodel;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.jdbc.FakeServer;
 import org.teiid.jdbc.TeiidSQLException;
 import org.teiid.metadata.Table;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.function.metadata.FunctionParameter;
 import org.teiid.query.metadata.TransformationMetadata;
 
 @SuppressWarnings("nls")
@@ -44,13 +54,32 @@
     private static final String MATVIEWS = "matviews";
 	private Connection conn;
 	private FakeServer server;
+	
+	private static int count = 0;
+	
+	public static int pause() throws InterruptedException {
+		synchronized (TestMatViews.class) {
+			count++;
+			TestMatViews.class.notify();
+			while (count < 2) {
+				TestMatViews.class.wait();
+			}
+		}
+		return 1;
+	}
 
 	@Before public void setUp() throws Exception {
     	server = new FakeServer();
-    	server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb");
+    	HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+    	udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, FunctionMethod.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, FunctionMethod.NONDETERMINISTIC)));
+    	server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
     	conn = server.createConnection("jdbc:teiid:matviews");
     }
 	
+	@After public void tearDown() throws Exception {
+		server.stop();
+	}
+	
 	@Test public void testSystemMatViewsWithImplicitLoad() throws Exception {
 		Statement s = conn.createStatement();
 		ResultSet rs = s.executeQuery("select * from MatViews order by name");
@@ -80,6 +109,34 @@
 	
 	@Test public void testSystemMatViewsWithExplicitRefresh() throws Exception {
 		Statement s = conn.createStatement();
+		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key = rs.getDouble(1);
+
+		rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key1 = rs.getDouble(1);
+
+		//ensure that invalidate with distributed caching works
+		assertTrue(key1 != key);
+	}
+	
+	@Test public void testSystemManViewsWithExplictRefreshAndInvalidate() throws Exception {
+		Statement s = conn.createStatement();
 		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', false)) p");
 		assertTrue(rs.next());
 		assertEquals(1, rs.getInt(1));
@@ -87,6 +144,47 @@
 		assertTrue(rs.next());
 		assertEquals("LOADED", rs.getString("loadstate"));
 		assertEquals(true, rs.getBoolean("valid"));
+		
+		count = 0;
+		
+		VDBMetaData vdb = server.getVDB(MATVIEWS);
+		TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+		Table tbl = tm.getGroupID("TEST.MATVIEW");
+		tbl.setSelectTransformation("select pause() as x");
+		
+		Thread t = new Thread() {
+			public void run() {
+				try {
+					Statement s1 = conn.createStatement();
+					ResultSet rs = s1.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', true)) p");
+					assertTrue(rs.next());
+					assertEquals(1, rs.getInt(1));
+				} catch (Exception e) {
+					throw new TeiidRuntimeException(e);
+				}
+			}
+		};
+		t.start();
+		synchronized (TestMatViews.class) {
+			while (count < 1) {
+				TestMatViews.class.wait();
+			}
+		}
+		rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+		assertTrue(rs.next());
+		assertEquals("NEEDS_LOADING", rs.getString("loadstate"));
+		assertEquals(false, rs.getBoolean("valid"));
+
+		synchronized (TestMatViews.class) {
+			count++;
+			TestMatViews.class.notify();
+		}
+		t.join();
+		
+		rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
 	}
 	
 	@Test(expected=TeiidSQLException.class) public void testSystemMatViewsInvalidView() throws Exception {
@@ -99,14 +197,34 @@
 		s.execute("call refreshMatView('foo', false)");
 	}
 	
+	@Test(expected=TeiidSQLException.class) public void testSystemMatViewsWithRowRefreshNotAllowed() throws Exception {
+		Statement s = conn.createStatement();
+		VDBMetaData vdb = server.getVDB(MATVIEWS);
+		TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+		Table t = tm.getGroupID("TEST.RANDOMVIEW");
+		t.setSelectTransformation("select rand() as x, rand() as y");
+		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key = rs.getDouble(1);
+		
+		rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', "+key+")) p");
+	}
+	
 	@Test public void testSystemMatViewsWithRowRefresh() throws Exception {
-		//TOOD: remove this. it's a workaround for TEIIDDES-549
+		Statement s = conn.createStatement();
+		
 		VDBMetaData vdb = server.getVDB(MATVIEWS);
 		TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
 		Table t = tm.getGroupID("TEST.RANDOMVIEW");
 		t.setSelectTransformation("/*+ cache(updatable) */ " +  t.getSelectTransformation());
 		
-		Statement s = conn.createStatement();
 		//prior to load refresh of a single row returns -1
 		ResultSet rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', 0)) p");
 		assertTrue(rs.next());



More information about the teiid-commits mailing list