Author: rareddy
Date: 2011-07-25 11:46:41 -0400 (Mon, 25 Jul 2011)
New Revision: 3332
Modified:
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
Log:
TEIID-1657: Implemented distributed refreshMatView call based on the JBoss Cache Listener
mechanism. When a user issues a refreshMatView call on one node, all the nodes get
notified about the change in the original node after the load of the mat view table is
finished. The proactive load on the passive nodes is done in a separate thread. Also
fixed a memory leak with the TupleBatchCacheLoader in loading batches.
Modified:
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
---
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -112,11 +112,13 @@
}
}
- public synchronized void addListener(CacheListener listener) {
- this.cacheListener = new JBossCacheListener(this.rootFqn, listener);
+ @Override
+ public synchronized void setListener(CacheListener listener) {
+ this.cacheListener = new JBossCacheListener(this.rootFqn, this.cacheStore, this,
listener);
this.cacheStore.addCacheListener(this.cacheListener);
}
+ @Override
public synchronized void removeListener() {
this.cacheStore.removeCacheListener(this.cacheListener);
this.cacheListener = null;
Modified:
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
===================================================================
---
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -21,7 +21,11 @@
*/
package org.teiid.cache.jboss;
+import java.util.Set;
+
import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
import org.jboss.cache.notifications.annotation.NodeCreated;
import org.jboss.cache.notifications.annotation.NodeEvicted;
import org.jboss.cache.notifications.annotation.NodeLoaded;
@@ -33,14 +37,18 @@
@org.jboss.cache.notifications.annotation.CacheListener
-public class JBossCacheListener {
+public class JBossCacheListener<K,V> {
private CacheListener listener;
private Fqn rootFqn;
+ private JBossCache cache;
+ private org.jboss.cache.Cache<K,V> cacheStore;
- public JBossCacheListener(Fqn fqn, CacheListener listener) {
+ public JBossCacheListener(Fqn fqn, org.jboss.cache.Cache cacheStore, JBossCache cache,
CacheListener listener) {
this.rootFqn = fqn;
this.listener = listener;
+ this.cache = cache;
+ this.cacheStore = cacheStore;
}
@NodeCreated
@@ -55,4 +63,61 @@
listener.cacheChanged();
}
}
+
+ @NodeCreated
+ public synchronized void cacheCreated(NodeEvent ne) {
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ Fqn fqn = ne.getFqn();
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) &&
(key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheCreated(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
+
+ @NodeRemoved
+ public synchronized void cacheRemoved(NodeEvent ne) {
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ Fqn fqn = ne.getFqn();
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) &&
(key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheRemoved(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
+
+ @NodeModified
+ public synchronized void cacheModified(NodeEvent ne) {
+ Fqn fqn = ne.getFqn();
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) &&
(key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheModified(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
}
Modified:
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
===================================================================
---
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -62,9 +62,8 @@
map.put(id, b);
return map;
}
- return super.get(fqn);
}
- return null;
+ return super.get(fqn);
}
@Override
Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java 2011-07-23 11:41:47
UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/Cache.java 2011-07-25 15:46:41
UTC (rev 3332)
@@ -82,4 +82,14 @@
* @return
*/
Set<K> keys();
+
+ /**
+ * set cache listener
+ * @param listener
+ */
+ void setListener(CacheListener<K, V> listener);
+
+
+ void removeListener();
+
}
Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java 2011-07-23
11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/CacheListener.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -25,6 +25,9 @@
/**
* Listener for the cache events like add,update delete
*/
-public interface CacheListener {
+public interface CacheListener<K, V> {
void cacheChanged();
+ void cacheCreated(K key, V value);
+ void cacheRemoved(K key, V value);
+ void cacheModified(K key, V value);
}
Modified: branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java
===================================================================
--- branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java 2011-07-23
11:41:47 UTC (rev 3331)
+++ branches/7.1.1.CP3/engine/src/main/java/org/teiid/cache/DefaultCache.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -95,9 +95,11 @@
this.ttl = ttl;
}
- public void addListener(CacheListener listener) {
- throw new UnsupportedOperationException();
+ public void setListener(CacheListener listener) {
}
+
+ public void removeListener() {
+ }
public void clear() {
synchronized (map) {
Modified:
branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
---
branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -95,8 +95,12 @@
this.uuid = java.util.UUID.randomUUID().toString();
}
return this.uuid;
- }
+ }
+ public void setId(String uuid) {
+ this.uuid = uuid;
+ }
+
public boolean isLobs() {
return lobIndexes != null;
}
Modified:
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
---
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -130,6 +130,7 @@
}
buffer = bufferManager.createTupleBuffer(schema, "cached",
TupleSourceType.FINAL); //$NON-NLS-1$
buffer.setBatchSize(this.batchSize);
+ buffer.setId(this.uuid);
if (this.hint != null) {
buffer.setPrefersMemory(this.hint.getPrefersMemory());
}
@@ -142,6 +143,7 @@
return false;
}
buffer.addTupleBatch(batch, true);
+ cache.remove(uuid+","+row); //$NON-NLS-1$
}
this.results = buffer;
bufferManager.addTupleBuffer(this.results);
Modified:
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -22,6 +22,10 @@
package org.teiid.dqp.internal.process;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -46,6 +50,7 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
@@ -212,6 +217,7 @@
private CacheFactory cacheFactory;
private SessionAwareCache<CachedResults> matTables;
+ private CacheListener<TempTableDataManager.MatTableKey,
TempTableDataManager.MatTableEntry> matTableListener;
/**
* perform a full shutdown and wait for 10 seconds for all threads to finish
@@ -711,11 +717,11 @@
}
if (cacheFactory.isReplicated()) {
- matTables = new SessionAwareCache<CachedResults>(this.cacheFactory,
SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1,
"MaterilizationTables")); //$NON-NLS-1$
+ matTables = new SessionAwareCache<CachedResults>(this.cacheFactory,
SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1,
"MaterializationTables")); //$NON-NLS-1$
matTables.setBufferManager(this.bufferManager);
}
- dataTierMgr = new TempTableDataManager(new
DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool,
this.rsCache, this.matTables, this.cacheFactory);
+ dataTierMgr = new TempTableDataManager(new
DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool,
this.rsCache, this.matTables, this.cacheFactory, this.matTableListener);
}
public void setBufferService(BufferService service) {
@@ -726,6 +732,29 @@
this.transactionService = service;
}
+ public void setMatTableListener(final CacheListener<TempTableDataManager.MatTableKey,
TempTableDataManager.MatTableEntry> listener) {
+ this.matTableListener = (CacheListener<TempTableDataManager.MatTableKey,
TempTableDataManager.MatTableEntry>)Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[] {CacheListener.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(Object proxy, final Method method, final Object[] args) throws
Throwable {
+ addWork(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ method.invoke(listener, args);
+ } catch (IllegalArgumentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ } catch (IllegalAccessException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ } catch (InvocationTargetException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ }
+ }
+ });
+ return null;
+ }
+ });
+ }
+
@Override
public boolean cancelRequest(long requestID)
throws TeiidProcessingException, TeiidComponentException {
Modified:
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -32,6 +32,7 @@
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.DefaultCache;
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.cache.CacheConfiguration.Policy;
@@ -307,4 +308,14 @@
public void setBufferManager(BufferManager bufferManager) {
this.bufferManager = bufferManager;
}
+
+ public void setListener(CacheListener<CacheID, T> listener) {
+ this.localCache.setListener(listener);
+ this.distributedCache.setListener(listener);
+ }
+
+ public void removeListener() {
+ this.localCache.removeListener();
+ this.distributedCache.removeListener();
+ }
}
Modified:
branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -40,6 +40,7 @@
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
@@ -113,7 +114,7 @@
private SessionAwareCache<CachedResults> cache;
private Executor executor;
- private static class MatTableKey implements Serializable {
+ public static class MatTableKey implements Serializable {
private static final long serialVersionUID = 5481692896572663992L;
String name;
VDBKey vdb;
@@ -134,19 +135,37 @@
MatTableKey other = (MatTableKey)obj;
return this.name.equals(other.name) && this.vdb.equals(other.vdb);
}
+
+ public String getVDBName() {
+ return vdb.getName();
+ }
+
+ public int getVDBVersion() {
+ return vdb.getVersion();
+ }
}
- private static class MatTableEntry implements Serializable {
+ public static class MatTableEntry implements Serializable {
private static final long serialVersionUID = 8559613701442751579L;
long lastUpdate = System.currentTimeMillis();
boolean valid;
+ String viewName;
+
+ public String getViewName() {
+ return viewName;
+ }
+
+ public boolean allowsUpdate() {
+ return valid && viewName != null;
+ }
}
private Cache<MatTableKey, MatTableEntry> tables;
+ private Cache<MatTableKey, MatTableEntry> refreshJob;
private SessionAwareCache<CachedResults> distributedCache;
public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager,
- Executor executor, SessionAwareCache<CachedResults> cache,
SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
+ Executor executor, SessionAwareCache<CachedResults> cache,
SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory,
CacheListener<MatTableKey, MatTableEntry> listener){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
this.executor = executor;
@@ -155,6 +174,10 @@
if (distibutedCache != null) {
CacheConfiguration cc = new CacheConfiguration(Policy.LRU, -1, -1,
"MaterializationUpdates"); //$NON-NLS-1$
tables = cacheFactory.get(cc.getLocation(), cc);
+
+ cc = new CacheConfiguration(Policy.LRU, -1, -1,
"MaterializationRefresh"); //$NON-NLS-1$
+ refreshJob = cacheFactory.get(cc.getLocation(), cc);
+ refreshJob.setListener(listener);
}
}
@@ -289,7 +312,7 @@
context.setDeterminismLevel(determinismLevel);
return tb.createIndexedTupleSource();
}
-
+
private TupleSource handleSystemProcedures(CommandContext context, StoredProcedure
proc)
throws TeiidComponentException, QueryMetadataException,
QueryProcessingException, QueryResolverException,
@@ -303,9 +326,23 @@
String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
+
+ Long loadTime = null;
+ boolean useCache = false;
+ if (this.distributedCache != null) {
+ MatTableKey key = new MatTableKey();
+ key.name = matTableName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+ MatTableEntry entry = this.tables.get(key);
+ useCache = (entry != null && entry.valid && entry.lastUpdate >
info.getUpdateTime());
+ if (useCache) {
+ loadTime = entry.lastUpdate;
+ }
+ }
+
boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
if (invalidate) {
- touchTable(context, matTableName, false);
+ touchTable(context, matTableName, matViewName, false, System.currentTimeMillis());
}
MatState oldState = info.setState(MatState.NEEDS_LOADING,
invalidate?Boolean.FALSE:null, null);
if (oldState == MatState.LOADING) {
@@ -316,7 +353,7 @@
Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group,
matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info,
null, false);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, matViewName,
globalStore, info, invalidate?null:loadTime, !invalidate && useCache);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(),
REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -408,9 +445,9 @@
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
+ loadGlobalTable(context, group, tableName, null, globalStore, info, loadTime,
true);
} else {
- loadAsynch(context, group, tableName, globalStore, info, loadTime);
+ loadAsynch(context, group, tableName, null, globalStore, info, loadTime);
}
}
table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
@@ -435,13 +472,13 @@
}
private void loadAsynch(final CommandContext context,
- final GroupSymbol group, final String tableName,
+ final GroupSymbol group, final String tableName,final String viewName,
final TempTableStore globalStore, final MatTableInfo info,
final Long loadTime) {
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info, loadTime,
true);
+ return loadGlobalTable(context, group, tableName, viewName, globalStore, info,
loadTime, true);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,9 +486,9 @@
}
private int loadGlobalTable(CommandContext context,
- GroupSymbol group, final String tableName,
+ GroupSymbol group, final String tableName, final String viewName,
TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
- throws TeiidComponentException, TeiidProcessingException {
+ throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loading", tableName));
//$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
Create create = new Create();
@@ -477,9 +514,11 @@
}
}
int rowCount = -1;
+ boolean tableUpdated = false;
+ String fullName = null;
try {
- String fullName = metadata.getFullName(group.getMetadataID());
- TupleSource ts = null;
+ fullName = metadata.getFullName(group.getMetadataID());
+ TupleSource ts = null;
CacheID cid = null;
if (distributedCache != null) {
cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
@@ -487,6 +526,7 @@
if (useCache) {
CachedResults cr = this.distributedCache.get(cid);
if (cr != null) {
+ LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.cache_load", tableName));
//$NON-NLS-1$
ts = cr.getResults().createIndexedTupleSource();
}
}
@@ -501,14 +541,15 @@
String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName,
context);
qp.setNonBlocking(true);
-
+
if (distributedCache != null) {
CachedResults cr = new CachedResults();
BatchCollector bc = qp.createBatchCollector();
TupleBuffer tb = bc.collectTuples();
cr.setResults(tb);
- touchTable(context, fullName, true);
- this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr,
info.getTtl());
+ touchTable(context, fullName, viewName, true, info.getUpdateTime());
+ this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr,
info.getTtl());
+ tableUpdated = true;
ts = tb.createIndexedTupleSource();
} else {
ts = new BatchCollector.BatchProducerTupleSource(qp);
@@ -541,19 +582,40 @@
globalStore.swapTempTable(tableName, table);
info.setState(MatState.LOADED, true, loadTime);
LogManager.logInfo(LogConstants.CTX_MATVIEWS,
QueryPlugin.Util.getString("TempTableDataManager.loaded", tableName, rowCount));
//$NON-NLS-1$
+ if (tableUpdated) {
+ initiateRefreshAcrossCluster(context, fullName, viewName);
+ }
}
}
return rowCount;
}
- private void touchTable(CommandContext context, String fullName, boolean valid) {
+ private void touchTable(CommandContext context, String fullName, String viewName,
boolean valid, long loadtime) {
MatTableKey key = new MatTableKey();
key.name = fullName;
key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
MatTableEntry matTableEntry = new MatTableEntry();
matTableEntry.valid = valid;
+ matTableEntry.viewName = viewName;
+ matTableEntry.lastUpdate = loadtime;
tables.put(key, matTableEntry, null);
}
+
+ private void initiateRefreshAcrossCluster(CommandContext context, String fullName,
String viewName) {
+ MatTableKey key = new MatTableKey();
+ key.name = fullName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+ MatTableEntry matTableEntry = new MatTableEntry();
+ matTableEntry.valid = true;
+ matTableEntry.viewName = viewName;
+ matTableEntry.lastUpdate = System.currentTimeMillis();
+ MatTableEntry entry = refreshJob.put(key, matTableEntry, null);
+ if (entry == null) {
+ // in the case of refreshjob, cacheCreate are not being notified correctly due to
nature of how Teiid uses the cache
+ // so, in order to get a cacheModified event insert again.
+ refreshJob.put(key, matTableEntry, null);
+ }
+ }
/**
* Return a list of ElementSymbols for the given index/key object
Modified: branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
---
branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/main/resources/org/teiid/query/i18n.properties 2011-07-25
15:46:41 UTC (rev 3332)
@@ -783,6 +783,7 @@
TempTableDataManager.failed_load=Failed to load materialized view table {0}.
TempTableDataManager.loaded=Loaded materialized view table {0} with row count {1}.
+TempTableDataManager.cache_load=Loaded materialized view table {0} from cached contents
from another clustered node.
TempTableDataManager.loading=Loading materialized view table {0}
TempTableDataManager.not_implicit_matview={0} does not target an internal materialized
view.
TempTableDataManager.row_refresh_pk=Materialized view {0} cannot have a row refreshed
since there is no primary key.
Modified:
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
---
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -74,7 +74,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new
DefaultCacheFactory());
+ dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new
DefaultCacheFactory(), null);
}
private void execute(String sql, List<?>... expectedResults) throws Exception {
Modified:
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
---
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -245,7 +245,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache,
null, null);
+ dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache,
null, null, null);
}
if (context.getQueryProcessorFactory() == null) {
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr,
dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));
Modified:
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
---
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -76,7 +76,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null);
+ dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null, null);
}
@Test public void testInsertWithQueryExpression() throws Exception {
Modified:
branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-07-25
15:46:41 UTC (rev 3332)
@@ -70,6 +70,7 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.adminapi.jboss.AdminProvider;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
@@ -97,6 +98,9 @@
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.net.TeiidURL;
+import org.teiid.query.tempdata.TempTableDataManager;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableEntry;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableKey;
import org.teiid.security.SecurityHelper;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
@@ -147,7 +151,8 @@
public void start() {
dqpCore.setTransactionService((TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG,
transactionServerImpl, new Class[] {TransactionService.class}, MessageLevel.DETAIL));
-
+ dqpCore.setMatTableListener(getMatTableListener());
+
// create the necessary services
createClientServices();
@@ -497,7 +502,7 @@
@Override
@ManagementOperation(description="Execute a sql query",
params={@ManagementParameter(name="vdbName"),@ManagementParameter(name="vdbVersion"),
@ManagementParameter(name="command"),
@ManagementParameter(name="timoutInMilli")})
- public List<List> executeQuery(final String vdbName, final int version, final
String command, final long timoutInMilli) throws AdminException {
+ public List<List> executeQuery(final String vdbName, final int version, final
String command, final long timoutInMilli) throws AdminException {
Properties properties = new Properties();
properties.setProperty(TeiidURL.JDBC.VDB_NAME, vdbName);
properties.setProperty(TeiidURL.JDBC.VDB_VERSION, String.valueOf(version));
@@ -530,7 +535,14 @@
request.setExecutionId(0L);
request.setRowLimit(getMaxRowsFetchSize()); // this would limit the number of rows
that are returned.
Future<ResultsMessage> message = dqpCore.executeRequest(requestID, request);
- ResultsMessage rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+
+ ResultsMessage rm = null;
+ if (timoutInMilli < 0) {
+ rm = message.get();
+ }
+ else {
+ rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+ }
if (rm.getException() != null) {
throw new AdminProcessingException(rm.getException());
@@ -607,4 +619,39 @@
}
return newResults;
}
+
+ private CacheListener<TempTableDataManager.MatTableKey,
TempTableDataManager.MatTableEntry> getMatTableListener() {
+ return new CacheListener<TempTableDataManager.MatTableKey,
TempTableDataManager.MatTableEntry>() {
+
+ @Override
+ public void cacheChanged() {
+ }
+
+ @Override
+ public void cacheCreated(MatTableKey key, MatTableEntry value) {
+ refreshMatView(key, value);
+ }
+
+ @Override
+ public void cacheModified(MatTableKey key, MatTableEntry value) {
+ refreshMatView(key, value);
+ }
+
+ private void refreshMatView(MatTableKey key, MatTableEntry value) {
+ if (value != null) {
+ try {
+ if (value.allowsUpdate()) {
+ executeQuery(key.getVDBName(), key.getVDBVersion(), "execute
SYSADMIN.refreshmatview(viewname=>'"+value.getViewName()+"',invalidate=>false)",
-1); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ } catch (AdminException e) {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, e,
IntegrationPlugin.Util.getString("error_refresh", value.getViewName() ));
//$NON-NLS-1$
+ }
+ }
+ }
+ @Override
+ public void cacheRemoved(MatTableKey key, MatTableEntry value) {
+
+ }
+ };
+ }
}
Modified:
branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
---
branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-07-23
11:41:47 UTC (rev 3331)
+++
branches/7.1.1.CP3/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-07-25
15:46:41 UTC (rev 3332)
@@ -46,3 +46,4 @@
distribute_failed=Deploy of the archive failed {0}
template_not_found=Template not found for {0}
admin_executing=JOPR admin {0} is executing command {1}
+error_refresh=error occurred during refreshing the materialized view entries for view {0}
\ No newline at end of file