Author: shawkins
Date: 2011-04-12 15:35:57 -0400 (Tue, 12 Apr 2011)
New Revision: 3083
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/events/
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/JGroupsEventDistributor.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
trunk/engine/src/main/java/org/teiid/events/
trunk/engine/src/main/java/org/teiid/events/EventDistributor.java
Modified:
trunk/api/src/main/java/org/teiid/metadata/Table.java
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/console/src/main/resources/META-INF/rhq-plugin.xml
trunk/engine/src/main/java/org/teiid/cache/Cachable.java
trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/QueryProcessorFactoryImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataStore.java
trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleMergeCriteria.java
trunk/engine/src/main/java/org/teiid/query/processor/BatchedUpdatePlan.java
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/CreateCursorResultSetInstruction.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/IfInstruction.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/LoopInstruction.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProgramInstruction.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/WhileInstruction.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedPlanCache.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java
trunk/engine/src/test/java/org/teiid/query/processor/FakeProcessorPlan.java
trunk/engine/src/test/java/org/teiid/query/unittest/FakeMetadataFactory.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
trunk/pom.xml
Log:
TEIID-1507 initial support for cache invalidation based upon events
Modified: trunk/api/src/main/java/org/teiid/metadata/Table.java
===================================================================
--- trunk/api/src/main/java/org/teiid/metadata/Table.java 2011-04-11 15:36:29 UTC (rev
3082)
+++ trunk/api/src/main/java/org/teiid/metadata/Table.java 2011-04-12 19:35:57 UTC (rev
3083)
@@ -66,6 +66,9 @@
private List<String> schemaPaths;
private String resourcePath;
+ private transient long lastModified;
+ private transient long lastDataModification;
+
public List<String> getBindings() {
return bindings;
}
@@ -270,4 +273,20 @@
column.setParent(this);
}
+ public long getLastDataModification() {
+ return lastDataModification;
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public void setLastDataModification(long lastDataModification) {
+ this.lastDataModification = lastDataModification;
+ }
+
+ public void setLastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
+
}
\ No newline at end of file
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
===================================================================
---
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-04-12
19:35:57 UTC (rev 3083)
@@ -137,5 +137,15 @@
</map>
</property>
</bean>
+
+ <bean name="EventDistributorFactory"
class="org.teiid.events.jboss.JGroupsEventDistributor">
+ <property name="jndiName">teiid/event-distributor</property>
+ <property name="channelFactory">
+ <inject bean="JChannelFactory" />
+ </property>
+ <property
name="clusterName">${jboss.partition.name:DefaultPartition}-teiid-events</property>
+ <property
name="multiplexerStack">${jboss.default.jgroups.stack:udp}</property>
+ <property
name="localEventDistributorName">teiid/runtime-engine</property>
+ </bean>
</deployment>
\ No newline at end of file
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-04-12
19:35:57 UTC (rev 3083)
@@ -57,15 +57,31 @@
<bean name="ResultsetCacheConfig"
class="org.teiid.cache.CacheConfiguration">
<property name="name">ResultSetCacheConfig</property>
<property name="enabled">true</property>
- <!-- Max Entries allowed for ResultSet Cache (default 1024) -->
+ <!-- Max Entries allowed (default 1024) -->
<property name="maxEntries">1024</property>
<!-- Max age in seconds (default 7200 - 2 hours) -->
<property name="maxAgeInSeconds">7200</property>
+ <!-- Max staleness in seconds. Modifications are based upon data updates
+ -1 indicates no max. (default 60 - 1 minute) -->
+ <property name="maxStaleness">60</property>
<!-- Allowed values are LRU, EXPIRATION.
Setting this value to LRU will cause cache hint TTL values
to be ignored. (default EXPIRATION) -->
<property name="type">EXPIRATION</property>
<property name="location">resultset</property>
+ </bean>
+
+ <!-- Configuration for prepared plan caching. (local memory only)
+ -->
+ <bean name="PreparedPlanCacheConfig"
class="org.teiid.cache.CacheConfiguration">
+ <property name="name">PreparedPlanCacheConfig</property>
+ <!-- Max Entries allowed (default 512) -->
+ <property name="maxEntries">512</property>
+ <!-- Max age in seconds (default 28800 - 8 hours) -->
+ <property name="maxAgeInSeconds">28800</property>
+ <!-- Max staleness in seconds. Modifications are based upon costing updates
+ -1 indicates no max. (default 300 - 5 minutes) -->
+ <property name="maxStaleness">300</property>
</bean>
<bean name="RuntimeEngineDeployer"
class="org.teiid.jboss.deployers.RuntimeEngineDeployer">
@@ -84,6 +100,7 @@
<property name="VDBStatusChecker"><inject
bean="VDBStatusChecker"/></property>
<property name="cacheFactory"><inject
bean="CacheFactory"/></property>
<property name="resultsetCacheConfig"><inject
bean="ResultsetCacheConfig"/></property>
+ <property name="preparedPlanCacheConfig"><inject
bean="PreparedPlanCacheConfig"/></property>
<!-- Process pool maximum thread count. (default 64) -->
<property name="maxThreads">64</property>
@@ -100,10 +117,6 @@
<property name="maxRowsFetchSize">20480</property>
<!-- The max lob chunk size in KB transferred each time when processing blobs,
clobs (100KB default) -->
<property name="lobChunkSizeInKB">100</property>
- <!-- The maximum number of query plans that are cached.
- This includes both user plans and internal prepared plans.
- Note: this is a memory based cache. (default 512) -->
- <property name="preparedPlanCacheMaxCount">512</property>
<!-- Turn on role checking based upon the data roles defined in VDBs. (default
true) -->
<property name="useDataRoles">true</property>
<!-- Sets whether temporary table usage is enabled by default (default true)
-->
@@ -116,6 +129,8 @@
<property name="exceptionOnMaxSourceRows">true</property>
<!-- Maximum size of lob allowed through ODBC connection in bytes (default
5MB) -->
<property name="maxODBCLobSizeAllowed">5242880</property>
+ <!-- The JNDI name of the Teiid Event Distributor -->
+ <property
name="eventDistributorName">teiid/event-distributor</property>
</bean>
<!-- JDBC Socket connection properties (SSL see below) -->
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/JGroupsEventDistributor.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/JGroupsEventDistributor.java
(rev 0)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/JGroupsEventDistributor.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -0,0 +1,173 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+ package org.teiid.events.jboss;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Vector;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.jboss.util.naming.Util;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.JChannelFactory;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RpcDispatcher;
+import org.teiid.events.EventDistributor;
+
+public class JGroupsEventDistributor extends ReceiverAdapter implements Serializable {
+
+ private static final long serialVersionUID = -1140683411842561358L;
+
+ private transient JChannelFactory channelFactory;
+ private String multiplexerStack;
+ private String clusterName;
+ private String jndiName;
+ private String localEventDistributorName;
+
+ private transient EventDistributor proxyEventDistributor;
+ private transient EventDistributor localEventDistributor;
+
+ private transient Channel channel;
+ private transient RpcDispatcher rpcDispatcher;
+ private transient Vector<Address> members;
+
+ public JChannelFactory getChannelFactory() {
+ return channelFactory;
+ }
+
+ public void setJndiName(String jndiName) {
+ this.jndiName = jndiName;
+ }
+
+ public String getJndiName() {
+ return jndiName;
+ }
+
+ public String getLocalEventDistributorName() {
+ return localEventDistributorName;
+ }
+
+ public void setLocalEventDistributorName(String localEventDistributorName) {
+ this.localEventDistributorName = localEventDistributorName;
+ }
+
+ public String getMultiplexerStack() {
+ return multiplexerStack;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setChannelFactory(JChannelFactory channelFactory) {
+ this.channelFactory = channelFactory;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public void setMultiplexerStack(String multiplexerStack) {
+ this.multiplexerStack = multiplexerStack;
+ }
+
+ public void start() throws Exception {
+ if (this.channelFactory == null) {
+ return; //no need to distribute events
+ }
+ channel = this.channelFactory.createMultiplexerChannel(this.multiplexerStack, null);
+ channel.connect(this.clusterName);
+
+ proxyEventDistributor = (EventDistributor)
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]
{EventDistributor.class}, new InvocationHandler() {
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ rpcDispatcher.callRemoteMethods(members, new MethodCall(method, args),
GroupRequest.GET_NONE, 0);
+ return null;
+ }
+ });
+ //wrap the local in a proxy to prevent unintended methods from being called
+ rpcDispatcher = new RpcDispatcher(channel, this, this,
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]
{EventDistributor.class}, new InvocationHandler() {
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ EventDistributor local = getLocalEventDistributor();
+ if (local == null) {
+ return null;
+ }
+ return method.invoke(local, args);
+ }
+ }));
+ rpcDispatcher.setDeadlockDetection(false);
+ if (jndiName != null) {
+ final InitialContext ic = new InitialContext();
+ Util.bind(ic, jndiName, proxyEventDistributor);
+ }
+ }
+
+ private EventDistributor getLocalEventDistributor() {
+ if (localEventDistributor == null && this.localEventDistributorName != null) {
+ try {
+ Context ctx = new InitialContext();
+ return (EventDistributor) ctx.lookup(this.localEventDistributorName);
+ } catch (NamingException e) {
+ return null;
+ }
+ }
+ return localEventDistributor;
+ }
+
+ @Override
+ public void viewAccepted(View newView) {
+ Vector<Address> new_members = new Vector<Address>(newView.getMembers());
+ new_members.remove(this.channel.getLocalAddress());
+ this.members = new_members;
+ }
+
+ public void stop() {
+ if (jndiName != null) {
+ final InitialContext ic ;
+ try {
+ ic = new InitialContext() ;
+ Util.unbind(ic, jndiName) ;
+ } catch (final NamingException ne) {
+ }
+ }
+ if (this.channel != null) {
+ this.channel.close();
+ this.rpcDispatcher.stop();
+ }
+ }
+
+}
Property changes on:
trunk/cache-jbosscache/src/main/java/org/teiid/events/jboss/JGroupsEventDistributor.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/console/src/main/resources/META-INF/rhq-plugin.xml
===================================================================
--- trunk/console/src/main/resources/META-INF/rhq-plugin.xml 2011-04-11 15:36:29 UTC (rev
3082)
+++ trunk/console/src/main/resources/META-INF/rhq-plugin.xml 2011-04-12 19:35:57 UTC (rev
3083)
@@ -323,10 +323,6 @@
displayName="Lob Chunk Size In KB"
description="The max lob chunk size in KB transferred to the client for xml,
blobs, clobs (default 100KB)"
required="false" readOnly="false" />
- <c:simple-property
name="RuntimeEngineDeployer.preparedPlanCacheMaxCount"
- displayName="Prepared Plan Cache Max Count"
- description="The maximum number of query plans that are cached. Note: this is a
memory based cache. (default 512)"
- required="false" readOnly="false" />
<c:simple-property name="RuntimeEngineDeployer.queryThresholdInSecs"
displayName="Long Running Query Threshold"
description="Length of time in seconds before a query is considered long
running"
@@ -350,9 +346,29 @@
<c:simple-property
name="ResultSetCacheConfig.maxAgeInSeconds"
displayName="Max Entry Age"
description="The maximum age of a result set cache entry in
seconds. -1 indicates no max. (default 7200)"
+ required="false" readOnly="false" />
+ <c:simple-property name="ResultSetCacheConfig.maxStaleness"
+ displayName="Max Entry Staleness"
+ description="The maximum staleness of a result set cache entry
in seconds based upon data modifications. -1 indicates no max. (default 60)"
required="false" readOnly="false" />
</c:group>
+ <c:group name="PreparedPlanCacheConfig"
displayName="Prepared Plan Cache Properties"
hiddenByDefault="false">
+ <!-- the below property on RuntimeEngineDeployer -->
+ <c:simple-property
name="PreparedPlanCacheConfig.maxEntries"
+ displayName="Max Entries"
+ description="The maximum number of prepared plan cache entries.
-1 indicates no limit. (default 512"
+ required="false" readOnly="false" />
+ <c:simple-property
name="PreparedPlanCacheConfig.maxAgeInSeconds"
+ displayName="Max Entry Age"
+ description="The maximum age of a prepared plan cache entry in
seconds. -1 indicates no max. (default 28800)"
+ required="false" readOnly="false" />
+ <c:simple-property
name="PreparedPlanCacheConfig.maxStaleness"
+ displayName="Max Entry Staleness"
+ description="The maximum staleness of a prepared plan cache
entry in seconds based upon costing modifications. -1 indicates no max. (default
300)"
+ required="false" readOnly="false" />
+ </c:group>
+
<c:group name="BufferService" displayName="Buffer Service
Properties"
hiddenByDefault="false">
<c:simple-property name="BufferService.maxBufferSpace"
Modified: trunk/engine/src/main/java/org/teiid/cache/Cachable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-04-11 15:36:29 UTC (rev
3082)
+++ trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-04-12 19:35:57 UTC (rev
3083)
@@ -22,10 +22,13 @@
package org.teiid.cache;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.dqp.internal.process.AccessInfo;
public interface Cachable {
-
+
boolean prepare(Cache cache, BufferManager bufferManager);
boolean restore(Cache cache, BufferManager bufferManager);
+
+ AccessInfo getAccessInfo();
}
Modified: trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java 2011-04-11 15:36:29
UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/cache/CacheConfiguration.java 2011-04-12 19:35:57
UTC (rev 3083)
@@ -27,6 +27,7 @@
import org.jboss.managed.api.annotation.ManagementObjectID;
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
+import org.teiid.dqp.internal.process.SessionAwareCache;
@ManagementObject(componentType=@ManagementComponent(type="teiid",subtype="dqp"),
properties=ManagementProperties.EXPLICIT)
public class CacheConfiguration {
@@ -37,12 +38,14 @@
}
private Policy policy;
- private int maxage;
- private int maxEntries;
+ private int maxage = -1;
+ private int maxEntries = SessionAwareCache.DEFAULT_MAX_SIZE_TOTAL;
private boolean enabled = true;
private String name;
private String location;
-
+
+ private int maxStaleness = -1;
+
public CacheConfiguration() {
}
@@ -57,7 +60,7 @@
return this.policy;
}
- @ManagementProperty(description="The maximum age of a result set cache entry in
seconds. -1 indicates no max. (default 7200)")
+ @ManagementProperty(description="The maximum age of an entry in seconds. -1
indicates no max.")
public int getMaxAgeInSeconds(){
return maxage;
}
@@ -66,7 +69,16 @@
this.maxage = maxage;
}
- @ManagementProperty(description="The maximum number of result set cache entries. -1
indicates no limit. (default 1024)")
+ @ManagementProperty(description="The maximum staleness in seconds of an entry based
upon modifications. -1 indicates no max.")
+ public int getMaxStaleness() {
+ return maxStaleness;
+ }
+
+ public void setMaxStaleness(int maxStaleDataModification) {
+ this.maxStaleness = maxStaleDataModification;
+ }
+
+ @ManagementProperty(description="The maximum number of cache entries. -1 indicates
no limit. (default 1024)")
public int getMaxEntries() {
return this.maxEntries;
}
@@ -98,37 +110,6 @@
this.location = location;
}
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + maxage;
- result = prime * result + maxEntries;
- result = prime * result + ((policy == null) ? 0 : policy.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- CacheConfiguration other = (CacheConfiguration) obj;
- if (maxage != other.maxage)
- return false;
- if (maxEntries != other.maxEntries)
- return false;
- if (policy == null) {
- if (other.policy != null)
- return false;
- } else if (!policy.equals(other.policy))
- return false;
- return true;
- }
-
public boolean isEnabled() {
return enabled;
}
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.dqp.internal.process;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.api.exception.query.QueryValidatorException;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.metadata.Table;
+import org.teiid.query.metadata.TempMetadataID;
+import org.teiid.query.metadata.TransformationMetadata;
+import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.processor.ProcessorPlan;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.tempdata.TempTableStore;
+import org.teiid.query.util.CommandContext;
+
+/**
+ * Tracks what views were used and what tables are accessed
+ */
+public class AccessInfo implements Serializable {
+
+ private static final long serialVersionUID = -2608267960584191359L;
+
+ private transient Set<Table> viewsAccessed;
+ private transient Set<Object> tablesAccessed;
+
+ private List<List<String>> externalTableNames;
+ private List<List<String>> externalViewNames;
+
+ private transient long creationTime = System.currentTimeMillis();
+
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ externalTableNames = initExternalList(externalTableNames, tablesAccessed);
+ externalViewNames = initExternalList(externalViewNames, viewsAccessed);
+ out.defaultWriteObject();
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ this.creationTime = System.currentTimeMillis();
+ }
+
+ private List<List<String>> initExternalList(List<List<String>>
externalNames, Set<? extends Object> accessed) {
+ if (externalNames == null) {
+ externalNames = new ArrayList<List<String>>(accessed.size());
+ for (Object object : accessed) {
+ if (object instanceof Table) {
+ Table t = (Table)object;
+ externalNames.add(Arrays.asList(t.getParent().getName(), t.getName()));
+ } else if (object instanceof TempMetadataID) {
+ TempMetadataID t = (TempMetadataID)object;
+ externalNames.add(Arrays.asList(t.getID()));
+ }
+ }
+ }
+ return externalNames;
+ }
+
+ public Set<Table> getViewsAccessed() {
+ return viewsAccessed;
+ }
+
+ public Set<Object> getTablesAccessed() {
+ return tablesAccessed;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ void populate(ProcessorPlan plan, CommandContext context) {
+ List<GroupSymbol> groups = new ArrayList<GroupSymbol>();
+ plan.getAccessedGroups(groups);
+ if (!groups.isEmpty()) {
+ tablesAccessed = new HashSet<Object>();
+ for (GroupSymbol groupSymbol : groups) {
+ tablesAccessed.add(groupSymbol.getMetadataID());
+ }
+ } else {
+ tablesAccessed = Collections.emptySet();
+ }
+ if (!context.getViewsAccessed().isEmpty()) {
+ this.viewsAccessed = new HashSet<Table>(context.getViewsAccessed());
+ } else {
+ this.viewsAccessed = Collections.emptySet();
+ }
+ }
+
+ void restore() throws QueryResolverException, QueryValidatorException,
TeiidComponentException {
+ if (this.viewsAccessed != null) {
+ return;
+ }
+ VDBMetaData vdb = DQPWorkContext.getWorkContext().getVDB();
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ TempTableStore globalStore = vdb.getAttachment(TempTableStore.class);
+ if (!externalViewNames.isEmpty()) {
+ this.viewsAccessed = new HashSet<Table>();
+ for (List<String> key : this.externalViewNames) {
+ this.viewsAccessed.add(tm.getMetadataStore().getSchema(key.get(0).toUpperCase()).getTables().get(key.get(1).toUpperCase()));
+ }
+ } else {
+ this.viewsAccessed = Collections.emptySet();
+ }
+ this.externalViewNames = null;
+ if (!externalTableNames.isEmpty()) {
+ for (List<String> key : this.externalTableNames) {
+ if (key.size() == 1) {
+ String matTableName = key.get(0);
+ TempMetadataID id = globalStore.getMetadataStore().getTempGroupID(matTableName);
+ if (id == null) {
+ //if the id is null, then create a local instance
+ String viewFullName =
matTableName.substring(RelationalPlanner.MAT_PREFIX.length());
+ id = globalStore.getGlobalTempTableMetadataId(tm.getGroupID(viewFullName), tm);
+ }
+ this.tablesAccessed.add(id);
+ } else {
+ this.tablesAccessed.add(tm.getMetadataStore().getSchema(key.get(0).toUpperCase()).getTables().get(key.get(1).toUpperCase()));
+ }
+ }
+ } else {
+ this.tablesAccessed = Collections.emptySet();
+ }
+ this.externalTableNames = null;
+ }
+
+ boolean validate(boolean data, long modTime) {
+ if (this.tablesAccessed == null || modTime < 0) {
+ return true;
+ }
+ if (!data) {
+ for (Table t : getViewsAccessed()) {
+ if (t.getLastModified() - modTime > this.creationTime) {
+ return false;
+ }
+ }
+ }
+ for (Object o : getTablesAccessed()) {
+ if (o instanceof Table) {
+ Table t = (Table)o;
+ if ((data?t.getLastDataModification():t.getLastModified()) - modTime >
this.creationTime) {
+ return false;
+ }
+ } else if (o instanceof TempMetadataID) {
+ TempMetadataID tid = (TempMetadataID)o;
+ if (tid.getTableData().getLastModified() - modTime > this.creationTime) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -35,6 +35,7 @@
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
@@ -44,6 +45,7 @@
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.parser.ParseInfo;
import org.teiid.query.parser.QueryParser;
+import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.resolver.QueryResolver;
import org.teiid.query.sql.lang.CacheHint;
import org.teiid.query.sql.lang.Command;
@@ -65,6 +67,8 @@
private int rowCount;
private boolean hasLobs;
+ private AccessInfo accessInfo = new AccessInfo();
+
public String getId() {
return this.uuid;
}
@@ -81,13 +85,14 @@
return results;
}
- public void setResults(TupleBuffer results) {
+ public void setResults(TupleBuffer results, ProcessorPlan plan) {
this.results = results;
this.batchSize = results.getBatchSize();
this.types = TupleBuffer.getTypeNames(results.getSchema());
this.rowCount = results.getRowCount();
this.uuid = results.getId();
this.hasLobs = results.isLobs();
+ this.accessInfo.populate(plan, plan.getContext());
}
public void setCommand(Command command) {
@@ -146,10 +151,17 @@
this.results = buffer;
bufferManager.addTupleBuffer(this.results);
}
+ this.accessInfo.restore();
return true;
- } catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP,
QueryPlugin.Util.getString("not_found_cache")); //$NON-NLS-1$
+ } catch (TeiidException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("not_found_cache")); //$NON-NLS-1$
}
return false;
}
+
+ @Override
+ public AccessInfo getAccessInfo() {
+ return accessInfo;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -45,7 +45,6 @@
private int timeSliceInMilli = DEFAULT_PROCESSOR_TIMESLICE;
private int maxRowsFetchSize = DEFAULT_FETCH_SIZE;
private int lobChunkSizeInKB = 100;
- private int preparedPlanCacheMaxCount = SessionAwareCache.DEFAULT_MAX_SIZE_TOTAL;
private boolean useDataRoles = true;
private boolean allowCreateTemporaryTablesByDefault = true;
private int queryThresholdInSecs = DEFAULT_QUERY_THRESHOLD;
@@ -53,11 +52,12 @@
private int maxSourceRows = -1;
private int maxActivePlans = DEFAULT_MAX_ACTIVE_PLANS;
private CacheConfiguration resultsetCacheConfig;
+ private CacheConfiguration preparedPlanCacheConfig = new CacheConfiguration();
private int maxODBCLobSizeAllowed = 5*1024*1024; // 5 MB
private int userRequestSourceConcurrency = DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY;
- private AuthorizationValidator authorizationValidator;
- private MetadataProvider metadataProvider;
+ private transient AuthorizationValidator authorizationValidator;
+ private transient MetadataProvider metadataProvider;
@ManagementProperty(description="Max active plans (default 20). Increase this
value, and max threads, on highly concurrent systems - but ensure that the underlying
pools can handle the increased load without timeouts.")
public int getMaxActivePlans() {
@@ -116,15 +116,6 @@
this.lobChunkSizeInKB = lobChunkSizeInKB;
}
- @ManagementProperty(description="The maximum number of query plans that are cached.
Note: this is a memory based cache. (default 512)")
- public int getPreparedPlanCacheMaxCount() {
- return this.preparedPlanCacheMaxCount;
- }
-
- public void setPreparedPlanCacheMaxCount(int preparedPlanCacheMaxCount) {
- this.preparedPlanCacheMaxCount = preparedPlanCacheMaxCount;
- }
-
public CacheConfiguration getResultsetCacheConfig() {
return this.resultsetCacheConfig;
}
@@ -230,4 +221,14 @@
public void setMetadataProvider(MetadataProvider metadataProvider) {
this.metadataProvider = metadataProvider;
}
+
+ public CacheConfiguration getPreparedPlanCacheConfig() {
+ return preparedPlanCacheConfig;
+ }
+
+ public void setPreparedPlanCacheConfig(
+ CacheConfiguration preparedPlanCacheConfig) {
+ this.preparedPlanCacheConfig = preparedPlanCacheConfig;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -73,7 +73,6 @@
import org.teiid.logging.MessageLevel;
import org.teiid.logging.CommandLogMessage.Event;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
@@ -171,7 +170,7 @@
// Resources
private BufferManager bufferManager;
- private ProcessorDataManager dataTierMgr;
+ private TempTableDataManager dataTierMgr;
private SessionAwareCache<PreparedPlan> prepPlanCache;
private SessionAwareCache<CachedResults> rsCache;
private TransactionService transactionService;
@@ -643,7 +642,7 @@
LogManager.log(MessageLevel.DETAIL, LogConstants.CTX_COMMANDLOGGING, message);
}
- ProcessorDataManager getDataTierManager() {
+ public TempTableDataManager getDataTierManager() {
return this.dataTierMgr;
}
@@ -686,7 +685,8 @@
}
//prepared plan cache
- prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory,
SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration(Policy.LRU, 60*60*8,
config.getPreparedPlanCacheMaxCount(), "PreparedCache")); //$NON-NLS-1$
+ CacheConfiguration ppCacheConfig = config.getPreparedPlanCacheConfig();
+ prepPlanCache = new SessionAwareCache<PreparedPlan>(this.cacheFactory,
SessionAwareCache.Type.PREPAREDPLAN, ppCacheConfig);
prepPlanCache.setBufferManager(this.bufferManager);
this.processWorkerPool = new
ThreadReuseExecutor(DQPConfiguration.PROCESS_PLAN_QUEUE_NAME, config.getMaxThreads());
@@ -898,4 +898,8 @@
return maxActivePlans;
}
+ SessionAwareCache<PreparedPlan> getPrepPlanCache() {
+ return prepPlanCache;
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -24,18 +24,24 @@
import java.util.List;
+import org.teiid.cache.Cachable;
+import org.teiid.cache.Cache;
+import org.teiid.common.buffer.BufferManager;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.symbol.Reference;
+import org.teiid.query.util.CommandContext;
-public class PreparedPlan{
+public class PreparedPlan implements Cachable {
private ProcessorPlan plan;
private Command command;
private List<Reference> refs;
private AnalysisRecord analysisRecord;
+ private AccessInfo accessInfo = new AccessInfo();
+
/**
* Return the ProcessorPlan.
*/
@@ -66,9 +72,11 @@
/**
* Set the ProcessorPlan.
+ * @param context
*/
- public void setPlan(ProcessorPlan planValue){
+ public void setPlan(ProcessorPlan planValue, CommandContext context){
plan = planValue;
+ this.accessInfo.populate(planValue, context);
}
/**
@@ -92,4 +100,19 @@
refs = refsValue;
}
+ @Override
+ public AccessInfo getAccessInfo() {
+ return accessInfo;
+ }
+
+ @Override
+ public boolean prepare(Cache cache, BufferManager bufferManager) {
+ return true; //no remotable actions
+ }
+
+ @Override
+ public boolean restore(Cache cache, BufferManager bufferManager) {
+ return true; //no remotable actions
+ }
+
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -139,7 +139,7 @@
if (!this.addedLimit) { //TODO: this is a little problematic
prepPlan.setCommand(this.userCommand);
// Defect 13751: Clone the plan in its current state (i.e. before processing)
so that it can be used for later queries
- prepPlan.setPlan(processPlan.clone());
+ prepPlan.setPlan(processPlan.clone(), this.context);
prepPlan.setAnalysisRecord(analysisRecord);
Determinism determinismLevel = this.context.getDeterminismLevel();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/QueryProcessorFactoryImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/QueryProcessorFactoryImpl.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/QueryProcessorFactoryImpl.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -88,7 +88,7 @@
AnalysisRecord record = new AnalysisRecord(false, false);
ProcessorPlan plan = QueryOptimizer.optimizePlan(newCommand, metadata,
idGenerator, finder, record, copy);
pp = new PreparedPlan();
- pp.setPlan(plan);
+ pp.setPlan(plan, copy);
pp.setReferences(references);
pp.setAnalysisRecord(record);
pp.setCommand(newCommand);
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -24,7 +24,6 @@
import java.sql.Connection;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -110,7 +109,7 @@
private final class ViewDefinitionMetadataWrapper extends
BasicQueryMetadataWrapper {
- private Map<List<String>, QueryNode> qnodes = new
HashMap<List<String>, QueryNode>();
+ private Map<Object, QueryNode> qnodes = new HashMap<Object, QueryNode>();
private ViewDefinitionMetadataWrapper(
QueryMetadataInterface actualMetadata) {
@@ -125,9 +124,8 @@
String schema = getName(getModelID(groupID));
String viewName = getName(groupID);
- List<String> key = Arrays.asList(schema, viewName);
- QueryNode cached = qnodes.get(key);
+ QueryNode cached = qnodes.get(groupID);
if (cached != null) {
return cached;
}
@@ -135,14 +133,14 @@
//TODO: could just consider moving up when the context is created
throw new AssertionError("Should not attempt to resolve a view before the
context has been set."); //$NON-NLS-1$
}
- ViewDefinition vd = metadataProvider.getViewDefinition(getName(getModelID(groupID)),
getName(groupID), context);
+ ViewDefinition vd = metadataProvider.getViewDefinition(schema, viewName, context);
if (vd != null) {
result = new QueryNode(DataTypeManager.getCanonicalString(vd.getSql()));
if (vd.getScope() == MetadataProvider.Scope.USER) {
result.setUser(context.getUserName());
}
}
- qnodes.put(key, result);
+ qnodes.put(groupID, result);
return result;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -480,7 +480,7 @@
CachedResults cr = new CachedResults();
cr.setCommand(originalCommand);
cr.setAnalysisRecord(analysisRecord);
- cr.setResults(resultsBuffer);
+ cr.setResults(resultsBuffer, processor.getProcessorPlan());
if (originalCommand.getCacheHint() != null) {
LogManager.logDetail(LogConstants.CTX_DQP, requestID, "Using cache
hint", originalCommand.getCacheHint()); //$NON-NLS-1$
resultsBuffer.setPrefersMemory(originalCommand.getCacheHint().getPrefersMemory());
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -61,6 +61,8 @@
private Cache tupleBatchCache;
private int maxSize = DEFAULT_MAX_SIZE_TOTAL;
+ private long modTime;
+ private Type type;
private AtomicInteger cacheHit = new AtomicInteger();
private AtomicInteger totalRequests = new AtomicInteger();
@@ -95,6 +97,8 @@
this.tupleBatchCache = this.distributedCache;
}
}
+ this.modTime = config.getMaxStaleness()*1000;
+ this.type = type;
}
public T get(CacheID id){
@@ -115,7 +119,7 @@
result = distributedCache.get(id);
}
- if (result != null && result instanceof Cachable) {
+ if (result instanceof Cachable) {
Cachable c = (Cachable)result;
if (!c.restore(this.tupleBatchCache, this.bufferManager)) {
result = null;
@@ -124,6 +128,19 @@
}
if (result != null) {
+ if (result instanceof Cachable) {
+ Cachable c = (Cachable)result;
+ AccessInfo info = c.getAccessInfo();
+ if (info != null && !info.validate(type == Type.RESULTSET, modTime)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "Invalidating cache entry", id);
//$NON-NLS-1$
+ if (id.getSessionId() == null) {
+ this.distributedCache.remove(id);
+ } else {
+ this.localCache.remove(id);
+ }
+ return null;
+ }
+ }
LogManager.logTrace(LogConstants.CTX_DQP, "Cache hit for", id);
//$NON-NLS-1$
cacheHit.getAndIncrement();
} else {
@@ -307,4 +324,8 @@
public void setBufferManager(BufferManager bufferManager) {
this.bufferManager = bufferManager;
}
+
+ public void setModTime(long modTime) {
+ this.modTime = modTime;
+ }
}
Added: trunk/engine/src/main/java/org/teiid/events/EventDistributor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/events/EventDistributor.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/events/EventDistributor.java 2011-04-12 19:35:57
UTC (rev 3083)
@@ -0,0 +1,31 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.events;
+
+import java.util.List;
+
+public interface EventDistributor {
+ void updateMatViewRow(String vdbName, int vdbVersion, String matViewFqn, List<?>
tuple, boolean delete);
+ void schemaModification(String vdbName, int vdbVersion, String fqn);
+ void dataModification(String vdbName, int vdbVersion, String tableFqn);
+}
Property changes on: trunk/engine/src/main/java/org/teiid/events/EventDistributor.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -48,7 +48,7 @@
private static final long serialVersionUID = -1879211827339120135L;
private static final int LOCAL_CACHE_SIZE = 8;
- static class TableData {
+ public static class TableData {
Collection<TempMetadataID> accessPatterns;
List<TempMetadataID> elements;
int cardinality = QueryMetadataInterface.UNKNOWN_CARDINALITY;
@@ -58,6 +58,15 @@
CacheHint cacheHint;
List<List<TempMetadataID>> keys;
List<List<TempMetadataID>> indexes;
+ long lastModified;
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public void setLastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
}
private static TableData DUMMY_DATA = new TableData();
@@ -336,7 +345,7 @@
this.getTableData().keys.add(key);
}
- private TableData getTableData() {
+ public TableData getTableData() {
if (data == null) {
return DUMMY_DATA;
}
@@ -373,5 +382,5 @@
}
return this.name;
}
-
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataStore.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataStore.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -43,7 +43,8 @@
*/
public class TempMetadataStore implements Serializable {
- // UPPER CASE TEMP GROUP NAME --> TempMetadataID for group
+ private static final long serialVersionUID = 4055072385672022478L;
+ // UPPER CASE TEMP GROUP NAME --> TempMetadataID for group
private Map<String, TempMetadataID> tempGroups;
/**
@@ -216,7 +217,7 @@
* @return Metadata ID or null if not found
*/
public List<TempMetadataID> getTempElementElementIDs(String tempGroup) {
- TempMetadataID groupID = tempGroups.get(tempGroup.toUpperCase());
+ TempMetadataID groupID = getTempGroupID(tempGroup);
if(groupID != null) {
return groupID.getElements();
}
Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -36,6 +36,7 @@
import org.teiid.core.id.IDGenerator;
import org.teiid.core.id.IntegerIDFactory;
import org.teiid.dqp.internal.process.PreparedPlan;
+import org.teiid.metadata.Table;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.metadata.QueryMetadataInterface;
@@ -116,14 +117,18 @@
PreparedPlan pp = context.getPlan(fullName);
if (pp == null) {
Determinism determinismLevel = context.resetDeterminismLevel();
- ProcessorPlan plan = planProcedure(command, metadata, idGenerator, capFinder,
analysisRecord, context);
+ CommandContext clone = context.clone();
+ ProcessorPlan plan = planProcedure(command, metadata, idGenerator, capFinder,
analysisRecord, clone);
//note that this is not a full prepared plan. It is not usable by user queries.
pp = new PreparedPlan();
- pp.setPlan(plan);
+ pp.setPlan(plan, clone);
context.putPlan(fullName, pp, context.getDeterminismLevel());
context.setDeterminismLevel(determinismLevel);
}
result = pp.getPlan().clone();
+ for (Table t : pp.getAccessInfo().getViewsAccessed()) {
+ context.accessedView(t);
+ }
}
// propagate procedure parameters to the plan to allow runtime type checking
ProcedureContainer container = (ProcedureContainer)cupc.getUserCommand();
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -33,8 +33,6 @@
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryPlannerException;
-import org.teiid.api.exception.query.QueryResolverException;
-import org.teiid.api.exception.query.QueryValidatorException;
import org.teiid.client.plan.Annotation;
import org.teiid.client.plan.Annotation.Priority;
import org.teiid.common.buffer.LobManager;
@@ -43,13 +41,13 @@
import org.teiid.core.id.IDGenerator;
import org.teiid.dqp.internal.process.Request;
import org.teiid.language.SQLConstants;
+import org.teiid.metadata.Table;
import org.teiid.query.QueryPlugin;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.mapping.relational.QueryNode;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataAdapter;
import org.teiid.query.metadata.TempMetadataID;
-import org.teiid.query.metadata.TempMetadataStore;
import org.teiid.query.optimizer.QueryOptimizer;
import org.teiid.query.optimizer.TriggerActionPlanner;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
@@ -1089,9 +1087,13 @@
CacheHint hint = null;
boolean isImplicitGlobal = matMetadataId == null;
if (isImplicitGlobal) {
- matTableName = MAT_PREFIX + metadata.getFullName(metadataID);
- matMetadataId = getGlobalTempTableMetadataId(virtualGroup, matTableName,
context, metadata, analysisRecord);
- hint = ((TempMetadataID)matMetadataId).getCacheHint();
+ TempMetadataID tid =
context.getGlobalTableStore().getGlobalTempTableMetadataId(metadataID, metadata);
+ matTableName = tid.getID();
+ hint = tid.getCacheHint();
+ if (hint != null) {
+ recordAnnotation(analysisRecord, Annotation.MATERIALIZED_VIEW, Priority.LOW,
"SimpleQueryResolver.cache_hint_used", virtualGroup, matTableName, tid);
//$NON-NLS-1$
+ }
+ matMetadataId = tid;
} else {
matTableName = metadata.getFullName(matMetadataId);
}
@@ -1111,7 +1113,10 @@
}
} else {
// Not a materialized view - query the primary transformation
- qnode = metadata.getVirtualPlan(metadataID);
+ qnode = metadata.getVirtualPlan(metadataID);
+ if (metadataID instanceof Table) {
+ this.context.accessedView((Table)metadataID);
+ }
}
Command result = (Command)QueryResolver.resolveView(virtualGroup, qnode,
cacheString, metadata).getCommand().clone();
@@ -1128,59 +1133,6 @@
return query;
}
- public static Object getGlobalTempTableMetadataId(GroupSymbol table, String
matTableName, CommandContext context, QueryMetadataInterface metadata, AnalysisRecord
analysisRecord)
- throws QueryMetadataException, TeiidComponentException, QueryResolverException,
QueryValidatorException {
- TempMetadataStore store = context.getGlobalTableStore().getMetadataStore();
- TempMetadataID id = store.getTempGroupID(matTableName);
- //define the table preserving the primary key
- if (id == null) {
- synchronized (table.getMetadataID()) {
- id = store.getTempGroupID(matTableName);
- if (id == null) {
- //this is really just temporary and will be replaced by the real table
- id = store.addTempGroup(matTableName, ResolverUtil.resolveElementsInGroup(table,
metadata), false, true);
- id.setQueryNode(metadata.getVirtualPlan(table.getMetadataID()));
- id.setCardinality(metadata.getCardinality(table.getMetadataID()));
-
- Object pk = metadata.getPrimaryKey(table.getMetadataID());
- if (pk != null) {
- ArrayList<TempMetadataID> primaryKey = resolveIndex(metadata, id, pk);
- id.setPrimaryKey(primaryKey);
- }
- Collection keys = metadata.getUniqueKeysInGroup(table.getMetadataID());
- for (Object key : keys) {
- id.addUniqueKey(resolveIndex(metadata, id, key));
- }
- Collection indexes = metadata.getIndexesInGroup(table.getMetadataID());
- for (Object index : indexes) {
- id.addIndex(resolveIndex(metadata, id, index));
- }
- Command c = (Command)QueryResolver.resolveView(table,
metadata.getVirtualPlan(table.getMetadataID()), SQLConstants.Reserved.SELECT,
metadata).getCommand().clone();
- CacheHint hint = c.getCacheHint();
- if (hint != null) {
- recordAnnotation(analysisRecord, Annotation.MATERIALIZED_VIEW, Priority.LOW,
"SimpleQueryResolver.cache_hint_used", table, matTableName, id.getCacheHint());
//$NON-NLS-1$
- }
- id.setCacheHint(hint);
- }
- }
- } else if (id.getCacheHint() != null) {
- recordAnnotation(analysisRecord, Annotation.MATERIALIZED_VIEW, Priority.LOW,
"SimpleQueryResolver.cache_hint_used", table, matTableName, id.getCacheHint());
//$NON-NLS-1$
- }
- return id;
- }
-
- private static ArrayList<TempMetadataID> resolveIndex(
- QueryMetadataInterface metadata, TempMetadataID id, Object pk)
- throws TeiidComponentException, QueryMetadataException {
- List cols = metadata.getElementIDsInKey(pk);
- ArrayList<TempMetadataID> primaryKey = new
ArrayList<TempMetadataID>(cols.size());
- for (Object coldId : cols) {
- int pos = metadata.getPosition(coldId) - 1;
- primaryKey.add(id.getElements().get(pos));
- }
- return primaryKey;
- }
-
public static boolean isNoCacheGroup(QueryMetadataInterface metadata,
Object metadataID,
Option option) throws QueryMetadataException,
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleMergeCriteria.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleMergeCriteria.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleMergeCriteria.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -299,7 +299,7 @@
//NOTE: we could tap into the relationalplanner at a lower level to get this in a plan
node form,
//the major benefit would be to reuse the dependent join planning logic if possible.
if (analysisRecord != null && analysisRecord.recordDebug()) {
- analysisRecord.println("Attempting to plan " + crit + " as a mege
join"); //$NON-NLS-1$ //$NON-NLS-2$
+ analysisRecord.println("Attempting to plan " + crit + " as a merge
join"); //$NON-NLS-1$ //$NON-NLS-2$
}
RelationalPlan subPlan =
(RelationalPlan)QueryOptimizer.optimizePlan(plannedResult.query, metadata, idGenerator,
capFinder, analysisRecord, context);
Number planCardinality = subPlan.getRootNode().getEstimateNodeCardinality();
Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchedUpdatePlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchedUpdatePlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchedUpdatePlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -33,6 +33,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.util.VariableContext;
import org.teiid.query.util.CommandContext;
@@ -237,5 +238,12 @@
}
return true;
}
+
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ for (int i = 0; i < getPlanCount(); i++) {
+ updatePlans[i].getAccessedGroups(groups);
+ }
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -37,6 +37,7 @@
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.BatchCollector.BatchProducer;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.util.CommandContext;
@@ -176,4 +177,6 @@
return false;
}
+ public abstract void getAccessedGroups(List<GroupSymbol> groups);
+
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/proc/CreateCursorResultSetInstruction.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/CreateCursorResultSetInstruction.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/CreateCursorResultSetInstruction.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -26,6 +26,7 @@
import static org.teiid.query.analysis.AnalysisRecord.*;
+import java.util.List;
import java.util.Map;
import org.teiid.client.plan.PlanNode;
@@ -102,5 +103,10 @@
public ProcessorPlan getCommand() { //Defect 13291 - added method to support changes
to ProcedurePlan
return plan;
}
+
+ @Override
+ public void getChildPlans(List<ProcessorPlan> plans) {
+ plans.add(this.plan);
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/ForEachRowPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -39,6 +39,7 @@
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
import org.teiid.query.util.CommandContext;
@@ -161,5 +162,11 @@
public boolean requiresTransaction(boolean transactionalReads) {
return true;
}
+
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ this.queryPlan.getAccessedGroups(groups);
+ this.rowProcedure.getAccessedGroups(groups);
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/IfInstruction.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/IfInstruction.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/IfInstruction.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -24,11 +24,14 @@
import static org.teiid.query.analysis.AnalysisRecord.*;
+import java.util.List;
+
import org.teiid.client.plan.PlanNode;
import org.teiid.common.buffer.BlockedException;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.logging.LogManager;
+import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.sql.lang.Criteria;
@@ -138,4 +141,12 @@
return props;
}
+ @Override
+ public void getChildPlans(List<ProcessorPlan> plans) {
+ ifProgram.getChildPlans(plans);
+ if (elseProgram != null) {
+ elseProgram.getChildPlans(plans);
+ }
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/LoopInstruction.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/LoopInstruction.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/LoopInstruction.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -107,5 +107,11 @@
public void postInstruction(ProcedurePlan procEnv) throws TeiidComponentException {
procEnv.removeResults(rsName);
}
+
+ @Override
+ public void getChildPlans(List<ProcessorPlan> plans) {
+ super.getChildPlans(plans);
+ this.loopProgram.getChildPlans(plans);
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -65,6 +65,7 @@
import org.teiid.query.sql.lang.Criteria;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.symbol.Reference;
import org.teiid.query.sql.util.VariableContext;
import org.teiid.query.tempdata.TempTableStore;
@@ -713,4 +714,20 @@
return requiresTransaction || transactionalReads;
}
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ ArrayList<ProcessorPlan> plans = new ArrayList<ProcessorPlan>();
+ this.originalProgram.getChildPlans(plans);
+ LinkedList<GroupSymbol> tempGroups = new LinkedList<GroupSymbol>();
+ for (ProcessorPlan processorPlan : plans) {
+ processorPlan.getAccessedGroups(tempGroups);
+ }
+ for (GroupSymbol groupSymbol : tempGroups) {
+ if (groupSymbol.isTempTable() && !groupSymbol.isGlobalTable()) {
+ continue;
+ }
+ groups.add(groupSymbol);
+ }
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/Program.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -26,6 +26,7 @@
import java.util.List;
import org.teiid.client.plan.PlanNode;
+import org.teiid.query.processor.ProcessorPlan;
/**
@@ -212,5 +213,11 @@
buffer.append(counterStr + line + "\n"); //$NON-NLS-1$
}
+
+ void getChildPlans(List<ProcessorPlan> plans) {
+ for (ProgramInstruction instruction : programInstructions) {
+ instruction.getChildPlans(plans);
+ }
+ }
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProgramInstruction.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProgramInstruction.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProgramInstruction.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -59,8 +59,7 @@
* @return List of ProcessorPlan
* @since 4.2
*/
- public List<ProcessorPlan> getChildPlans() {
- return null;
+ public void getChildPlans(List<ProcessorPlan> plans) {
}
/**
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/WhileInstruction.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/WhileInstruction.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/WhileInstruction.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -26,9 +26,12 @@
import static org.teiid.query.analysis.AnalysisRecord.*;
+import java.util.List;
+
import org.teiid.client.plan.PlanNode;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.sql.lang.Criteria;
@@ -89,4 +92,9 @@
public void postInstruction(ProcedurePlan procEnv) throws TeiidComponentException {
}
+ @Override
+ public void getChildPlans(List<ProcessorPlan> plans) {
+ whileProgram.getChildPlans(plans);
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -41,10 +41,13 @@
import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.processor.relational.ProjectIntoNode.Mode;
import org.teiid.query.sql.LanguageObject;
+import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.Create;
import org.teiid.query.sql.lang.Insert;
import org.teiid.query.sql.lang.QueryCommand;
import org.teiid.query.sql.lang.WithQueryCommand;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.sql.visitor.GroupCollectorVisitor;
import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.util.CommandContext;
@@ -287,7 +290,29 @@
return false;
}
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ getAccessedGroups(groups, this.root);
+ }
+ void getAccessedGroups(List<GroupSymbol> groups, RelationalNode node) {
+ if (node instanceof AccessNode) {
+ Command c = ((AccessNode)node).getCommand();
+ if (c instanceof QueryCommand) {
+ QueryCommand qc = (QueryCommand)c;
+ groups.addAll(GroupCollectorVisitor.getGroupsIgnoreInlineViews(qc, true));
+ }
+ } else if (node instanceof PlanExecutionNode) {
+ PlanExecutionNode pen = (PlanExecutionNode)node;
+ pen.getProcessorPlan().getAccessedGroups(groups);
+ }
+ for (RelationalNode child : node.getChildren()) {
+ if (child != null) {
+ getAccessedGroups(groups, child);
+ }
+ }
+ }
+
@Override
public TupleBuffer getFinalBuffer() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
return root.getFinalBuffer();
Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/XMLPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -22,7 +22,7 @@
package org.teiid.query.processor.xml;
-import static org.teiid.query.analysis.AnalysisRecord.PROP_OUTPUT_COLS;
+import static org.teiid.query.analysis.AnalysisRecord.*;
import java.io.IOException;
import java.io.InputStream;
@@ -478,4 +478,9 @@
public Program getOriginalProgram() {
return this.originalProgram;
}
+
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ //TODO: add support
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-04-11 15:36:29
UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-04-12 19:35:57
UTC (rev 3083)
@@ -482,6 +482,7 @@
}
public int truncate() {
+ this.tid.getTableData().setLastModified(System.currentTimeMillis());
return tree.truncate();
}
@@ -512,6 +513,7 @@
UpdateProcessor up = new InsertUpdateProcessor(tuples, rowId != null,
shouldProject?indexes:null);
int updateCount = up.process();
tid.setCardinality(tree.getRowCount());
+ tid.getTableData().setLastModified(System.currentTimeMillis());
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -577,6 +579,9 @@
};
int updateCount = up.process();
+ if (updateCount > 0) {
+ tid.getTableData().setLastModified(System.currentTimeMillis());
+ }
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -609,6 +614,9 @@
};
int updateCount = up.process();
tid.setCardinality(tree.getRowCount());
+ if (updateCount > 0) {
+ tid.getTableData().setLastModified(System.currentTimeMillis());
+ }
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -638,6 +646,7 @@
index.tree.remove(tuple);
}
}
+ tid.getTableData().setLastModified(System.currentTimeMillis());
return result;
}
List<?> result = tree.insert(tuple, InsertMode.UPDATE, -1);
@@ -647,6 +656,7 @@
index.tree.insert(tuple, InsertMode.UPDATE, -1);
}
}
+ tid.getTableData().setLastModified(System.currentTimeMillis());
return result;
} finally {
lock.writeLock().unlock();
@@ -699,5 +709,9 @@
STree getTree() {
return tree;
}
+
+ public TempMetadataID getMetadataId() {
+ return tid;
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -54,12 +54,12 @@
import org.teiid.dqp.internal.process.CachedResults;
import org.teiid.dqp.internal.process.SessionAwareCache;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.events.EventDistributor;
import org.teiid.language.SQLConstants.Reserved;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.QueryPlugin;
-import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.mapping.relational.QueryNode;
import org.teiid.query.metadata.QueryMetadataInterface;
@@ -144,6 +144,7 @@
private Cache<MatTableKey, MatTableEntry> tables;
private SessionAwareCache<CachedResults> distributedCache;
+ private EventDistributor eventDistributor;
public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager
bufferManager,
Executor executor, SessionAwareCache<CachedResults> cache,
SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
@@ -158,6 +159,10 @@
}
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
public TupleSource registerRequest(
CommandContext context,
Command command,
@@ -283,7 +288,7 @@
BatchCollector bc = qp.createBatchCollector();
TupleBuffer tb = bc.collectTuples();
CachedResults cr = new CachedResults();
- cr.setResults(tb);
+ cr.setResults(tb, qp.getProcessorPlan());
cr.setHint(hint);
if (hint != null && hint.getDeterminism() != null) {
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the
query determinism from ",determinismLevel, " to ", hint.getDeterminism()
}); //$NON-NLS-1$ //$NON-NLS-2$
@@ -303,8 +308,9 @@
TempTableStore globalStore = context.getGlobalTableStore();
if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEW)) {
Object groupID = validateMatView(metadata, proc);
+ Object matTableId =
context.getGlobalTableStore().getGlobalTempTableMetadataId(groupID, metadata);
String matViewName = metadata.getFullName(groupID);
- String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
+ String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
@@ -315,9 +321,6 @@
if (oldState == MatState.LOADING) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
- GroupSymbol group = new GroupSymbol(matViewName);
- group.setMetadataID(groupID);
- Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group,
matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info,
null);
@@ -349,20 +352,28 @@
QueryProcessor qp =
context.getQueryProcessorFactory().createQueryProcessor(queryString,
matViewName.toUpperCase(), context, key.getValue());
qp.setNonBlocking(true);
TupleSource ts = new BatchCollector.BatchProducerTupleSource(qp);
- tempTable = globalStore.getOrCreateTempTable(matTableName, new Query(), bufferManager,
false);
List<?> tuple = ts.nextTuple();
boolean delete = false;
if (tuple == null) {
delete = true;
tuple = Arrays.asList(key.getValue());
}
- List<?> result = tempTable.updateTuple(tuple, delete);
- //TODO: maintain a table log and distribute the events
+ List<?> result = updateMatViewRow(globalStore, matTableName, tuple, delete);
+ if (result != null && eventDistributor != null) {
+ this.eventDistributor.updateMatViewRow(context.getVdbName(), context.getVdbVersion(),
matTableName, tuple, delete);
+ }
return CollectionTupleSource.createUpdateCountTupleSource(result != null ? 1 : 0);
}
return null;
}
+ public List<?> updateMatViewRow(TempTableStore globalStore,
+ String matTableName, List<?> tuple, boolean delete)
+ throws QueryProcessingException, TeiidComponentException {
+ TempTable tempTable = globalStore.getOrCreateTempTable(matTableName, new Query(),
bufferManager, false);
+ return tempTable.updateTuple(tuple, delete);
+ }
+
private Object validateMatView(QueryMetadataInterface metadata,
StoredProcedure proc) throws TeiidComponentException,
TeiidProcessingException {
@@ -508,7 +519,7 @@
CachedResults cr = new CachedResults();
BatchCollector bc = qp.createBatchCollector();
TupleBuffer tb = bc.collectTuples();
- cr.setResults(tb);
+ cr.setResults(tb, qp.getProcessorPlan());
touchTable(context, fullName, true);
this.distributedCache.put(cid, Determinism.VDB_DETERMINISTIC, cr, info.getTtl());
ts = tb.createIndexedTupleSource();
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -23,19 +23,29 @@
package org.teiid.query.tempdata;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryProcessingException;
+import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.api.exception.query.QueryValidatorException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.core.TeiidComponentException;
+import org.teiid.language.SQLConstants;
import org.teiid.query.QueryPlugin;
+import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.metadata.TempMetadataStore;
+import org.teiid.query.optimizer.relational.RelationalPlanner;
+import org.teiid.query.resolver.QueryResolver;
import org.teiid.query.resolver.command.TempTableResolver;
+import org.teiid.query.resolver.util.ResolverUtil;
+import org.teiid.query.sql.lang.CacheHint;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.Create;
import org.teiid.query.sql.lang.Insert;
@@ -235,5 +245,55 @@
public Set<String> getAllTempTables() {
return new HashSet<String>(this.groupToTupleSourceID.keySet());
}
+
+ public TempMetadataID getGlobalTempTableMetadataId(Object viewId, QueryMetadataInterface
metadata)
+ throws QueryMetadataException, TeiidComponentException, QueryResolverException,
QueryValidatorException {
+ String matViewName = metadata.getFullName(viewId);
+ String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
+ GroupSymbol group = new GroupSymbol(matViewName);
+ group.setMetadataID(viewId);
+ TempMetadataID id = tempMetadataStore.getTempGroupID(matTableName);
+ //define the table preserving the key/index information and ensure that only a single
instance exists
+ if (id == null) {
+ synchronized (viewId) {
+ id = tempMetadataStore.getTempGroupID(matTableName);
+ if (id == null) {
+ id = tempMetadataStore.addTempGroup(matTableName,
ResolverUtil.resolveElementsInGroup(group, metadata), false, true);
+ id.setQueryNode(metadata.getVirtualPlan(viewId));
+ id.setCardinality(metadata.getCardinality(viewId));
+
+ Object pk = metadata.getPrimaryKey(viewId);
+ if (pk != null) {
+ ArrayList<TempMetadataID> primaryKey = resolveIndex(metadata, id, pk);
+ id.setPrimaryKey(primaryKey);
+ }
+ Collection keys = metadata.getUniqueKeysInGroup(viewId);
+ for (Object key : keys) {
+ id.addUniqueKey(resolveIndex(metadata, id, key));
+ }
+ Collection indexes = metadata.getIndexesInGroup(viewId);
+ for (Object index : indexes) {
+ id.addIndex(resolveIndex(metadata, id, index));
+ }
+ Command c = (Command)QueryResolver.resolveView(group,
metadata.getVirtualPlan(viewId), SQLConstants.Reserved.SELECT,
metadata).getCommand().clone();
+ CacheHint hint = c.getCacheHint();
+ id.setCacheHint(hint);
+ }
+ }
+ }
+ return id;
+ }
+
+ static ArrayList<TempMetadataID> resolveIndex(
+ QueryMetadataInterface metadata, TempMetadataID id, Object pk)
+ throws TeiidComponentException, QueryMetadataException {
+ List cols = metadata.getElementIDsInKey(pk);
+ ArrayList<TempMetadataID> primaryKey = new
ArrayList<TempMetadataID>(cols.size());
+ for (Object coldId : cols) {
+ int pos = metadata.getPosition(coldId) - 1;
+ primaryKey.add(id.getElements().get(pos));
+ }
+ return primaryKey;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -23,6 +23,8 @@
package org.teiid.query.util;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
@@ -38,6 +40,7 @@
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
+import org.teiid.metadata.Table;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.QueryPlugin;
import org.teiid.query.eval.SecurityFunctionEvaluator;
@@ -122,6 +125,7 @@
private TempTableStore tempTableStore;
private LinkedList<String> recursionStack;
private boolean nonBlocking;
+ private HashSet<Table> viewsAccessed;
/**
* Construct a new context.
@@ -533,4 +537,18 @@
this.globalState.subject = subject;
}
+ public void accessedView(Table id) {
+ if (this.viewsAccessed == null) {
+ this.viewsAccessed = new HashSet<Table>();
+ }
+ this.viewsAccessed.add(id);
+ }
+
+ public Set<Table> getViewsAccessed() {
+ if (this.viewsAccessed == null) {
+ return Collections.emptySet();
+ }
+ return viewsAccessed;
+ }
+
}
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-04-11 15:36:29
UTC (rev 3082)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-04-12 19:35:57
UTC (rev 3083)
@@ -894,7 +894,7 @@
datasource_not_found=Data Source {0} not accessible.
RequestWorkItem.cache_nondeterministic=Caching command "{0}" at a session
level, but less deterministic functions were evaluated.
-not_found_cache=Results not found in cache
+not_found_cache=Failed to restore results
failed_to_cache=Failed to store the result set contents to disk.
failed_to_unwrap_connection=Failed to unwrap the source connection.
connection_factory_not_found=Failed to find the Connection Factory with JNDI name {0}.
Please check the name or deploy the Connection Factory with specified name.
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -23,10 +23,6 @@
import static org.junit.Assert.*;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
@@ -37,13 +33,19 @@
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TestTupleBuffer.FakeBatchManager;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.metadata.Table;
+import org.teiid.query.processor.FakeProcessorPlan;
+import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.sql.lang.Query;
import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.unittest.FakeMetadataFactory;
+import org.teiid.query.unittest.RealMetadataFactory;
+import org.teiid.query.util.CommandContext;
-
+@SuppressWarnings({"nls", "unchecked"})
public class TestCachedResults {
-
@Test
public void testCaching() throws Exception {
@@ -70,10 +72,15 @@
BufferManager bm = fbs.getBufferManager();
CachedResults results = new CachedResults();
- results.setResults(tb);
+ ProcessorPlan plan = new FakeProcessorPlan(0);
+ CommandContext cc = new CommandContext();
+ Table t = RealMetadataFactory.exampleBQT().getGroupID("bqt1.smalla");
+ cc.accessedView(t);
+ plan.setContext(cc);
+ results.setResults(tb, plan);
results.setCommand(new Query());
Cache cache = new DefaultCache("dummy"); //$NON-NLS-1$
-
+ long ts = results.getAccessInfo().getCreationTime();
// simulate the jboss-cache remote transport, where the batches are remotely looked up
// in cache
for (int row=1; row<=tb.getRowCount();row+=4) {
@@ -82,14 +89,9 @@
results.prepare(cache, bm);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(results);
- oos.close();
+ CachedResults cachedResults = UnitTestUtil.helpSerialize(results);
- ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(baos.toByteArray()));
- CachedResults cachedResults = (CachedResults)ois.readObject();
- ois.close();
+ FakeMetadataFactory.buildWorkContext(RealMetadataFactory.exampleBQT());
cachedResults.restore(cache, bm);
@@ -103,5 +105,6 @@
assertArrayEquals(tb.getBatch(1).getAllTuples(), cachedTb.getBatch(1).getAllTuples());
assertArrayEquals(tb.getBatch(9).getAllTuples(), cachedTb.getBatch(9).getAllTuples());
+ assertTrue(ts - cachedResults.getAccessInfo().getCreationTime() <= 5000);
}
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -28,6 +28,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
@@ -39,12 +40,15 @@
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
+import org.teiid.client.RequestMessage.StatementType;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.FakeTransactionService;
import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
import org.teiid.dqp.service.AutoGenDataService;
-import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.BufferService;
import org.teiid.metadata.MetadataProvider;
import org.teiid.query.optimizer.TestOptimizer;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
@@ -70,7 +74,13 @@
Mockito.stub(repo.getConnectorManager(Mockito.anyString())).toReturn(agds);
core = new DQPCore();
- core.setBufferService(new FakeBufferService());
+ core.setBufferService(new BufferService() {
+
+ @Override
+ public BufferManager getBufferManager() {
+ return BufferManagerFactory.createBufferManager();
+ }
+ });
core.setCacheFactory(new DefaultCacheFactory());
core.setTransactionService(new FakeTransactionService());
@@ -78,6 +88,7 @@
config.setMaxActivePlans(1);
config.setUserRequestSourceConcurrency(2);
core.start(config);
+ core.getPrepPlanCache().setModTime(1);
}
@After public void tearDown() throws Exception {
@@ -239,13 +250,13 @@
DQPWorkContext.getWorkContext().getSession().setUserName(userName);
((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
- ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+ ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
assertEquals(2, rm.getResults().length);
RequestWorkItem item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
message = core.processCursorRequest(reqMsg.getExecutionId(), 3, 2);
- rm = message.get(5000, TimeUnit.MILLISECONDS);
+ rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
assertEquals(2, rm.getResults().length);
//ensure that we are idle
@@ -277,7 +288,7 @@
DQPWorkContext.getWorkContext().getSession().setUserName(userName);
((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
- ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+ ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
assertEquals(2, rm.getResults().length);
RequestWorkItem item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
@@ -341,6 +352,44 @@
assertEquals("something else", rm.getResults()[0].get(0));
//$NON-NLS-1$
}
+ @Test public void testPreparedPlanInvalidation() throws Exception {
+ String sql = "insert into #temp select * FROM vqt.SmallB";
//$NON-NLS-1$
+ String userName = "1"; //$NON-NLS-1$
+ int sessionid = 1; //$NON-NLS-1$
+ RequestMessage reqMsg = exampleRequestMessage(sql);
+ ResultsMessage rm = execute(userName, sessionid, reqMsg);
+ assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * from #temp"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setStatementType(StatementType.PREPARED);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * from #temp"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setStatementType(StatementType.PREPARED);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ assertEquals(1, this.core.getPrepPlanCache().getCacheHitCount());
+
+ Thread.sleep(100);
+
+ sql = "delete from #temp"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * from #temp"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setStatementType(StatementType.PREPARED);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(0, rm.getResults().length); //$NON-NLS-1$
+
+ assertEquals(1, this.core.getPrepPlanCache().getCacheHitCount());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
@@ -356,15 +405,10 @@
private ResultsMessage helpExecute(String sql, String userName, int sessionid,
boolean txnAutoWrap) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
-
DQPWorkContext.getWorkContext().getSession().setSessionId(String.valueOf(sessionid));
- DQPWorkContext.getWorkContext().getSession().setUserName(userName);
if (txnAutoWrap) {
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_ON);
}
-
- Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
- assertNotNull(core.getClientState(String.valueOf(sessionid), false));
- ResultsMessage results = message.get(5000, TimeUnit.MILLISECONDS);
+ ResultsMessage results = execute(userName, sessionid, reqMsg);
core.terminateSession(String.valueOf(sessionid));
assertNull(core.getClientState(String.valueOf(sessionid), false));
if (results.getException() != null) {
@@ -372,4 +416,15 @@
}
return results;
}
+
+ private ResultsMessage execute(String userName, int sessionid, RequestMessage reqMsg)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ DQPWorkContext.getWorkContext().getSession().setSessionId(String.valueOf(sessionid));
+ DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+
+ Future<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ assertNotNull(core.getClientState(String.valueOf(sessionid), false));
+ ResultsMessage results = message.get(500000, TimeUnit.MILLISECONDS);
+ return results;
+ }
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedPlanCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedPlanCache.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedPlanCache.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -22,10 +22,7 @@
package org.teiid.dqp.internal.process;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.util.ArrayList;
@@ -40,6 +37,8 @@
import org.teiid.query.processor.relational.ProjectNode;
import org.teiid.query.processor.relational.RelationalPlan;
import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.symbol.Reference;
+import org.teiid.query.util.CommandContext;
public class TestPreparedPlanCache {
@@ -70,7 +69,7 @@
assertNotNull("Unable to get prepared plan from cache", cache.get(id));
//$NON-NLS-1$
}
- @Test public void testget(){
+ @Test public void testGet(){
SessionAwareCache<PreparedPlan> cache = new
SessionAwareCache<PreparedPlan>();
helpPutPreparedPlans(cache, token, 0, 10);
helpPutPreparedPlans(cache, token2, 0, 15);
@@ -81,7 +80,7 @@
assertEquals("Error getting plan from cache", new RelationalPlan(new
ProjectNode(12)).toString(), pPlan.getPlan().toString()); //$NON-NLS-1$
assertEquals("Error getting command from cache", EXAMPLE_QUERY + 12,
pPlan.getCommand().toString()); //$NON-NLS-1$
assertNotNull("Error getting plan description from cache",
pPlan.getAnalysisRecord()); //$NON-NLS-1$
- assertEquals("Error gettting reference from cache", "ref12",
pPlan.getReferences().get(0)); //$NON-NLS-1$ //$NON-NLS-2$
+ assertEquals("Error gettting reference from cache", new Reference(1),
pPlan.getReferences().get(0)); //$NON-NLS-1$
}
@Test public void testClearAll(){
@@ -152,11 +151,11 @@
PreparedPlan pPlan = new PreparedPlan();
cache.put(id, Determinism.SESSION_DETERMINISTIC, pPlan, null);
pPlan.setCommand(dummy);
- pPlan.setPlan(new RelationalPlan(new ProjectNode(i)));
+ pPlan.setPlan(new RelationalPlan(new ProjectNode(i)), new CommandContext());
AnalysisRecord analysisRecord = new AnalysisRecord(true, false);
pPlan.setAnalysisRecord(analysisRecord);
- ArrayList refs = new ArrayList();
- refs.add("ref"+i); //$NON-NLS-1$
+ ArrayList<Reference> refs = new ArrayList<Reference>();
+ refs.add(new Reference(1));
pPlan.setReferences(refs);
}
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestPreparedStatement.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -55,7 +55,7 @@
import org.teiid.query.unittest.FakeMetadataFacade;
import org.teiid.query.unittest.FakeMetadataFactory;
-@SuppressWarnings("nls")
+@SuppressWarnings({"nls", "unchecked"})
public class TestPreparedStatement {
private static final int SESSION_ID = 6;
@@ -148,8 +148,7 @@
Arrays.asList(new Object[] { "a", new Integer(0),
Boolean.FALSE, new Double(2.0) }) //$NON-NLS-1$
};
- List values = new ArrayList();
- values.add(new Short((short)0));
+ List<?> values = Arrays.asList((short)0);
FakeDataManager dataManager = new FakeDataManager();
TestProcessor.sampleData1(dataManager);
helpTestProcessing(preparedSql, values, expected, dataManager,
FakeMetadataFactory.example1Cached(), false, FakeMetadataFactory.example1VDB());
@@ -166,8 +165,7 @@
Arrays.asList(new Object[] { "foo", new Integer(0),
Boolean.FALSE, new Double(2.0) }) //$NON-NLS-1$
};
- List values = new ArrayList();
- values.add(new Short((short)0));
+ List<?> values = Arrays.asList((short)0);
FakeDataManager dataManager = new FakeDataManager();
TestProcessor.sampleData1(dataManager);
helpTestProcessing(preparedSql, values, expected, dataManager,
FakeMetadataFactory.example1Cached(), false, true, FakeMetadataFactory.example1VDB());
@@ -256,8 +254,7 @@
// Create query
String preparedSql = "SELECT pm1.g1.e1, e2, pm1.g1.e3 as a, e4 as b FROM
pm1.g1 WHERE pm1.g1.e1=?"; //$NON-NLS-1$
- List values = new ArrayList();
- values.add("a"); //$NON-NLS-1$
+ List values = Arrays.asList("a"); //$NON-NLS-1$
//Create plan
helpGetProcessorPlan(preparedSql, values, new
SessionAwareCache<PreparedPlan>());
@@ -291,8 +288,7 @@
String preparedSql = "SELECT X.e1 FROM (SELECT pm1.g2.e1 FROM pm1.g2 WHERE
pm1.g2.e1 = ?) as X"; //$NON-NLS-1$
//Create Request
- List values = new ArrayList();
- values.add("d"); //$NON-NLS-1$
+ List values = Arrays.asList("d"); //$NON-NLS-1$
//Create plan
helpGetProcessorPlan(preparedSql, values, new
SessionAwareCache<PreparedPlan>());
@@ -305,8 +301,7 @@
//wrong type
try{
- List values = new ArrayList();
- values.add("x"); //$NON-NLS-1$
+ List values = Arrays.asList("x"); //$NON-NLS-1$
//Create plan
helpGetProcessorPlan(preparedSql, values, prepCache, SESSION_ID);
Modified:
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/test/java/org/teiid/query/optimizer/relational/TestMaterialization.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -153,7 +153,7 @@
CommandContext cc = new CommandContext();
cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
ProcessorPlan plan = TestOptimizer.getPlan(command, metadata, getGenericFinder(),
analysis, true, cc);
- TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MatView.VGroup2.x FROM #MAT_MatView.VGroup2"}, plan);
+ TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP2.x FROM #MAT_MATVIEW.VGROUP2"}, plan);
Collection<Annotation> annotations = analysis.getAnnotations();
assertNotNull("Expected annotations but got none", annotations);
//$NON-NLS-1$
assertEquals("Expected one annotation", 1, annotations.size());
//$NON-NLS-1$
@@ -171,7 +171,7 @@
cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
RelationalPlan plan = (RelationalPlan)TestOptimizer.getPlan(command, metadata,
getGenericFinder(), analysis, true, cc);
assertEquals(1f, plan.getRootNode().getEstimateNodeCardinality());
- TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MatView.VGroup3.x, #MAT_MatView.VGroup3.y FROM #MAT_MatView.VGroup3 WHERE
#MAT_MatView.VGroup3.x = 'foo'"}, plan);
+ TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP3.x, #MAT_MATVIEW.VGROUP3.y FROM #MAT_MATVIEW.VGROUP3 WHERE
#MAT_MATVIEW.VGROUP3.x = 'foo'"}, plan);
Collection<Annotation> annotations = analysis.getAnnotations();
assertNotNull("Expected annotations but got none", annotations);
//$NON-NLS-1$
assertEquals("Expected one annotation", 1, annotations.size());
//$NON-NLS-1$
@@ -188,7 +188,7 @@
CommandContext cc = new CommandContext();
cc.setGlobalTableStore(new TempTableStore("SYSTEM"));
ProcessorPlan plan = TestOptimizer.getPlan(command, metadata, getGenericFinder(),
analysis, true, cc);
- TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MatView.VGroup4.x FROM #MAT_MatView.VGroup4"}, plan);
+ TestOptimizer.checkAtomicQueries(new String[] {"SELECT
#MAT_MATVIEW.VGROUP4.x FROM #MAT_MATVIEW.VGROUP4"}, plan);
Collection<Annotation> annotations = analysis.getAnnotations();
assertNotNull("Expected annotations but got none", annotations);
//$NON-NLS-1$
assertEquals("Expected one annotation", 2, annotations.size());
//$NON-NLS-1$
Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeProcessorPlan.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeProcessorPlan.java 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeProcessorPlan.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -32,9 +32,8 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.TeiidComponentException;
-import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.util.CommandContext;
@@ -141,5 +140,9 @@
public List getSchema() {
return this.outputElements;
}
+
+ @Override
+ public void getAccessedGroups(List<GroupSymbol> groups) {
+ }
}
Modified: trunk/engine/src/test/java/org/teiid/query/unittest/FakeMetadataFactory.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/unittest/FakeMetadataFactory.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/engine/src/test/java/org/teiid/query/unittest/FakeMetadataFactory.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -97,6 +97,9 @@
session.setUserName("foo"); //$NON-NLS-1$
session.setVdb(vdb);
workContext.getVDB().addAttchment(QueryMetadataInterface.class, metadata);
+ if (metadata instanceof TransformationMetadata) {
+ workContext.getVDB().addAttchment(TransformationMetadata.class,
(TransformationMetadata)metadata);
+ }
DQPWorkContext.setWorkContext(workContext);
return workContext;
}
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-04-11
15:36:29 UTC (rev 3082)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-04-12
19:35:57 UTC (rev 3083)
@@ -67,6 +67,7 @@
import org.teiid.adminapi.impl.DQPManagement;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.adminapi.jboss.AdminProvider;
import org.teiid.cache.CacheFactory;
@@ -79,6 +80,7 @@
import org.teiid.client.util.ResultsFuture;
import org.teiid.core.ComponentNotFoundException;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.LRUCache;
import org.teiid.deployers.VDBLifeCycleListener;
@@ -92,12 +94,17 @@
import org.teiid.dqp.service.SessionService;
import org.teiid.dqp.service.SessionServiceException;
import org.teiid.dqp.service.TransactionService;
+import org.teiid.events.EventDistributor;
import org.teiid.jboss.IntegrationPlugin;
import org.teiid.logging.Log4jListener;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
+import org.teiid.metadata.Table;
import org.teiid.net.TeiidURL;
+import org.teiid.query.QueryPlugin;
+import org.teiid.query.metadata.TransformationMetadata;
+import org.teiid.query.tempdata.TempTableStore;
import org.teiid.security.SecurityHelper;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
@@ -109,7 +116,7 @@
@ManagementObject(name="RuntimeEngineDeployer", isRuntime=true,
componentType=@ManagementComponent(type="teiid",subtype="dqp"),
properties=ManagementProperties.EXPLICIT)
-public class RuntimeEngineDeployer extends DQPConfiguration implements DQPManagement,
Serializable , ClientServiceRegistry {
+public class RuntimeEngineDeployer extends DQPConfiguration implements DQPManagement,
Serializable , ClientServiceRegistry, EventDistributor {
private static final long serialVersionUID = -4676205340262775388L;
private transient SocketConfiguration jdbcSocketConfiguration;
@@ -130,6 +137,9 @@
private transient ProfileService profileService;
private transient String jndiName;
+
+ private String eventDistributorName;
+ private transient EventDistributor eventDistributor;
public RuntimeEngineDeployer() {
// TODO: this does not belong here
@@ -150,6 +160,17 @@
public void start() {
dqpCore.setTransactionService((TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG,
transactionServerImpl, new Class[] {TransactionService.class}, MessageLevel.DETAIL));
+ if (this.eventDistributorName != null) {
+ try {
+ InitialContext ic = new InitialContext();
+ this.eventDistributor = (EventDistributor) ic.lookup(this.eventDistributorName);
+ } catch (NamingException ne) {
+ //log at a detail level since we may not be in the all profile
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, ne,
IntegrationPlugin.Util.getString("jndi_failed", new
Date(System.currentTimeMillis()).toString())); //$NON-NLS-1$
+ }
+ }
+ this.dqpCore.start(this);
+ this.dqpCore.getDataTierManager().setEventDistributor(this.eventDistributor);
// create the necessary services
createClientServices();
@@ -285,9 +306,6 @@
}
private void createClientServices() {
-
- this.dqpCore.start(this);
-
this.logon = new LogonImpl(this.sessionService, "teiid-cluster");
//$NON-NLS-1$
if (profileService != null) {
this.admin = AdminProvider.getLocal(profileService, vdbStatusChecker);
@@ -585,7 +603,7 @@
for(int i = 0; i < rows.length; i++) {
List row = rows[i];
- ArrayList newRow = new ArrayList();
+ ArrayList<Object> newRow = new ArrayList<Object>();
for (Object col:row) {
if (col == null) {
newRow.add("null"); //$NON-NLS-1$
@@ -613,4 +631,66 @@
}
return newResults;
}
+
+ public String getEventDistributorName() {
+ return eventDistributorName;
+ }
+
+ public void setEventDistributorName(String eventDistributorName) {
+ this.eventDistributorName = eventDistributorName;
+ }
+
+ @Override
+ public void updateMatViewRow(String vdbName, int vdbVersion, String matViewFqn,
List<?> tuple,
+ boolean delete) {
+ VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
+ if (vdb == null) {
+ return;
+ }
+ TempTableStore globalStore = vdb.getAttachment(TempTableStore.class);
+ if (globalStore == null) {
+ return;
+ }
+ try {
+ this.dqpCore.getDataTierManager().updateMatViewRow(globalStore, matViewFqn, tuple,
delete);
+ } catch (TeiidException e) {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ public void dataModification(String vdbName, int vdbVersion, String tableFqn) {
+ VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
+ if (vdb == null) {
+ return;
+ }
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ if (tm == null) {
+ return;
+ }
+ try {
+ Table table = tm.getGroupID(tableFqn);
+ table.setLastDataModification(System.currentTimeMillis());
+ } catch (TeiidException e) {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ public void schemaModification(String vdbName, int vdbVersion, String fqn) {
+ VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
+ if (vdb == null) {
+ return;
+ }
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ if (tm == null) {
+ return;
+ }
+ try {
+ Table table = tm.getGroupID(fqn);
+ table.setLastModified(System.currentTimeMillis());
+ } catch (TeiidException e) {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ }
+ }
}
Modified: trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-04-11
15:36:29 UTC (rev 3082)
+++ trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-04-12
19:35:57 UTC (rev 3083)
@@ -46,3 +46,5 @@
distribute_failed=Deploy of the archive failed {0}
template_not_found=Template not found for {0}
admin_executing=JOPR admin {0} is executing command {1}
+
+DQPCore.unable_to_process_event=Unable to process event.
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-04-11 15:36:29 UTC (rev 3082)
+++ trunk/pom.xml 2011-04-12 19:35:57 UTC (rev 3083)
@@ -360,12 +360,13 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.6.10.GA</version>
+ <version>2.6.15.GA</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jboss.cache</groupId>
<artifactId>jbosscache-core</artifactId>
- <version>3.1.0.GA</version>
+ <version>3.2.5.GA</version>
<exclusions>
<exclusion>
<groupId>javax.transaction</groupId>
@@ -379,7 +380,8 @@
<groupId>org.jboss</groupId>
<artifactId>jboss-common-core</artifactId>
</exclusion>
- </exclusions>
+ </exclusions>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jboss.man</groupId>