[teiid-commits] teiid SVN: r2463 - in trunk: cache-jbosscache/src/main/java and 8 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Mon Aug 16 11:47:24 EDT 2010
Author: shawkins
Date: 2010-08-16 11:47:22 -0400 (Mon, 16 Aug 2010)
New Revision: 2463
Added:
trunk/engine/src/test/java/org/teiid/cache/
trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
Removed:
trunk/cache-jbosscache/src/main/java/com/
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
trunk/engine/src/main/java/org/teiid/cache/Cache.java
trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java
Log:
TEIID-168 using the cache hint ttl for result set caching. adding ttl support for the default cache. always treating eviction as removal (until we do much more work with pinning, persistence, etc.) using a separate thread to load a mat view if it is currently valid
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-08-16 15:47:22 UTC (rev 2463)
@@ -49,13 +49,21 @@
<property name="clusteredCacheName">mvcc-shared</property>
</bean>
+ <!-- Configuration for result set caching.
+ There will be 2 caches with these settings.
+ One cache holds results that are specific to sessions.
+ The other cache holds vdb scoped results and can
+ be replicated.
+ -->
<bean name="ResultsetCacheConfig" class="org.teiid.cache.CacheConfiguration">
<property name="enabled">true</property>
- <!-- Max Entries allowed for ResultSet Cache -->
+ <!-- Max Entries allowed for ResultSet Cache (default 1024) -->
<property name="maxEntries">1024</property>
- <!-- 2 hrs -->
+ <!-- Max age in seconds (default 7200 - 2 hours) -->
<property name="maxAgeInSeconds">7200</property>
- <!-- Allowed values are LRU, FIFO, LFU, EXPIRATION -->
+ <!-- Allowed values are LRU, EXPIRATION.
+ Setting this value to LRU will cause cache hint TTL values
+ to be ignored. (default EXPIRATION) -->
<property name="type">EXPIRATION</property>
</bean>
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ExpirationAwareCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -21,10 +21,13 @@
*/
package org.teiid.cache.jboss;
+import java.util.Set;
+
import org.jboss.cache.Cache;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
+import org.teiid.cache.DefaultCache;
public class ExpirationAwareCache<K, V> extends JBossCache<K, V> {
@@ -35,28 +38,56 @@
@Override
public V get(K key) {
Node<K, V> node = getRootNode();
- Node child = node.getChild(Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode())));
+ Node child = node.getChild(getFqn(key));
if (child != null) {
return (V)child.get(key);
}
- return super.get(key);
+ return null;
}
+ private Fqn<String> getFqn(K key) {
+ return Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode()));
+ }
+
@Override
public V put(K key, V value) {
+ return this.put(key, value, null);
+ }
+
+ @Override
+ public V put(K key, V value, Long ttl) {
Node<K, V> node = getRootNode();
- Node child = node.addChild(Fqn.fromString(String.valueOf(key.getClass().getSimpleName()+key.hashCode())));
- Long future = new Long(System.currentTimeMillis() + (config.getMaxAgeInSeconds()*1000));
+ Node child = node.addChild(getFqn(key));
+
+ long future = DefaultCache.getExpirationTime(config.getMaxAgeInSeconds()*1000, ttl);
child.put(ExpirationAlgorithmConfig.EXPIRATION_KEY, future);
return (V)child.put(key, value);
}
-
+
@Override
- public org.teiid.cache.Cache<K, V> addChild(String name) {
+ public V remove(K key) {
Node<K, V> node = getRootNode();
- Node child = node.addChild(Fqn.fromString(name));
- child.put(ExpirationAlgorithmConfig.EXPIRATION_KEY, Long.MAX_VALUE);
- return new JBossCache<K, V>(this.cacheStore, child.getFqn());
+ Node child = node.getChild(getFqn(key));
+ if (child != null) {
+ return (V)child.remove(key);
+ }
+ return null;
}
+
+ @Override
+ public void clear() {
+ Node<K, V> node = getRootNode();
+ node.clearData();
+ Set<Node<K,V>> nodes = node.getChildren();
+ for (Node<K, V> child : nodes) {
+ child.clearData();
+ }
+ }
+
+ @Override
+ public int size() {
+ Node<K, V> node = getRootNode();
+ return node.getChildren().size();
+ }
}
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -50,30 +50,22 @@
this.rootFqn = fqn;
}
- /**
- * {@inheritDoc}
- */
public V get(K key) {
return this.cacheStore.get(this.rootFqn, key);
}
- /**
- * {@inheritDoc}
- */
public V put(K key, V value) {
return this.cacheStore.put(this.rootFqn, key, value);
}
+
+ public V put(K key, V value, Long ttl) {
+ return this.put(key, value);
+ }
- /**
- * {@inheritDoc}
- */
public V remove(K key) {
return this.cacheStore.remove(this.rootFqn, key);
}
- /**
- * {@inheritDoc}
- */
public Set<K> keySet() {
Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
if (node != null) {
@@ -82,9 +74,6 @@
return Collections.emptySet();
}
- /**
- * {@inheritDoc}
- */
public int size() {
Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
if (node != null) {
@@ -93,9 +82,6 @@
return 0;
}
- /**
- * {@inheritDoc}
- */
public void clear() {
Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
if (node != null) {
@@ -103,7 +89,6 @@
}
}
- @Override
public Collection<V> values() {
Node<K, V> node = this.cacheStore.getRoot().getChild(this.rootFqn);
if (node != null) {
@@ -120,28 +105,17 @@
this.cacheStore.addCacheListener(this.cacheListener);
}
- /**
- * {@inheritDoc}
- */
public synchronized void removeListener() {
this.cacheStore.removeCacheListener(this.cacheListener);
this.cacheListener = null;
}
- /**
- * {@inheritDoc}
- */
- @Override
public Cache<K, V> addChild(String name) {
Node<K, V> node = getRootNode();
Node<K, V> childNode = node.addChild(Fqn.fromString(name));
return new JBossCache<K, V>(this.cacheStore, childNode.getFqn());
}
- /**
- * {@inheritDoc}
- */
- @Override
public Cache<K, V> getChild(String name) {
Node<K, V> node = getRootNode();
Node<K, V> child = node.getChild(Fqn.fromString(name));
@@ -159,10 +133,6 @@
return node;
}
- /**
- * {@inheritDoc}
- */
- @Override
public List<Cache> getChildren() {
Node<K, V> node = getRootNode();
Set<Node<K,V>> nodes = node.getChildren();
@@ -176,10 +146,6 @@
return children;
}
- /**
- * {@inheritDoc}
- */
- @Override
public boolean removeChild(String name) {
Node<K, V> node = getRootNode();
return node.removeChild(Fqn.fromString(name));
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -31,8 +31,6 @@
import org.jboss.cache.config.EvictionAlgorithmConfig;
import org.jboss.cache.config.EvictionRegionConfig;
import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
-import org.jboss.cache.eviction.FIFOAlgorithmConfig;
-import org.jboss.cache.eviction.LFUAlgorithmConfig;
import org.jboss.cache.eviction.LRUAlgorithmConfig;
import org.jboss.cache.eviction.RemoveOnEvictActionPolicy;
import org.teiid.cache.Cache;
@@ -96,16 +94,6 @@
lru.setTimeToLive(-1); // -1 no limit
evictionConfig = lru;
}
- else if (config.getPolicy() == Policy.FIFO) {
- FIFOAlgorithmConfig fifo = new FIFOAlgorithmConfig();
- fifo.setMaxNodes(config.getMaxEntries());
- evictionConfig = fifo;
- }
- else if (config.getPolicy() == Policy.LFU) {
- LFUAlgorithmConfig lfu = new LFUAlgorithmConfig();
- lfu.setMaxNodes(config.getMaxEntries());
- evictionConfig = lfu;
- }
else if (config.getPolicy() == Policy.EXPIRATION) {
ExpirationAlgorithmConfig lfu = new ExpirationAlgorithmConfig();
lfu.setMaxNodes(config.getMaxEntries());
@@ -113,14 +101,10 @@
}
EvictionRegionConfig erc = new EvictionRegionConfig(rootFqn, evictionConfig);
-
- if (config.getPolicy() == Policy.EXPIRATION) {
- erc.setEvictionActionPolicyClassName(RemoveOnEvictActionPolicy.class.getName());
- }
+ erc.setEvictionActionPolicyClassName(RemoveOnEvictActionPolicy.class.getName());
return erc;
}
-
public void destroy() {
this.destroyed = true;
}
Modified: trunk/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/Cache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/Cache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -22,11 +22,6 @@
package org.teiid.cache;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-
/**
* Abstraction over cache providers
*/
@@ -61,11 +56,12 @@
*
* @param key key with which the specified value is to be associated.
* @param value value to be associated with the specified key.
+ * @param ttl the time for this entry to live
* @return previous value associated with specified key, or <code>null</code> if there was no mapping for key.
* A <code>null</code> return can also indicate that the key previously associated <code>null</code> with the specified key,
* if the implementation supports null values.
*/
- V put(K key, V value);
+ V put(K key, V value, Long ttl);
/**
* Removes the value for this key from a Cache.
@@ -83,70 +79,12 @@
*/
int size();
-
- /**
- * Returns a {@link Set} containing the data in this Cache
- *
- * @return a {@link Set} containing the data in this Cache. If there is no data,
- * an empty {@link Set} is returned. The {@link Set} returned is always immutable.
- */
- Set<K> keySet();
-
/**
* Removes all the keys and their values from the Cache
*/
void clear();
/**
- * Listener to get the updates on this Cache
- * @param listener
- */
- void addListener(CacheListener listener);
-
- /**
- * Remove Listener to stop the updates on this Cache
- * @param listener
- */
- void removeListener();
-
- /**
- * Returns a {@link Collection} containing the data in this Cache
- *
- * @return a {@link Collection} containing the data in this Cache. If there is no data,
- * an empty {@link Collection} is returned.
- */
- Collection<V> values();
-
-
- /**
- * Add a child node to the current cache node
- * @param name - name of the child
- * @return Cache instance.
- */
- Cache addChild(String name);
-
- /**
- * Get the child cache node from the current node
- * @param name
- * @return null if not found
- */
- Cache getChild(String name);
-
- /**
- * Destroys the child from the current node; no-op if node not found
- * @param name
- * @return true if removed; false otherwise
- */
- boolean removeChild(String name);
-
-
- /**
- * Get child nodes under this cache node. If none found empty set is returned
- * @return
- */
- List<Cache> getChildren();
-
- /**
* Name of the cache node
* @return
*/
Modified: trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -28,9 +28,7 @@
public enum Policy {
LRU, // Least Recently Used
- FIFO, // First in First Out
- LFU, // Least frequently Used
- EXPIRATION; // expires after certain time.
+ EXPIRATION
}
private Policy policy;
Modified: trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/cache/DefaultCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -24,91 +24,186 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.teiid.core.util.LRUCache;
public class DefaultCache<K, V> implements Cache<K, V>, Serializable {
private static final long serialVersionUID = -511120208522577206L;
public static final int DEFAULT_MAX_SIZE_TOTAL = 250;
+ public static final int DEFAULT_MAX_AGE = 1000 * 60 * 60 * 2;
- Map<K, V> map;
- Map<String, Cache> children = new HashMap();
- String name;
+ private static class ExpirationEntry<K, V> {
+ long expiration;
+ K key;
+ V value;
+
+ public ExpirationEntry(long expiration, K key, V value) {
+ this.expiration = expiration;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof ExpirationEntry<?, ?>)) {
+ return false;
+ }
+ ExpirationEntry<K, V> other = (ExpirationEntry<K, V>)obj;
+ return this.key.equals(other.key);
+ }
+ }
+ protected LRUCache<K, ExpirationEntry<K, V>> map;
+ protected Map<String, Cache> children = new ConcurrentHashMap<String, Cache>();
+ protected String name;
+ protected long ttl;
+ protected LinkedHashSet<ExpirationEntry<K, V>> expirationQueue = new LinkedHashSet<ExpirationEntry<K, V>>();
+
public DefaultCache(String name) {
- this(name, DEFAULT_MAX_SIZE_TOTAL);
+ this(name, DEFAULT_MAX_SIZE_TOTAL, DEFAULT_MAX_SIZE_TOTAL);
}
- public DefaultCache(String name, int maxSize) {
- if(maxSize < 0){
- maxSize = DEFAULT_MAX_SIZE_TOTAL;
- }
- this.map = Collections.synchronizedMap(new LRUCache<K, V>(maxSize));
+ public DefaultCache(String name, int maxEntries, long ttl) {
+ this.map = new LRUCache<K, ExpirationEntry<K, V>>(maxEntries) {
+ @Override
+ protected boolean removeEldestEntry(java.util.Map.Entry<K, ExpirationEntry<K, V>> eldest) {
+ if (super.removeEldestEntry(eldest)) {
+ Iterator<ExpirationEntry<K, V>> iter = expirationQueue.iterator();
+ return validate(iter.next()) != null;
+ }
+ return false;
+ }
+ };
+
this.name = name;
+ this.ttl = ttl;
}
public void addListener(CacheListener listener) {
+ throw new UnsupportedOperationException();
}
public void clear() {
- map.clear();
+ synchronized (map) {
+ map.clear();
+ expirationQueue.clear();
+ }
}
public V get(K key) {
- return map.get(key);
+ synchronized (map) {
+ ExpirationEntry<K, V> result = map.get(key);
+ if (result != null) {
+ return validate(result);
+ }
+ return null;
+ }
}
+ private V validate(ExpirationEntry<K, V> result) {
+ if (result.expiration < System.currentTimeMillis()) {
+ remove(result.key);
+ return null;
+ }
+ return result.value;
+ }
+
public Set<K> keySet() {
- return map.keySet();
+ synchronized (map) {
+ return new HashSet<K>(map.keySet());
+ }
}
public V put(K key, V value) {
- return map.put(key, value);
+ return this.put(key, value, ttl);
}
+
+ public static long getExpirationTime(long defaultTtl, Long ttl) {
+ if (ttl == null) {
+ ttl = defaultTtl;
+ }
+ if (ttl < 0) {
+ return Long.MAX_VALUE;
+ }
+ return System.currentTimeMillis() + ttl;
+ }
+
+ public V put(K key, V value, Long timeToLive) {
+ synchronized (map) {
+ ExpirationEntry<K, V> entry = new ExpirationEntry<K, V>(getExpirationTime(ttl, timeToLive), key, value);
+ expirationQueue.add(entry);
+ ExpirationEntry<K, V> result = map.put(key, entry);
+ if (result != null) {
+ return result.value;
+ }
+ return null;
+ }
+ }
public V remove(K key) {
- return map.remove(key);
+ synchronized (map) {
+ ExpirationEntry<K, V> entry = new ExpirationEntry<K, V>(-1, key, null);
+ ExpirationEntry<K, V> result = map.put(key, entry);
+ if (result != null) {
+ expirationQueue.remove(entry);
+ return result.value;
+ }
+ return null;
+ }
}
public int size() {
- return map.size();
+ synchronized (map) {
+ return map.size();
+ }
}
public Collection<V> values() {
- return map.values();
+ synchronized (map) {
+ ArrayList<V> result = new ArrayList<V>(map.size());
+ for (ExpirationEntry<K, V> entry : new ArrayList<ExpirationEntry<K, V>>(map.values())) {
+ V value = validate(entry);
+ if (value != null) {
+ result.add(value);
+ }
+ }
+ return result;
+ }
}
- @Override
- public void removeListener() {
- }
-
- @Override
public Cache addChild(String name) {
- if (children.get(name) != null) {
- return children.get(name);
+ Cache c = children.get(name);
+ if (c != null) {
+ return c;
}
- Cache c = new DefaultCache(name);
+ c = new DefaultCache(name, map.getSpaceLimit(), ttl);
children.put(name, c);
return c;
}
- @Override
public Cache getChild(String name) {
return children.get(name);
}
- @Override
- public List<Cache> getChildren() {
- return new ArrayList<Cache>(children.values());
+ public Collection<Cache> getChildren() {
+ return children.values();
}
- @Override
public boolean removeChild(String name) {
Object obj = children.remove(name);
return obj != null;
@@ -117,5 +212,6 @@
@Override
public String getName() {
return name;
- }
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -39,7 +39,6 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.DQPPlugin;
-import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.analysis.AnalysisRecord;
@@ -109,7 +108,7 @@
TupleBatch batch = results.getBatch(row);
UUID uuid = java.util.UUID.randomUUID();
batch.preserveTypes();
- cache.put(uuid, batch);
+ cache.put(uuid, batch, this.hint != null?this.hint.getTtl():null);
this.cachedBatches.add(uuid);
}
return true;
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -648,7 +648,7 @@
}
//prepared plan cache
- prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, Cache.Type.PREPAREDPLAN, new CacheConfiguration(Policy.LRU, 60, config.getPreparedPlanCacheMaxCount()));
+ prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory, Cache.Type.PREPAREDPLAN, new CacheConfiguration(Policy.LRU, 60*60*8, config.getPreparedPlanCacheMaxCount()));
prepPlanCache.setBufferManager(this.bufferManager);
//get buffer manager
@@ -657,7 +657,7 @@
this.processWorkerPool = new ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,
- this.bufferService), this.bufferManager);
+ this.bufferService), this.bufferManager, this.processWorkerPool);
}
public void setBufferService(BufferService service) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -377,7 +377,7 @@
if (determinismLevel > FunctionMethod.SESSION_DETERMINISTIC) {
LogManager.logInfo(LogConstants.CTX_DQP, DQPPlugin.Util.getString("RequestWorkItem.cache_nondeterministic", originalCommand)); //$NON-NLS-1$
}
- dqpCore.getRsCache().put(cid, determinismLevel, cr);
+ dqpCore.getRsCache().put(cid, determinismLevel, cr, originalCommand.getCacheHint() != null?originalCommand.getCacheHint().getTtl():null);
}
add = sendResultsIfNeeded(batch);
if (!added) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -70,7 +70,7 @@
if(this.maxSize < 0){
this.maxSize = DEFAULT_MAX_SIZE_TOTAL;
}
- this.localCache = new DefaultCache<CacheID, T>("local", maxSize); //$NON-NLS-1$
+ this.localCache = new DefaultCache<CacheID, T>("local", maxSize, config.getMaxAgeInSeconds()*1000); //$NON-NLS-1$
if (type == Cache.Type.PREPAREDPLAN) {
this.distributedCache = localCache;
@@ -114,17 +114,18 @@
return cacheHit.get();
}
- /**
- * Create PreparedPlan for the given clientConn and SQl query
- */
public void put(CacheID id, int determinismLevel, T t){
+ this.put(id, determinismLevel, t, null);
+ }
+
+ public void put(CacheID id, int determinismLevel, T t, Long ttl){
if (!id.cachable) {
return;
}
if (determinismLevel >= FunctionMethod.SESSION_DETERMINISTIC) {
id.setSessionId(id.originalSessionId);
- this.localCache.put(id, t);
+ this.localCache.put(id, t, ttl);
}
else {
@@ -145,7 +146,7 @@
}
if (insert) {
- this.distributedCache.put(id, t);
+ this.distributedCache.put(id, t, ttl);
}
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/IndexCondition.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -50,7 +50,7 @@
if (criteria instanceof CompareCriteria) {
CompareCriteria cc = (CompareCriteria)criteria;
if (cc.getOperator() == CompareCriteria.NE
- || !(cc.getRightExpression() instanceof Constant) || (cc.getOperator() != CompareCriteria.EQ && i > 0)) {
+ || !(cc.getRightExpression() instanceof Constant)) {
critIter.remove();
continue;
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -25,6 +25,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.api.exception.query.QueryMetadataException;
@@ -90,16 +93,28 @@
private ProcessorDataManager processorDataManager;
private BufferManager bufferManager;
+
+ private Executor executor;
+ public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager) {
+ this(processorDataManager, bufferManager, new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+ });
+ }
+
/**
* Constructor takes the "real" ProcessorDataManager that this object will be a proxy to,
* and will pass most calls through to transparently. Only when a request is registered for
* a temp group will this proxy do it's thing.
* @param processorDataManager the real ProcessorDataManager that this object is a proxy to
- */
- public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager){
+ */
+ public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager, Executor executor){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
+ this.executor = executor;
}
public TupleSource registerRequest(
@@ -265,12 +280,12 @@
}
}
- private TupleSource registerQuery(CommandContext context,
+ private TupleSource registerQuery(final CommandContext context,
TempTableStore contextStore, Query query)
throws TeiidComponentException, QueryMetadataException,
TeiidProcessingException, ExpressionEvaluationException,
QueryProcessingException {
- GroupSymbol group = query.getFrom().getGroups().get(0);
+ final GroupSymbol group = query.getFrom().getGroups().get(0);
if (!group.isTempGroupSymbol()) {
return null;
}
@@ -278,11 +293,23 @@
boolean remapColumns = !tableName.equalsIgnoreCase(group.getName());
TempTable table = null;
if (group.isGlobalTable()) {
- TempTableStore globalStore = context.getGlobalTableStore();
- MatTableInfo info = globalStore.getMatTableInfo(tableName);
+ final TempTableStore globalStore = context.getGlobalTableStore();
+ final MatTableInfo info = globalStore.getMatTableInfo(tableName);
boolean load = info.shouldLoad();
if (load) {
- loadGlobalTable(context, group, tableName, globalStore, info);
+ if (!info.isValid()) {
+ //blocking load
+ loadGlobalTable(context, group, tableName, globalStore, info);
+ } else {
+ Callable<Integer> toCall = new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return loadGlobalTable(context, group, tableName, globalStore, info);
+ }
+ };
+ FutureTask<Integer> task = new FutureTask<Integer>(toCall);
+ executor.execute(task);
+ }
}
table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
} else {
@@ -308,8 +335,7 @@
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName,
TempTableStore globalStore, MatTableInfo info)
- throws QueryMetadataException, TeiidComponentException,
- TeiidProcessingException, ExpressionEvaluationException {
+ throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryExecPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
Create create = new Create();
@@ -344,10 +370,15 @@
//TODO: if this insert fails, it's unnecessary to do the undo processing
table.insert(ts, table.getColumns());
rowCount = table.getRowCount();
+ } catch (TeiidComponentException e) {
+ LogManager.logError(LogConstants.CTX_MATVIEWS, e, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
+ throw e;
+ } catch (TeiidProcessingException e) {
+ LogManager.logError(LogConstants.CTX_MATVIEWS, e, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
+ throw e;
} finally {
if (rowCount == -1) {
info.setState(MatState.FAILED_LOAD, null);
- LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryExecPlugin.Util.getString("TempTableDataManager.failed_load", tableName)); //$NON-NLS-1$
} else {
globalStore.swapTempTable(tableName, table);
info.setState(MatState.LOADED, true);
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -61,9 +61,8 @@
for (;;) {
switch (state) {
case NEEDS_LOADING:
- updateTime = System.currentTimeMillis();
case FAILED_LOAD:
- state = MatState.LOADING;
+ setState(MatState.LOADING);
return true;
case LOADING:
if (valid) {
@@ -77,7 +76,7 @@
continue;
case LOADED:
if (ttl >= 0 && System.currentTimeMillis() - updateTime - ttl > 0) {
- state = MatState.LOADING;
+ setState(MatState.LOADING);
return true;
}
return false;
@@ -90,11 +89,15 @@
if (valid != null) {
this.valid = valid;
}
- this.state = state;
- this.updateTime = System.currentTimeMillis();
+ setState(state);
notifyAll();
return oldState;
}
+
+ private void setState(MatState state) {
+ this.state = state;
+ this.updateTime = System.currentTimeMillis();
+ }
public synchronized void setTtl(long ttl) {
this.ttl = ttl;
Added: trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java (rev 0)
+++ trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.cache;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestDefaultCache {
+
+ @Test public void testExpiration() throws InterruptedException {
+ DefaultCache<Integer, Integer> cache = new DefaultCache<Integer, Integer>("foo", 2, 50);
+ cache.put(1, 1);
+ Thread.sleep(60);
+ assertNull(cache.get(1));
+ cache.put(2, 2);
+ Thread.sleep(30);
+ cache.put(3, 3);
+ assertNotNull(cache.get(2));
+ Thread.sleep(40);
+ cache.put(4, 4);
+ //preferred to purge 2 instead of 3
+ assertNotNull(cache.get(3));
+ }
+
+}
Property changes on: trunk/engine/src/test/java/org/teiid/cache/TestDefaultCache.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -96,7 +96,7 @@
CachedResults results = new CachedResults();
results.setResults(tb);
results.setCommand(new Query());
- Cache cache = new DefaultCache("dummy", 250); //$NON-NLS-1$
+ Cache cache = new DefaultCache("dummy"); //$NON-NLS-1$
results.prepare(cache, fbs.getBufferManager());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java 2010-08-14 21:19:46 UTC (rev 2462)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/TestResultsCache.java 2010-08-16 15:47:22 UTC (rev 2463)
@@ -56,6 +56,19 @@
assertFalse(rs.next());
}
+ @Test public void testCacheHintTtl() throws Exception {
+ Statement s = conn.createStatement();
+ s.execute("set showplan on");
+ ResultSet rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+ assertTrue(rs.next());
+ s.execute("set noexec on");
+ rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+ assertTrue(rs.next());
+ Thread.sleep(60);
+ rs = s.executeQuery("/* cache(ttl:50) */ select 1");
+ assertFalse(rs.next());
+ }
+
@Test public void testExecutionProperty() throws Exception {
Statement s = conn.createStatement();
s.execute("set showplan on");
More information about the teiid-commits
mailing list