[teiid-commits] teiid SVN: r3270 - in branches/7.4.x: engine/src/main/java/org/teiid/query/tempdata and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Jun 24 10:37:09 EDT 2011


Author: shawkins
Date: 2011-06-24 10:37:08 -0400 (Fri, 24 Jun 2011)
New Revision: 3270

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
   branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log:
TEIID-1657 changes to ensure that matview refresh is handled properly with distributed caching enabled

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-06-23 19:56:53 UTC (rev 3269)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-06-24 14:37:08 UTC (rev 3270)
@@ -78,7 +78,7 @@
  */
 public class TransformationMetadata extends BasicQueryMetadata implements Serializable {
 	
-	private final class LiveQueryNode extends QueryNode {
+	private static final class LiveQueryNode extends QueryNode {
 		Procedure p;
 		private LiveQueryNode(Procedure p) {
 			super(null);
@@ -89,7 +89,19 @@
 			return p.getQueryPlan();
 		}
 	}
+	
+	private static final class LiveTableQueryNode extends QueryNode {
+		Table t;
+		private LiveTableQueryNode(Table t) {
+			super(null);
+			this.t = t;
+		}
 
+		public String getQuery() {
+			return t.getSelectTransformation();
+		}
+	}
+
 	private final class VirtualFileInputStreamFactory extends
 			InputStreamFactory {
 		private final VirtualFile f;
@@ -463,8 +475,7 @@
         if (!tableRecord.isVirtual()) {
             throw new QueryMetadataException(QueryPlugin.Util.getString("TransformationMetadata.QueryPlan_could_not_be_found_for_physical_group__6")+tableRecord.getFullName()); //$NON-NLS-1$
         }
-        String transQuery = tableRecord.getSelectTransformation();
-        QueryNode queryNode = new QueryNode(transQuery);
+        LiveTableQueryNode queryNode = new LiveTableQueryNode(tableRecord);
 
         // get any bindings and add them onto the query node
         List bindings = tableRecord.getBindings();

Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-06-23 19:56:53 UTC (rev 3269)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-06-24 14:37:08 UTC (rev 3270)
@@ -315,7 +315,7 @@
 			String matTableName = metadata.getFullName(matTableId);
 			LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
 			MatTableInfo info = globalStore.getMatTableInfo(matTableName);
-			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
+			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
 			if (invalidate) {
 				touchTable(context, matTableName, false);
 			}
@@ -325,7 +325,7 @@
 			}
 			GroupSymbol matTable = new GroupSymbol(matTableName);
 			matTable.setMetadataID(matTableId);
-			int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null);
+			int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
 			return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
 		} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
 			Object groupID = validateMatView(metadata, proc);
@@ -415,11 +415,13 @@
 				key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
 				
 				MatTableEntry entry = this.tables.get(key);
-				boolean firstload = !info.isValid();
+				boolean notValid = !info.isValid();
 				if (entry != null && entry.lastUpdate > info.getUpdateTime() 
-						&& info.getState() != MatState.LOADING) {
-					//remote load
-					info.setState(MatState.NEEDS_LOADING, firstload?false:entry.valid, null);
+						&& info.getState() != MatState.LOADING
+						//TODO: use extension metadata or a config parameter to make this skew configurable
+						&& !(!notValid && entry.valid && info.getState() == MatState.LOADED && entry.lastUpdate < info.getUpdateTime() + 30000)) {
+					//trigger a remote load due to the cache being more up to date than the local copy
+					info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
 					loadTime = entry.lastUpdate;
 				}
 			}
@@ -427,7 +429,7 @@
 			if (load) {
 				if (!info.isValid()) {
 					//blocking load
-					loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+					loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
 				} else {
 					loadAsynch(context, group, tableName, globalStore, info, loadTime);
 				}
@@ -468,7 +470,7 @@
 		Callable<Integer> toCall = new Callable<Integer>() {
 			@Override
 			public Integer call() throws Exception {
-				return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+				return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
 			}
 		};
 		FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -477,7 +479,7 @@
 
 	private int loadGlobalTable(CommandContext context,
 			GroupSymbol group, final String tableName,
-			TempTableStore globalStore, MatTableInfo info, Long loadTime)
+			TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
 			throws TeiidComponentException, TeiidProcessingException {
 		LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
 		QueryMetadataInterface metadata = context.getMetadata();
@@ -511,9 +513,11 @@
 			if (distributedCache != null) {
 				cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(), 
 						context.getVdbVersion(), context.getConnectionID(), context.getUserName());
-				CachedResults cr = this.distributedCache.get(cid);
-				if (cr != null) {
-					ts = cr.getResults().createIndexedTupleSource();
+				if (useCache) {
+					CachedResults cr = this.distributedCache.get(cid);
+					if (cr != null) {
+						ts = cr.getResults().createIndexedTupleSource();
+					}
 				}
 			}
 			

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-06-23 19:56:53 UTC (rev 3269)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-06-24 14:37:08 UTC (rev 3270)
@@ -22,7 +22,9 @@
 package org.teiid.jdbc;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import org.jboss.deployers.spi.DeploymentException;
@@ -37,6 +39,7 @@
 import org.teiid.client.DQP;
 import org.teiid.client.security.ILogon;
 import org.teiid.deployers.MetadataStoreGroup;
+import org.teiid.deployers.UDFMetaData;
 import org.teiid.deployers.VDBRepository;
 import org.teiid.dqp.internal.datamgr.ConnectorManager;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -44,6 +47,7 @@
 import org.teiid.dqp.internal.process.DQPConfiguration;
 import org.teiid.dqp.internal.process.DQPCore;
 import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.metadata.FunctionMethod;
 import org.teiid.metadata.MetadataRepository;
 import org.teiid.metadata.MetadataStore;
 import org.teiid.metadata.Schema;
@@ -61,7 +65,7 @@
 import org.teiid.transport.LocalServerConnection;
 import org.teiid.transport.LogonImpl;
 
- at SuppressWarnings("nls")
+ at SuppressWarnings({"nls", "serial"})
 public class FakeServer extends ClientServiceRegistryImpl implements ConnectionProfile {
 
 	SessionServiceImpl sessionService = new SessionServiceImpl();
@@ -97,7 +101,12 @@
         });
         
         config.setResultsetCacheConfig(new CacheConfiguration(Policy.LRU, 60, 250, "resultsetcache")); //$NON-NLS-1$
-        this.dqp.setCacheFactory(new DefaultCacheFactory());
+        this.dqp.setCacheFactory(new DefaultCacheFactory() {
+        	@Override
+        	public boolean isReplicated() {
+        		return true; //pretend to be replicated for matview tests
+        	}
+        });
         this.dqp.start(config);
         this.sessionService.setDqp(this.dqp);
         
@@ -121,18 +130,25 @@
 	public void setUseCallingThread(boolean useCallingThread) {
 		this.useCallingThread = useCallingThread;
 	}
+
+	public void deployVDB(String vdbName, String vdbPath) throws Exception {
+		deployVDB(vdbName, vdbPath, null);
+	}
 	
-	public void deployVDB(String vdbName, String vdbPath) throws Exception {
-		
+	public void deployVDB(String vdbName, String vdbPath, Map<String, Collection<FunctionMethod>> udfs) throws Exception {
 		IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new File(vdbPath).toURI().toURL());
 		MetadataStore metadata = imf.getMetadataStore(repo.getSystemStore().getDatatypes());
 		LinkedHashMap<String, Resource> entries = imf.getEntriesPlusVisibilities();
-		
-        deployVDB(vdbName, metadata, entries);		
+        deployVDB(vdbName, metadata, entries, udfs);		
 	}
+	
+	public void deployVDB(String vdbName, MetadataStore metadata,
+			LinkedHashMap<String, Resource> entries) {
+		deployVDB(vdbName, metadata, entries, null);
+	}
 
 	public void deployVDB(String vdbName, MetadataStore metadata,
-			LinkedHashMap<String, Resource> entries) {
+			LinkedHashMap<String, Resource> entries, Map<String, Collection<FunctionMethod>> udfs) {
 		VDBMetaData vdbMetaData = new VDBMetaData();
         vdbMetaData.setName(vdbName);
         vdbMetaData.setStatus(VDB.Status.ACTIVE);
@@ -152,7 +168,14 @@
         try {
         	MetadataStoreGroup stores = new MetadataStoreGroup();
         	stores.addStore(metadata);
-			this.repo.addVDB(vdbMetaData, stores, entries, null, cmr);
+        	UDFMetaData udfMetaData = null;
+        	if (udfs != null) {
+        		udfMetaData = new UDFMetaData();
+        		for (Map.Entry<String, Collection<FunctionMethod>> entry : udfs.entrySet()) {
+        			udfMetaData.addFunctions(entry.getKey(), entry.getValue());
+        		}
+        	}
+			this.repo.addVDB(vdbMetaData, stores, entries, udfMetaData, cmr);
 			this.repo.finishDeployment(vdbName, 1);
 		} catch (DeploymentException e) {
 			throw new RuntimeException(e);

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java	2011-06-23 19:56:53 UTC (rev 3269)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java	2011-06-24 14:37:08 UTC (rev 3270)
@@ -28,15 +28,22 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.jdbc.FakeServer;
 import org.teiid.jdbc.TeiidSQLException;
-import org.teiid.metadata.Table;
-import org.teiid.query.metadata.TransformationMetadata;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.metadata.FunctionMethod.PushDown;
 
 @SuppressWarnings("nls")
 public class TestMatViews {
@@ -44,13 +51,33 @@
     private static final String MATVIEWS = "matviews";
 	private Connection conn;
 	private FakeServer server;
+	
+	private static int count = 0;
+	
+	public static int pause() throws InterruptedException {
+		synchronized (TestMatViews.class) {
+			count++;
+			TestMatViews.class.notify();
+			while (count < 2) {
+				TestMatViews.class.wait();
+			}
+		}
+		return 1;
+	}
 
 	@Before public void setUp() throws Exception {
     	server = new FakeServer();
-    	server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb");
+    	HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+    	udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, Determinism.NONDETERMINISTIC)));
+    	server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
     	conn = server.createConnection("jdbc:teiid:matviews");
     }
 	
+	@After public void tearDown() throws Exception {
+		server.stop();
+		conn.close();
+	}
+	
 	@Test public void testSystemMatViewsWithImplicitLoad() throws Exception {
 		Statement s = conn.createStatement();
 		ResultSet rs = s.executeQuery("select * from MatViews order by name");
@@ -80,6 +107,34 @@
 	
 	@Test public void testSystemMatViewsWithExplicitRefresh() throws Exception {
 		Statement s = conn.createStatement();
+		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key = rs.getDouble(1);
+
+		rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key1 = rs.getDouble(1);
+
+		//ensure that invalidate with distributed caching works
+		assertTrue(key1 != key);
+	}
+	
+	@Test public void testSystemManViewsWithExplictRefreshAndInvalidate() throws Exception {
+		Statement s = conn.createStatement();
 		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', false)) p");
 		assertTrue(rs.next());
 		assertEquals(1, rs.getInt(1));
@@ -87,6 +142,42 @@
 		assertTrue(rs.next());
 		assertEquals("LOADED", rs.getString("loadstate"));
 		assertEquals(true, rs.getBoolean("valid"));
+		
+		count = 0;
+		s.execute("alter view TEST.MATVIEW as select pause() as x");
+		Thread t = new Thread() {
+			public void run() {
+				try {
+					Statement s1 = conn.createStatement();
+					ResultSet rs = s1.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', true)) p");
+					assertTrue(rs.next());
+					assertEquals(1, rs.getInt(1));
+				} catch (Exception e) {
+					throw new TeiidRuntimeException(e);
+				}
+			}
+		};
+		t.start();
+		synchronized (TestMatViews.class) {
+			while (count < 1) {
+				TestMatViews.class.wait();
+			}
+		}
+		rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+		assertTrue(rs.next());
+		assertEquals("NEEDS_LOADING", rs.getString("loadstate"));
+		assertEquals(false, rs.getBoolean("valid"));
+
+		synchronized (TestMatViews.class) {
+			count++;
+			TestMatViews.class.notify();
+		}
+		t.join();
+		
+		rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
 	}
 	
 	@Test(expected=TeiidSQLException.class) public void testSystemMatViewsInvalidView() throws Exception {
@@ -99,14 +190,27 @@
 		s.execute("call refreshMatView('foo', false)");
 	}
 	
+	@Test(expected=TeiidSQLException.class) public void testSystemMatViewsWithRowRefreshNotAllowed() throws Exception {
+		Statement s = conn.createStatement();
+		s.execute("alter view test.randomview as select rand() as x, rand() as y");
+		ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+		assertTrue(rs.next());
+		assertEquals(1, rs.getInt(1));
+		rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+		assertTrue(rs.next());
+		assertEquals("LOADED", rs.getString("loadstate"));
+		assertEquals(true, rs.getBoolean("valid"));
+		rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+		assertTrue(rs.next());
+		double key = rs.getDouble(1);
+		
+		rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', "+key+")) p");
+	}
+	
 	@Test public void testSystemMatViewsWithRowRefresh() throws Exception {
-		//TOOD: remove this. it's a workaround for TEIIDDES-549
-		VDBMetaData vdb = server.getVDB(MATVIEWS);
-		TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
-		Table t = tm.getGroupID("TEST.RANDOMVIEW");
-		t.setSelectTransformation("/*+ cache(updatable) */ " +  t.getSelectTransformation());
+		Statement s = conn.createStatement();
 		
-		Statement s = conn.createStatement();
+		s.execute("alter view test.randomview as /*+ cache(updatable) */ select rand() as x, rand() as y");
 		//prior to load refresh of a single row returns -1
 		ResultSet rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', 0)) p");
 		assertTrue(rs.next());



More information about the teiid-commits mailing list