[teiid-commits] teiid SVN: r3332 - in branches/7.1.1.CP3: engine/src/main/java/org/teiid/cache and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Jul 25 11:46:41 EDT 2011


Author: rareddy
Date: 2011-07-25 11:46:41 -0400 (Mon, 25 Jul 2011)
New Revision: 3332

Modified:
   branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
   branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
   branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
   branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties
   branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
   branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
   branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
   branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
   branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
Log:
TEIID-1657: Implemented a distributed refreshMatView call based on the JBoss Cache listener mechanism. When a user issues a refreshMatView call on one node, all the nodes get notified about the change once the load of the mat view table has finished on the originating node. The proactive load on the passive nodes is done in a separate thread. Also fixed a memory leak in the TupleBatchCacheLoader when loading batches.

Modified: branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -112,11 +112,13 @@
 		}
 	}
 	
-	public synchronized void addListener(CacheListener listener) {
-		this.cacheListener = new JBossCacheListener(this.rootFqn, listener);
+	@Override
+	public synchronized void setListener(CacheListener listener) {
+		this.cacheListener = new JBossCacheListener(this.rootFqn, this.cacheStore, this, listener);
 		this.cacheStore.addCacheListener(this.cacheListener);
 	}
 
+	@Override
 	public synchronized void removeListener() {
 		this.cacheStore.removeCacheListener(this.cacheListener);
 		this.cacheListener = null;	

Modified: branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
===================================================================
--- branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -21,7 +21,11 @@
  */
 package org.teiid.cache.jboss;
 
+import java.util.Set;
+
 import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
 import org.jboss.cache.notifications.annotation.NodeCreated;
 import org.jboss.cache.notifications.annotation.NodeEvicted;
 import org.jboss.cache.notifications.annotation.NodeLoaded;
@@ -33,14 +37,18 @@
 
 
 @org.jboss.cache.notifications.annotation.CacheListener
-public class JBossCacheListener {
+public class JBossCacheListener<K,V> {
 
 	private CacheListener listener;
 	private Fqn rootFqn;
+	private JBossCache cache;
+	private org.jboss.cache.Cache<K,V> cacheStore;
 
-	public JBossCacheListener(Fqn fqn, CacheListener listener) {
+	public JBossCacheListener(Fqn fqn, org.jboss.cache.Cache cacheStore, JBossCache cache, CacheListener listener) {
 		this.rootFqn = fqn;
 		this.listener = listener;
+		this.cache = cache;
+		this.cacheStore = cacheStore;
 	}
 	
     @NodeCreated
@@ -55,4 +63,61 @@
     		listener.cacheChanged();
     	}
 	}
+    
+    @NodeCreated
+    public synchronized void cacheCreated(NodeEvent ne) {
+    	if (!ne.isPre() && !ne.isOriginLocal()) {
+	    	Fqn fqn = ne.getFqn();
+	    	if (fqn.isChildOrEquals(rootFqn)) {
+	    		Node<K,V> node = this.cacheStore.getNode(fqn);
+	    		if (node != null) {
+		    		Set<K> keys = node.getKeys();
+		    		for (K key:keys) {
+						if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+							continue;
+						}	    			
+		    			listener.cacheCreated(key, cache.get(key));
+		    		}
+	    		}
+	    	}
+    	}
+    }
+    
+    @NodeRemoved
+    public synchronized void cacheRemoved(NodeEvent ne) {
+    	if (!ne.isPre() && !ne.isOriginLocal()) {
+	    	Fqn fqn = ne.getFqn();
+	    	if (fqn.isChildOrEquals(rootFqn)) {
+	    		Node<K,V> node = this.cacheStore.getNode(fqn);
+	    		if (node != null) {
+		    		Set<K> keys = node.getKeys();
+		    		for (K key:keys) {
+						if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+							continue;
+						}	    			
+		    			listener.cacheRemoved(key, cache.get(key));
+		    		}
+	    		}
+	    	}
+    	}
+    }
+
+    @NodeModified
+    public synchronized void cacheModified(NodeEvent ne) {
+    	Fqn fqn = ne.getFqn();
+    	if (!ne.isPre() && !ne.isOriginLocal()) {
+	    	if (fqn.isChildOrEquals(rootFqn)) {
+	    		Node<K,V> node = this.cacheStore.getNode(fqn);
+	    		if (node != null) {
+		    		Set<K> keys = node.getKeys();
+		    		for (K key:keys) {
+						if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+							continue;
+						}
+		    			listener.cacheModified(key, cache.get(key));
+		    		}
+	    		}
+	    	}
+    	}
+    }
 }

Modified: branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
===================================================================
--- branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -62,9 +62,8 @@
 				map.put(id, b);
 				return map;
 			}
-			return super.get(fqn);
 		}
-		return null;
+		return super.get(fqn);
 	}
 
 	@Override

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -82,4 +82,14 @@
 	 * @return
 	 */
 	Set<K> keys();
+	
+	/**
+	 * set cache listener
+	 * @param listener
+	 */
+	void setListener(CacheListener<K, V> listener);
+	
+	
+	void removeListener();
+	
 }

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -25,6 +25,9 @@
 /**
  * Listener for the cache events like add,update delete
  */
-public interface CacheListener {
+public interface CacheListener<K, V> {
 	void cacheChanged();
+	void cacheCreated(K key, V value);
+	void cacheRemoved(K key, V value);
+	void cacheModified(K key, V value);
 }

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -95,9 +95,11 @@
 		this.ttl = ttl;
 	}
 	
-	public void addListener(CacheListener listener) {
-		throw new UnsupportedOperationException();
+	public void setListener(CacheListener listener) {
 	}
+	
+	public void removeListener() {
+	}
 
 	public void clear() {
 		synchronized (map) {

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -95,8 +95,12 @@
 			this.uuid = java.util.UUID.randomUUID().toString();
 		}
 		return this.uuid;
-	}	
+	}
 	
+	public void setId(String uuid) {
+		this.uuid = uuid;
+	}
+	
 	public boolean isLobs() {
 		return lobIndexes != null;
 	}

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -130,6 +130,7 @@
 				}
 				buffer = bufferManager.createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
 				buffer.setBatchSize(this.batchSize);
+				buffer.setId(this.uuid);
 				if (this.hint != null) {
 					buffer.setPrefersMemory(this.hint.getPrefersMemory());
 				}
@@ -142,6 +143,7 @@
 						return false;
 					}		
 					buffer.addTupleBatch(batch, true);
+					cache.remove(uuid+","+row); //$NON-NLS-1$
 				}
 				this.results = buffer;	
 				bufferManager.addTupleBuffer(this.results);

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -22,6 +22,10 @@
 
 package org.teiid.dqp.internal.process;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -46,6 +50,7 @@
 import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
 import org.teiid.cache.CacheConfiguration;
 import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
 import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.client.DQP;
 import org.teiid.client.RequestMessage;
@@ -212,6 +217,7 @@
     private CacheFactory cacheFactory;
 
 	private SessionAwareCache<CachedResults> matTables;
+	private CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> matTableListener;
     
     /**
      * perform a full shutdown and wait for 10 seconds for all threads to finish
@@ -711,11 +717,11 @@
         }
         
         if (cacheFactory.isReplicated()) {
-        	matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterilizationTables")); //$NON-NLS-1$
+        	matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterializationTables")); //$NON-NLS-1$
         	matTables.setBufferManager(this.bufferManager);
         }
         
-        dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory); 
+        dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory, this.matTableListener); 
 	}
 	
 	public void setBufferService(BufferService service) {
@@ -726,6 +732,29 @@
 		this.transactionService = service;
 	}
 	
+	public void setMatTableListener(final CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> listener) {
+		this.matTableListener = (CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry>)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {CacheListener.class}, new InvocationHandler() {
+			@Override
+			public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
+				addWork(new Runnable() {
+					@Override
+					public void run() {
+						try {
+							method.invoke(listener, args);
+						} catch (IllegalArgumentException e) {
+							LogManager.logDetail(LogConstants.CTX_DQP, e);
+						} catch (IllegalAccessException e) {
+							LogManager.logDetail(LogConstants.CTX_DQP, e);
+						} catch (InvocationTargetException e) {
+							LogManager.logDetail(LogConstants.CTX_DQP, e);
+						}
+					}
+				});
+				return null;
+			}
+		});
+	}
+	
 	@Override
 	public boolean cancelRequest(long requestID)
 			throws TeiidProcessingException, TeiidComponentException {

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -32,6 +32,7 @@
 import org.teiid.cache.Cache;
 import org.teiid.cache.CacheConfiguration;
 import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
 import org.teiid.cache.DefaultCache;
 import org.teiid.cache.DefaultCacheFactory;
 import org.teiid.cache.CacheConfiguration.Policy;
@@ -307,4 +308,14 @@
     public void setBufferManager(BufferManager bufferManager) {
     	this.bufferManager = bufferManager;
     }
+    
+	public void setListener(CacheListener<CacheID, T> listener) {
+		this.localCache.setListener(listener);
+		this.distributedCache.setListener(listener);
+	}
+	
+	public void removeListener() {
+		this.localCache.removeListener();
+		this.distributedCache.removeListener();		
+	}
 }

Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -40,6 +40,7 @@
 import org.teiid.cache.Cache;
 import org.teiid.cache.CacheConfiguration;
 import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
 import org.teiid.cache.CacheConfiguration.Policy;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
@@ -113,7 +114,7 @@
 	private SessionAwareCache<CachedResults> cache;
     private Executor executor;
     
-    private static class MatTableKey implements Serializable {
+    public static class MatTableKey implements Serializable {
 		private static final long serialVersionUID = 5481692896572663992L;
 		String name;
     	VDBKey vdb;
@@ -134,19 +135,37 @@
     		MatTableKey other = (MatTableKey)obj;
     		return this.name.equals(other.name) && this.vdb.equals(other.vdb);
     	}
+    	
+    	public String getVDBName() {
+    		return vdb.getName();
+    	}
+    	
+    	public int getVDBVersion() {
+    		return vdb.getVersion();
+    	}
     }
     
-    private static class MatTableEntry implements Serializable {
+    public static class MatTableEntry implements Serializable {
 		private static final long serialVersionUID = 8559613701442751579L;
     	long lastUpdate = System.currentTimeMillis();
     	boolean valid;
+    	String viewName;
+    	
+    	public String getViewName() {
+    		return viewName;
+    	}
+    	
+    	public boolean allowsUpdate() {
+    		return valid && viewName != null;
+    	}
     }
     
     private Cache<MatTableKey, MatTableEntry> tables;
+    private Cache<MatTableKey, MatTableEntry> refreshJob;
     private SessionAwareCache<CachedResults> distributedCache;
 
     public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager, 
-    		Executor executor, SessionAwareCache<CachedResults> cache, SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
+    		Executor executor, SessionAwareCache<CachedResults> cache, SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory, CacheListener<MatTableKey, MatTableEntry> listener){
         this.processorDataManager = processorDataManager;
         this.bufferManager = bufferManager;
         this.executor = executor;
@@ -155,6 +174,10 @@
         if (distibutedCache != null) {
 	        CacheConfiguration cc = new CacheConfiguration(Policy.LRU, -1, -1, "MaterializationUpdates"); //$NON-NLS-1$
 	        tables = cacheFactory.get(cc.getLocation(), cc);
+	        
+	        cc = new CacheConfiguration(Policy.LRU, -1, -1, "MaterializationRefresh"); //$NON-NLS-1$
+	        refreshJob = cacheFactory.get(cc.getLocation(), cc);
+	        refreshJob.setListener(listener);
         }
     }
     
@@ -289,7 +312,7 @@
 		context.setDeterminismLevel(determinismLevel);
 		return tb.createIndexedTupleSource();
 	}
-
+	
 	private TupleSource handleSystemProcedures(CommandContext context, StoredProcedure proc)
 			throws TeiidComponentException, QueryMetadataException,
 			QueryProcessingException, QueryResolverException,
@@ -303,9 +326,23 @@
 			String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
 			LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
 			MatTableInfo info = globalStore.getMatTableInfo(matTableName);
+			
+			Long loadTime = null;
+			boolean useCache = false;
+			if (this.distributedCache != null) {
+				MatTableKey key = new MatTableKey();
+				key.name = matTableName;
+				key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+				MatTableEntry entry = this.tables.get(key);
+				useCache = (entry != null && entry.valid && entry.lastUpdate > info.getUpdateTime());
+				if (useCache) {
+					loadTime = entry.lastUpdate;
+				}
+			}			
+			
 			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
 			if (invalidate) {
-				touchTable(context, matTableName, false);
+				touchTable(context, matTableName, matViewName, false, System.currentTimeMillis());
 			}
 			MatState oldState = info.setState(MatState.NEEDS_LOADING, invalidate?Boolean.FALSE:null, null);
 			if (oldState == MatState.LOADING) {
@@ -316,7 +353,7 @@
 			Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group, matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
 			GroupSymbol matTable = new GroupSymbol(matTableName);
 			matTable.setMetadataID(matTableId);
-			int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
+			int rowCount = loadGlobalTable(context, matTable, matTableName, matViewName, globalStore, info, invalidate?null:loadTime, !invalidate && useCache);			
 			return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
 		} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
 			Object groupID = validateMatView(metadata, proc);
@@ -408,9 +445,9 @@
 			if (load) {
 				if (!info.isValid()) {
 					//blocking load
-					loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
+					loadGlobalTable(context, group, tableName, null, globalStore, info, loadTime, true);
 				} else {
-					loadAsynch(context, group, tableName, globalStore, info, loadTime);
+					loadAsynch(context, group, tableName, null, globalStore, info, loadTime);
 				}
 			} 
 			table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
@@ -435,13 +472,13 @@
 	}
 
 	private void loadAsynch(final CommandContext context,
-			final GroupSymbol group, final String tableName,
+			final GroupSymbol group, final String tableName,final String viewName,
 			final TempTableStore globalStore, final MatTableInfo info,
 			final Long loadTime) {
 		Callable<Integer> toCall = new Callable<Integer>() {
 			@Override
 			public Integer call() throws Exception {
-				return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
+				return loadGlobalTable(context, group, tableName, viewName, globalStore, info, loadTime, true);
 			}
 		};
 		FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,9 +486,9 @@
 	}
 
 	private int loadGlobalTable(CommandContext context,
-			GroupSymbol group, final String tableName,
+			GroupSymbol group, final String tableName, final String viewName,
 			TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
-			throws TeiidComponentException, TeiidProcessingException {
+			throws TeiidComponentException, TeiidProcessingException {		
 		LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
 		QueryMetadataInterface metadata = context.getMetadata();
 		Create create = new Create();
@@ -477,9 +514,11 @@
 			}
 		}
 		int rowCount = -1;
+		boolean tableUpdated = false;
+		String fullName = null;
 		try {
-			String fullName = metadata.getFullName(group.getMetadataID());
-			TupleSource ts = null;
+			fullName = metadata.getFullName(group.getMetadataID());
+			TupleSource ts = null;			
 			CacheID cid = null;
 			if (distributedCache != null) {
 				cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(), 
@@ -487,6 +526,7 @@
 				if (useCache) {
 					CachedResults cr = this.distributedCache.get(cid);
 					if (cr != null) {
+						LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.cache_load", tableName)); //$NON-NLS-1$
 						ts = cr.getResults().createIndexedTupleSource();
 					}
 				}
@@ -501,14 +541,15 @@
 				String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
 				QueryProcessor qp = context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName, context);
 				qp.setNonBlocking(true);
-				
+
 				if (distributedCache != null) {
 					CachedResults cr = new CachedResults();
 					BatchCollector bc = qp.createBatchCollector();
 					TupleBuffer tb = bc.collectTuples();
 					cr.setResults(tb);
-					touchTable(context, fullName, true);
-					this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr, info.getTtl());
+					touchTable(context, fullName, viewName, true, info.getUpdateTime());
+					this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr, info.getTtl());					
+					tableUpdated = true;
 					ts = tb.createIndexedTupleSource();
 				} else {
 					ts = new BatchCollector.BatchProducerTupleSource(qp);
@@ -541,19 +582,40 @@
 				globalStore.swapTempTable(tableName, table);
 				info.setState(MatState.LOADED, true, loadTime);
 				LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loaded", tableName, rowCount)); //$NON-NLS-1$
+				if (tableUpdated) {
+					initiateRefreshAcrossCluster(context, fullName, viewName);
+				}
 			}
 		}
 		return rowCount;
 	}
 
-	private void touchTable(CommandContext context, String fullName, boolean valid) {
+	private void touchTable(CommandContext context, String fullName, String viewName, boolean valid, long loadtime) {
 		MatTableKey key = new MatTableKey();
 		key.name = fullName;
 		key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
 		MatTableEntry matTableEntry = new MatTableEntry();
 		matTableEntry.valid = valid;
+		matTableEntry.viewName = viewName;
+		matTableEntry.lastUpdate = loadtime;
 		tables.put(key, matTableEntry, null);
 	}
+	
+	private void initiateRefreshAcrossCluster(CommandContext context, String fullName, String viewName) {
+		MatTableKey key = new MatTableKey();
+		key.name = fullName;
+		key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+		MatTableEntry matTableEntry = new MatTableEntry();
+		matTableEntry.valid = true;
+		matTableEntry.viewName = viewName;
+		matTableEntry.lastUpdate = System.currentTimeMillis();
+		MatTableEntry entry = refreshJob.put(key, matTableEntry, null);
+		if (entry == null) {
+			// in the case of refreshjob, cacheCreate are not being notified correctly due to nature of how Teiid uses the cache
+			// so, in order to get a cacheModified event insert again.
+			refreshJob.put(key, matTableEntry, null);
+		}
+	}
 
 	/**
 	 * Return a list of ElementSymbols for the given index/key object

Modified: branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties	2011-07-25 15:46:41 UTC (rev 3332)
@@ -783,6 +783,7 @@
 
 TempTableDataManager.failed_load=Failed to load materialized view table {0}.
 TempTableDataManager.loaded=Loaded materialized view table {0} with row count {1}.
+TempTableDataManager.cache_load=Loaded materialized view table {0} from cached contents from another clustered node.
 TempTableDataManager.loading=Loading materialized view table {0}
 TempTableDataManager.not_implicit_matview={0} does not target an internal materialized view.
 TempTableDataManager.row_refresh_pk=Materialized view {0} cannot have a row refreshed since there is no primary key.

Modified: branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
--- branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -74,7 +74,7 @@
 				command.run();
 			}
 	    };
-		dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new DefaultCacheFactory());
+		dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new DefaultCacheFactory(), null);
 	}
 	
 	private void execute(String sql, List<?>... expectedResults) throws Exception {

Modified: branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -245,7 +245,7 @@
     				command.run();
     			}
     	    };        	
-        	dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache, null, null);
+        	dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache, null, null, null);
         }        
         if (context.getQueryProcessorFactory() == null) {
         	context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr, dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));

Modified: branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -76,7 +76,7 @@
 				command.run();
 			}
 	    };
-		dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null);
+		dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null, null);
 	}
 	
 	@Test public void testInsertWithQueryExpression() throws Exception {

Modified: branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2011-07-25 15:46:41 UTC (rev 3332)
@@ -70,6 +70,7 @@
 import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
 import org.teiid.adminapi.jboss.AdminProvider;
 import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
 import org.teiid.client.DQP;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
@@ -97,6 +98,9 @@
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
 import org.teiid.net.TeiidURL;
+import org.teiid.query.tempdata.TempTableDataManager;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableEntry;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableKey;
 import org.teiid.security.SecurityHelper;
 import org.teiid.transport.ClientServiceRegistry;
 import org.teiid.transport.ClientServiceRegistryImpl;
@@ -147,7 +151,8 @@
 	
     public void start() {
 		dqpCore.setTransactionService((TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG, transactionServerImpl, new Class[] {TransactionService.class}, MessageLevel.DETAIL));
-
+		dqpCore.setMatTableListener(getMatTableListener());
+		
     	// create the necessary services
     	createClientServices();
     	
@@ -497,7 +502,7 @@
 	
 	@Override
     @ManagementOperation(description="Execute a sql query", params={@ManagementParameter(name="vdbName"),@ManagementParameter(name="vdbVersion"), @ManagementParameter(name="command"), @ManagementParameter(name="timoutInMilli")})	
-	public List<List> executeQuery(final String vdbName, final int version, final String command, final long timoutInMilli) throws AdminException {
+	public List<List> executeQuery(final String vdbName, final int version, final String command, final long timoutInMilli) throws AdminException {	
 		Properties properties = new Properties();
 		properties.setProperty(TeiidURL.JDBC.VDB_NAME, vdbName);
 		properties.setProperty(TeiidURL.JDBC.VDB_VERSION, String.valueOf(version));
@@ -530,7 +535,14 @@
 					request.setExecutionId(0L);
 					request.setRowLimit(getMaxRowsFetchSize()); // this would limit the number of rows that are returned.
 					Future<ResultsMessage> message = dqpCore.executeRequest(requestID, request);
-					ResultsMessage rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+									
+					ResultsMessage rm = null;
+					if (timoutInMilli < 0) {
+						rm = message.get();
+					}
+					else {
+						rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+					}
 					
 			        if (rm.getException() != null) {
 			            throw new AdminProcessingException(rm.getException());
@@ -607,4 +619,39 @@
 		}		
 		return newResults;
 	}
+	
+    private CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> getMatTableListener() {
+		return new CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry>() {
+						
+			@Override
+			public void cacheChanged() {
+			}
+
+			@Override
+			public void cacheCreated(MatTableKey key, MatTableEntry value) {
+				refreshMatView(key, value);
+			}
+
+			@Override
+			public void cacheModified(MatTableKey key, MatTableEntry value) {
+				refreshMatView(key, value);
+			}
+
+			private void refreshMatView(MatTableKey key, MatTableEntry value) {
+				if (value != null) {
+					try {
+						if (value.allowsUpdate()) {
+							executeQuery(key.getVDBName(), key.getVDBVersion(), "execute SYSADMIN.refreshmatview(viewname=>'"+value.getViewName()+"',invalidate=>false)", -1); //$NON-NLS-1$ //$NON-NLS-2$
+						}
+					} catch (AdminException e) {
+						LogManager.logWarning(LogConstants.CTX_RUNTIME, e, IntegrationPlugin.Util.getString("error_refresh", value.getViewName() )); //$NON-NLS-1$
+					}
+				}
+			}
+			@Override
+			public void cacheRemoved(MatTableKey key, MatTableEntry value) {
+				
+			}
+		};
+	}
 }

Modified: branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties	2011-07-23 11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties	2011-07-25 15:46:41 UTC (rev 3332)
@@ -46,3 +46,4 @@
 distribute_failed=Deploy of the archive failed {0}
 template_not_found=Template not found for {0}
 admin_executing=JOPR admin {0} is executing command {1}
+error_refresh=error occurred during refreshing the materialized view entries for view {0} 
\ No newline at end of file



More information about the teiid-commits mailing list