[teiid-commits] teiid SVN: r2463 - in trunk: cache-jbosscache/src/main/java and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Aug 16 11:47:24 EDT 2010


Author: shawkins
Date: 2010-08-16 11:47:22 -0400 (Mon, 16 Aug 2010)
New Revision: 2463

Added:
   trunk/engine/src/test/java/org/teiid/cache/
   trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
Removed:
   trunk/cache-jbosscache/src/main/java/com/
Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java
   trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
   trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
   trunk/engine/src/main/java/org/teiid/cache/Cache.java
   trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
   trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
   trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java
Log:
TEIID-168: Use the cache hint TTL for result set caching. Add TTL support to the default cache. Always treat eviction as removal (until we do much more work with pinning, persistence, etc.). Use a separate thread to load a materialized view if it is currently valid.

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2010-08-16 15:47:22 UTC (rev 2463)
@@ -49,13 +49,21 @@
         <property name="clusteredCacheName">mvcc-shared</property>
     </bean>
     
+    <!-- Configuration for result set caching. 
+    	 There will be 2 caches with these settings.
+    	 One cache holds results that are specific to sessions.
+    	 The other cache holds vdb scoped results and can
+    	 be replicated.
+     -->
     <bean name="ResultsetCacheConfig" class="org.teiid.cache.CacheConfiguration">
         <property name="enabled">true</property>
-        <!-- Max Entries allowed for ResultSet Cache -->
+        <!-- Max Entries allowed for ResultSet Cache (default 1024) -->
         <property name="maxEntries">1024</property>
-        <!-- 2 hrs -->
+        <!-- Max age in seconds (default 7200 - 2 hours) -->
         <property name="maxAgeInSeconds">7200</property>
-        <!-- Allowed values are LRU, FIFO, LFU, EXPIRATION -->
+        <!-- Allowed values are LRU, EXPIRATION.  
+             Setting this value to LRU will cause cache hint TTL values
+             to be ignored. (default EXPIRATION) -->
         <property name="type">EXPIRATION</property>               
     </bean>    
     

Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -21,10 +21,13 @@
  */
 package org.teiid.cache.jboss;
 
+import java.util.Set;
+
 import org.jboss.cache.Cache;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Node;
 import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
+import org.teiid.cache.DefaultCache;
 
 public class ExpirationAwareCache<K, V> extends JBossCache<K, V> {
 
@@ -35,28 +38,56 @@
 	@Override
 	public V get(K key) {
 		Node<K, V> node = getRootNode();
-		Node child = node.getChild(Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode())));
+		Node child = node.getChild(getFqn(key));
 		if (child != null) {
 			return (V)child.get(key);
 		}
-		return super.get(key);
+		return null;
 	}
 
+	private Fqn<String> getFqn(K key) {
+		return Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode()));
+	}
+
 	@Override
 	public V put(K key, V value) {
+		return this.put(key, value, null);
+	}
+	
+	@Override
+	public V put(K key, V value, Long ttl) {
 		Node<K, V> node = getRootNode();
-		Node child = node.addChild(Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode())));
-		Long future = new Long(System.currentTimeMillis() + (config.getMaxAgeInSeconds()*1000));				
+		Node child = node.addChild(getFqn(key));
+		
+		long future = DefaultCache.getExpirationTime(config.getMaxAgeInSeconds()*1000, ttl);				
 		child.put(ExpirationAlgorithmConfig.EXPIRATION_KEY, future);
 		return (V)child.put(key, value);
 	}
-
+	
 	@Override
-	public org.teiid.cache.Cache<K, V> addChild(String name) {
+	public V remove(K key) {
 		Node<K, V> node = getRootNode();
-		Node child = node.addChild(Fqn.fromString(name));
-		child.put(ExpirationAlgorithmConfig.EXPIRATION_KEY, Long.MAX_VALUE);		
-		return new JBossCache<K, V>(this.cacheStore, child.getFqn());
+		Node child = node.getChild(getFqn(key));
+		if (child != null) {
+			return (V)child.remove(key);
+		}
+		return null;
 	}
+	
+	@Override
+	public void clear() {
+		Node<K, V> node = getRootNode();
+		node.clearData();
+		Set<Node<K,V>> nodes = node.getChildren();
+		for (Node<K, V> child : nodes) {
+			child.clearData();
+		}
+	}
+	
+	@Override
+	public int size() {
+		Node<K, V> node = getRootNode();
+		return node.getChildren().size();
+	}
 
 }

Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -50,30 +50,22 @@
 		this.rootFqn = fqn;
 	}
 	
-	/**
-	 * {@inheritDoc}
-	 */
 	public V get(K key) {
 		return this.cacheStore.get(this.rootFqn, key);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	public V put(K key, V value) {
 		return this.cacheStore.put(this.rootFqn, key, value);
 	}
+	
+	public V put(K key, V value, Long ttl) {
+		return this.put(key, value);
+	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	public V remove(K key) {
 		return this.cacheStore.remove(this.rootFqn, key);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	public Set<K> keySet() {
 		Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
 		if (node != null) {
@@ -82,9 +74,6 @@
 		return Collections.emptySet();
 	}
 	
-	/**
-	 * {@inheritDoc}
-	 */
 	public int size() {
 		Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
 		if (node != null) {
@@ -93,9 +82,6 @@
 		return 0;
 	}
 	
-	/**
-	 * {@inheritDoc}
-	 */
 	public void clear() {
 		Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
 		if (node != null) {
@@ -103,7 +89,6 @@
 		}
 	}
 	
-	@Override
 	public Collection<V> values() {
 		Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
 		if (node != null) {
@@ -120,28 +105,17 @@
 		this.cacheStore.addCacheListener(this.cacheListener);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	public synchronized void removeListener() {
 		this.cacheStore.removeCacheListener(this.cacheListener);
 		this.cacheListener = null;	
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
 	public Cache<K, V> addChild(String name) {
 		Node<K, V> node = getRootNode();
 		Node<K, V> childNode = node.addChild(Fqn.fromString(name));
 		return new JBossCache<K, V>(this.cacheStore, childNode.getFqn());
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
 	public Cache<K, V> getChild(String name) {
 		Node<K, V> node = getRootNode();
 		Node<K, V> child = node.getChild(Fqn.fromString(name));
@@ -159,10 +133,6 @@
 		return node;
 	}
 	
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
 	public List<Cache> getChildren() {
 		Node<K, V> node = getRootNode();
 		Set<Node<K,V>> nodes = node.getChildren();
@@ -176,10 +146,6 @@
 		return children;
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
 	public boolean removeChild(String name) {
 		Node<K, V> node = getRootNode();
 		return node.removeChild(Fqn.fromString(name));

Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -31,8 +31,6 @@
 import org.jboss.cache.config.EvictionAlgorithmConfig;
 import org.jboss.cache.config.EvictionRegionConfig;
 import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
-import org.jboss.cache.eviction.FIFOAlgorithmConfig;
-import org.jboss.cache.eviction.LFUAlgorithmConfig;
 import org.jboss.cache.eviction.LRUAlgorithmConfig;
 import org.jboss.cache.eviction.RemoveOnEvictActionPolicy;
 import org.teiid.cache.Cache;
@@ -96,16 +94,6 @@
 			lru.setTimeToLive(-1); // -1 no limit
 			evictionConfig = lru;
 		}
-		else if (config.getPolicy() == Policy.FIFO) {
-			FIFOAlgorithmConfig fifo = new FIFOAlgorithmConfig();
-			fifo.setMaxNodes(config.getMaxEntries());
-			evictionConfig = fifo;
-		}
-		else if (config.getPolicy() == Policy.LFU) {
-			LFUAlgorithmConfig lfu  = new LFUAlgorithmConfig();
-			lfu.setMaxNodes(config.getMaxEntries());
-			evictionConfig = lfu;
-		}
 		else if (config.getPolicy() == Policy.EXPIRATION) {
 			ExpirationAlgorithmConfig lfu  = new ExpirationAlgorithmConfig();
 			lfu.setMaxNodes(config.getMaxEntries());
@@ -113,14 +101,10 @@
 		}
 		
 		EvictionRegionConfig erc = new EvictionRegionConfig(rootFqn, evictionConfig);
-		
-		if (config.getPolicy() == Policy.EXPIRATION) {
-			erc.setEvictionActionPolicyClassName(RemoveOnEvictActionPolicy.class.getName());
-		}
+		erc.setEvictionActionPolicyClassName(RemoveOnEvictActionPolicy.class.getName());
 		return erc;
 	}	
 	
-	
 	public void destroy() {
 		this.destroyed = true;		
 	}	

Modified: trunk/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/Cache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/Cache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -22,11 +22,6 @@
 
 package org.teiid.cache;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-
 /**
  * Abstraction over cache providers
  */
@@ -61,11 +56,12 @@
     *
     * @param key   key with which the specified value is to be associated.
     * @param value value to be associated with the specified key.
+    * @param ttl the time for this entry to live
     * @return previous value associated with specified key, or <code>null</code> if there was no mapping for key.
     *    	A <code>null</code> return can also indicate that the key previously associated <code>null</code> with the specified key, 
     *    	if the implementation supports null values.
     */
-	V put(K key, V value);
+	V put(K key, V value, Long ttl);
 	
    /**
     * Removes the value for this key from a Cache.
@@ -83,70 +79,12 @@
 	 */
 	int size();
 	
-	
-   /**
-    * Returns a {@link Set} containing the data in this Cache
-    *
-    * @return a {@link Set} containing the data in this Cache.  If there is no data, 
-    * an empty {@link Set} is returned.  The {@link Set} returned is always immutable.
-    */
-	Set<K> keySet();
-	
 	/**
 	 * Removes all the keys and their values from the Cache
 	 */
 	void clear();
 	   
 	/**
-	 * Listener to get the updates on this Cache
-	 * @param listener
-	 */
-	void addListener(CacheListener listener);
-	
-	/**
-	 * Remove Listener to stop the updates on this Cache
-	 * @param listener
-	 */
-	void removeListener();
-	
-	/**
-     * Returns a {@link Collection} containing the data in this Cache
-     *
-     * @return a {@link Collection} containing the data in this Cache.  If there is no data, 
-     * an empty {@link Collection} is returned.
-     */
-	Collection<V> values();
-	
-	
-	/** 
-	 * Add a child node to the current cache node
-	 * @param name - name of the child
-	 * @return Cache instance.
-	 */
-	Cache addChild(String name);
-	
-	/**
-	 * Get the child cache node from the current node
-	 * @param name
-	 * @return null if not found
-	 */
-	Cache getChild(String name);
-	
-	/**
-	 * Destroys the child from the current node; no-op if node not found
-	 * @param name
-	 * @return true if removed; false otherwise
-	 */
-	boolean removeChild(String name);
-	
-	
-	/**
-	 * Get child nodes under this cache node. If none found empty set is returned
-	 * @return
-	 */
-	List<Cache> getChildren();
-	
-	/**
 	 * Name of the cache node
 	 * @return
 	 */

Modified: trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -28,9 +28,7 @@
 		
 	public enum Policy {
 		LRU,  // Least Recently Used
-		FIFO, // First in First Out
-		LFU,  // Least frequently Used
-		EXPIRATION; // expires after certain time.
+		EXPIRATION
 	}
 	
 	private Policy policy;

Modified: trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -24,91 +24,186 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.teiid.core.util.LRUCache;
 
 public class DefaultCache<K, V> implements Cache<K, V>, Serializable {
 	private static final long serialVersionUID = -511120208522577206L;
 	public static final int DEFAULT_MAX_SIZE_TOTAL = 250;
+	public static final int DEFAULT_MAX_AGE = 1000 * 60 * 60 * 2;
 	
-	Map<K, V> map;
-	Map<String, Cache> children = new HashMap();
-	String name;
+	private static class ExpirationEntry<K, V> {
+		long expiration;
+		K key;
+		V value;
+		
+		public ExpirationEntry(long expiration, K key, V value) {
+			this.expiration = expiration;
+			this.key = key;
+			this.value = value;
+		}
+
+		@Override
+		public int hashCode() {
+			return key.hashCode();
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			if (!(obj instanceof ExpirationEntry<?, ?>)) {
+				return false;
+			}
+			ExpirationEntry<K, V> other = (ExpirationEntry<K, V>)obj;
+			return this.key.equals(other.key);
+		}
+	}
 	
+	protected LRUCache<K, ExpirationEntry<K, V>> map;
+	protected Map<String, Cache> children = new ConcurrentHashMap<String, Cache>();
+	protected String name;
+	protected long ttl;
+	protected LinkedHashSet<ExpirationEntry<K, V>> expirationQueue = new LinkedHashSet<ExpirationEntry<K, V>>();
+	
 	public DefaultCache(String name) {
-		this(name, DEFAULT_MAX_SIZE_TOTAL);
+		this(name, DEFAULT_MAX_SIZE_TOTAL, DEFAULT_MAX_SIZE_TOTAL);
 	}
 	
-	public DefaultCache(String name, int maxSize) {
-		if(maxSize < 0){
-			maxSize = DEFAULT_MAX_SIZE_TOTAL;
-		}			
-		this.map = Collections.synchronizedMap(new LRUCache<K, V>(maxSize));
+	public DefaultCache(String name, int maxEntries, long ttl) {
+		this.map = new LRUCache<K, ExpirationEntry<K, V>>(maxEntries) {
+			@Override
+			protected boolean removeEldestEntry(java.util.Map.Entry<K, ExpirationEntry<K, V>> eldest) {
+				if (super.removeEldestEntry(eldest)) {
+					Iterator<ExpirationEntry<K, V>> iter = expirationQueue.iterator();
+					return validate(iter.next()) != null;
+				}
+				return false;
+			}
+		};
+		
 		this.name = name;
+		this.ttl = ttl;
 	}
 	
 	public void addListener(CacheListener listener) {
+		throw new UnsupportedOperationException();
 	}
 
 	public void clear() {
-		map.clear();
+		synchronized (map) {
+			map.clear();
+			expirationQueue.clear();
+		}
 	}
 
 	public V get(K key) {
-		return map.get(key);
+		synchronized (map) {
+			ExpirationEntry<K, V> result = map.get(key);
+			if (result != null) {
+				return validate(result);
+			}
+			return null;
+		}
 	}
 
+	private V validate(ExpirationEntry<K, V> result) {
+		if (result.expiration < System.currentTimeMillis()) {
+			remove(result.key);
+			return null;
+		}
+		return result.value;
+	}
+
 	public Set<K> keySet() {
-		return map.keySet();
+		synchronized (map) {
+			return new HashSet<K>(map.keySet());
+		}
 	}
 
 	public V put(K key, V value) {
-		return map.put(key, value);
+		return this.put(key, value, ttl);
 	}
+	
+	public static long getExpirationTime(long defaultTtl, Long ttl) {
+		if (ttl == null) {
+			ttl = defaultTtl;
+		}
+		if (ttl < 0) {
+			return Long.MAX_VALUE;
+		}
+		return System.currentTimeMillis() + ttl;
+	}
+	
+	public V put(K key, V value, Long timeToLive) {
+		synchronized (map) {
+			ExpirationEntry<K, V> entry = new ExpirationEntry<K, V>(getExpirationTime(ttl, timeToLive), key, value);
+			expirationQueue.add(entry);
+			ExpirationEntry<K, V> result = map.put(key, entry);
+			if (result != null) {
+				return result.value;
+			}
+			return null;
+		}
+	}
 
 	public V remove(K key) {
-		return map.remove(key);
+		synchronized (map) {
+			ExpirationEntry<K, V> entry = new ExpirationEntry<K, V>(-1, key, null);
+			ExpirationEntry<K, V> result = map.put(key, entry);
+			if (result != null) {
+				expirationQueue.remove(entry);
+				return result.value;
+			}
+			return null;
+		}
 	}
 
 	public int size() {
-		return map.size();
+		synchronized (map) {
+			return map.size();
+		}
 	}
 	
 	public Collection<V> values() {
-		return map.values();
+		synchronized (map) {
+			ArrayList<V> result = new ArrayList<V>(map.size());
+			for (ExpirationEntry<K, V> entry : new ArrayList<ExpirationEntry<K, V>>(map.values())) {
+				V value = validate(entry);
+				if (value != null) {
+					result.add(value);
+				}
+			}
+			return result;
+		}
 	}
 
-	@Override
-	public void removeListener() {
-	}
-
-	@Override
 	public Cache addChild(String name) {
-		if (children.get(name) != null) {
-			return children.get(name);
+		Cache c = children.get(name);
+		if (c != null) {
+			return c;
 		}
 		
-		Cache c = new DefaultCache(name);
+		c = new DefaultCache(name, map.getSpaceLimit(), ttl);
 		children.put(name, c);
 		return c;
 	}
 
-	@Override
 	public Cache getChild(String name) {
 		return children.get(name);
 	}
 
-	@Override
-	public List<Cache> getChildren() {
-		return new ArrayList<Cache>(children.values());
+	public Collection<Cache> getChildren() {
+		return children.values();
 	}
 
-	@Override
 	public boolean removeChild(String name) {
 		Object obj = children.remove(name);
 		return obj != null;
@@ -117,5 +212,6 @@
 	@Override
 	public String getName() {
 		return name;
-	}		
+	}
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -39,7 +39,6 @@
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.Assertion;
 import org.teiid.dqp.DQPPlugin;
-import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.query.analysis.AnalysisRecord;
@@ -109,7 +108,7 @@
 				TupleBatch batch = results.getBatch(row);
 				UUID uuid = java.util.UUID.randomUUID();
 				batch.preserveTypes();
-				cache.put(uuid, batch);
+				cache.put(uuid, batch, this.hint != null?this.hint.getTtl():null);
 				this.cachedBatches.add(uuid);
 			}
 			return true;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -648,7 +648,7 @@
         }
 
         //prepared plan cache
-        prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, Cache.Type.PREPAREDPLAN,  new CacheConfiguration(Policy.LRU, 60, config.getPreparedPlanCacheMaxCount()));
+        prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, Cache.Type.PREPAREDPLAN,  new CacheConfiguration(Policy.LRU, 60*60*8, config.getPreparedPlanCacheMaxCount()));
         prepPlanCache.setBufferManager(this.bufferManager);
 		
         //get buffer manager
@@ -657,7 +657,7 @@
         this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
         
         dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,
-                                            this.bufferService), this.bufferManager); 
+                                            this.bufferService), this.bufferManager, this.processWorkerPool); 
 	}
 	
 	public void setBufferService(BufferService service) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -377,7 +377,7 @@
 	                if (determinismLevel > FunctionMethod.SESSION_DETERMINISTIC) {
 	    				LogManager.logInfo(LogConstants.CTX_DQP, DQPPlugin.Util.getString("RequestWorkItem.cache_nondeterministic", originalCommand)); //$NON-NLS-1$
 	    			}
-	                dqpCore.getRsCache().put(cid, determinismLevel, cr);
+	                dqpCore.getRsCache().put(cid, determinismLevel, cr, originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
 			    }
 				add = sendResultsIfNeeded(batch);
 				if (!added) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -70,7 +70,7 @@
 		if(this.maxSize < 0){
 			this.maxSize = DEFAULT_MAX_SIZE_TOTAL;
 		}		
-		this.localCache = new DefaultCache<CacheID, T>("local", maxSize); //$NON-NLS-1$
+		this.localCache = new DefaultCache<CacheID, T>("local", maxSize, config.getMaxAgeInSeconds()*1000); //$NON-NLS-1$
 		
 		if (type == Cache.Type.PREPAREDPLAN) {
 			this.distributedCache = localCache;
@@ -114,17 +114,18 @@
 		return cacheHit.get();
 	}
 	
-	/**
-	 * Create PreparedPlan for the given clientConn and SQl query
-	 */
 	public void put(CacheID id, int determinismLevel, T t){
+		this.put(id, determinismLevel, t, null);
+	}
+	
+	public void put(CacheID id, int determinismLevel, T t, Long ttl){
 		if (!id.cachable) {
 			return;
 		}
 		
 		if (determinismLevel >= FunctionMethod.SESSION_DETERMINISTIC) {
 			id.setSessionId(id.originalSessionId);
-			this.localCache.put(id, t);
+			this.localCache.put(id, t, ttl);
 		} 
 		else {
 			
@@ -145,7 +146,7 @@
 			}
 			
 			if (insert) {
-				this.distributedCache.put(id, t);
+				this.distributedCache.put(id, t, ttl);
 			}
 		}
 	}

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -50,7 +50,7 @@
 				if (criteria instanceof CompareCriteria) {
 					CompareCriteria cc = (CompareCriteria)criteria;
 					if (cc.getOperator() == CompareCriteria.NE 
-							|| !(cc.getRightExpression() instanceof Constant) || (cc.getOperator() != CompareCriteria.EQ && i > 0)) {
+							|| !(cc.getRightExpression() instanceof Constant)) {
 						critIter.remove();
 						continue;
 					}

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -25,6 +25,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
 
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.api.exception.query.QueryMetadataException;
@@ -90,16 +93,28 @@
 	
 	private ProcessorDataManager processorDataManager;
     private BufferManager bufferManager;
+    
+    private Executor executor;
 
+    public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager) {
+    	this(processorDataManager, bufferManager, new Executor() {
+			@Override
+			public void execute(Runnable command) {
+				command.run();
+			}
+	    });
+    }
+
     /**
      * Constructor takes the "real" ProcessorDataManager that this object will be a proxy to,
      * and will pass most calls through to transparently.  Only when a request is registered for
      * a temp group will this proxy do it's thing.
      * @param processorDataManager the real ProcessorDataManager that this object is a proxy to
-     */
-    public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager){
+     */    
+    public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager, Executor executor){
         this.processorDataManager = processorDataManager;
         this.bufferManager = bufferManager;
+        this.executor = executor;
     }
 
 	public TupleSource registerRequest(
@@ -265,12 +280,12 @@
 		}
 	}
 
-	private TupleSource registerQuery(CommandContext context,
+	private TupleSource registerQuery(final CommandContext context,
 			TempTableStore contextStore, Query query)
 			throws TeiidComponentException, QueryMetadataException,
 			TeiidProcessingException, ExpressionEvaluationException,
 			QueryProcessingException {
-		GroupSymbol group = query.getFrom().getGroups().get(0);
+		final GroupSymbol group = query.getFrom().getGroups().get(0);
 		if (!group.isTempGroupSymbol()) {
 			return null;
 		}
@@ -278,11 +293,23 @@
 		boolean remapColumns = !tableName.equalsIgnoreCase(group.getName());
 		TempTable table = null;
 		if (group.isGlobalTable()) {
-			TempTableStore globalStore = context.getGlobalTableStore();
-			MatTableInfo info = globalStore.getMatTableInfo(tableName);
+			final TempTableStore globalStore = context.getGlobalTableStore();
+			final MatTableInfo info = globalStore.getMatTableInfo(tableName);
 			boolean load = info.shouldLoad();
 			if (load) {
-				loadGlobalTable(context, group, tableName, globalStore, info);
+				if (!info.isValid()) {
+					//blocking load
+					loadGlobalTable(context, group, tableName, globalStore, info);
+				} else {
+					Callable<Integer> toCall = new Callable<Integer>() {
+						@Override
+						public Integer call() throws Exception {
+							return loadGlobalTable(context, group, tableName, globalStore, info);
+						}
+					};
+					FutureTask<Integer> task = new FutureTask<Integer>(toCall);
+					executor.execute(task);
+				}
 			} 
 			table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
 		} else {
@@ -308,8 +335,7 @@
 	private int loadGlobalTable(CommandContext context,
 			GroupSymbol group, final String tableName,
 			TempTableStore globalStore, MatTableInfo info)
-			throws QueryMetadataException, TeiidComponentException,
-			TeiidProcessingException, ExpressionEvaluationException {
+			throws TeiidComponentException, TeiidProcessingException {
 		LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryExecPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
 		QueryMetadataInterface metadata = context.getMetadata();
 		Create create = new Create();
@@ -344,10 +370,15 @@
 			//TODO: if this insert fails, it's unnecessary to do the undo processing
 			table.insert(ts, table.getColumns());
 			rowCount = table.getRowCount();
+		} catch (TeiidComponentException e) {
+			LogManager.logError(LogConstants.CTX_MATVIEWS, e, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
+			throw e;
+		} catch (TeiidProcessingException e) {
+			LogManager.logError(LogConstants.CTX_MATVIEWS, e, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
+			throw e;
 		} finally {
 			if (rowCount == -1) {
 				info.setState(MatState.FAILED_LOAD, null);
-				LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
 			} else {
 				globalStore.swapTempTable(tableName, table);
 				info.setState(MatState.LOADED, true);

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -61,9 +61,8 @@
     		for (;;) {
 			switch (state) {
 			case NEEDS_LOADING:
-				updateTime = System.currentTimeMillis();
 			case FAILED_LOAD:
-				state = MatState.LOADING;
+				setState(MatState.LOADING);
 				return true;
 			case LOADING:
 				if (valid) {
@@ -77,7 +76,7 @@
 				continue;
 			case LOADED:
 				if (ttl >= 0 && System.currentTimeMillis() - updateTime - ttl > 0) {
-					state = MatState.LOADING;
+					setState(MatState.LOADING);
 					return true;
 				}
 				return false;
@@ -90,11 +89,15 @@
 			if (valid != null) {
 				this.valid = valid;
 			}
-			this.state = state;
-			this.updateTime = System.currentTimeMillis();
+			setState(state);
 			notifyAll();
 			return oldState;
 		}
+
+		private void setState(MatState state) {
+			this.state = state;
+			this.updateTime = System.currentTimeMillis();
+		}
 		
 		public synchronized void setTtl(long ttl) {
 			this.ttl = ttl;

Added: trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.cache;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestDefaultCache {
+	
+	@Test public void testExpiration() throws InterruptedException {
+		DefaultCache<Integer, Integer> cache = new DefaultCache<Integer, Integer>("foo", 2, 50);
+		cache.put(1, 1);
+		Thread.sleep(60);
+		assertNull(cache.get(1));
+		cache.put(2, 2);
+		Thread.sleep(30);
+		cache.put(3, 3);
+		assertNotNull(cache.get(2));
+		Thread.sleep(40);
+		cache.put(4, 4);
+		//entry 2 is older than 3 and should have expired by now, so purging 2 is preferred over purging 3
+		assertNotNull(cache.get(3));
+	}
+
+}


Property changes on: trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -96,7 +96,7 @@
 		CachedResults results = new CachedResults();
 		results.setResults(tb);
 		results.setCommand(new Query());
-		Cache cache = new DefaultCache("dummy", 250); //$NON-NLS-1$
+		Cache cache = new DefaultCache("dummy"); //$NON-NLS-1$
 		results.prepare(cache, fbs.getBufferManager());
 		
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java	2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java	2010-08-16 15:47:22 UTC (rev 2463)
@@ -56,6 +56,19 @@
 		assertFalse(rs.next());
 	}
 	
+	@Test public void testCacheHintTtl() throws Exception {
+		Statement s = conn.createStatement();
+		s.execute("set showplan on");
+		ResultSet rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+		assertTrue(rs.next());
+		s.execute("set noexec on");
+		rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+		assertTrue(rs.next());
+		Thread.sleep(60);
+		rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+		assertFalse(rs.next());
+	}
+	
 	@Test public void testExecutionProperty() throws Exception {
 		Statement s = conn.createStatement();
 		s.execute("set showplan on");



More information about the teiid-commits mailing list