[teiid-commits] teiid SVN: r3089 - in trunk: api/src/main/java/org/teiid/events and 15 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Apr 13 16:34:39 EDT 2011


Author: shawkins
Date: 2011-04-13 16:34:39 -0400 (Wed, 13 Apr 2011)
New Revision: 3089

Added:
   trunk/api/src/main/java/org/teiid/events/
Removed:
   trunk/engine/src/main/java/org/teiid/events/
Modified:
   trunk/api/src/main/java/org/teiid/events/EventDistributor.java
   trunk/cache-jbosscache/pom.xml
   trunk/console/src/main/resources/META-INF/rhq-plugin.xml
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java
   trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
   trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
   trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java
   trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
   trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
   trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
   trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
   trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
Log:
TEIID-1507 refining previous check-in and adding detection of local data change events

Modified: trunk/api/src/main/java/org/teiid/events/EventDistributor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/events/EventDistributor.java	2011-04-12 23:20:51 UTC (rev 3084)
+++ trunk/api/src/main/java/org/teiid/events/EventDistributor.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -24,8 +24,41 @@
 
 import java.util.List;
 
+/**
+ * Distributes events across the Teiid cluster
+ */
 public interface EventDistributor {
+	
+	/**
+	 * Update the given materialized view row using the internal mat view name #MAT_VIEWFQN.
+	 * The tuple is expected to be in table order, which has the primary key first.
+	 * Deletes only need to send the key, not the entire row contents.
+	 * 
+	 * @param vdbName
+	 * @param vdbVersion
+	 * @param matViewFqn
+	 * @param tuple
+	 * @param delete
+	 */
 	void updateMatViewRow(String vdbName, int vdbVersion, String matViewFqn, List<?> tuple, boolean delete);
-	void schemaModification(String vdbName, int vdbVersion, String fqn);
-	void dataModification(String vdbName, int vdbVersion, String tableFqn);
+	
+	/**
+	 * Notify that the metadata has been changed for the given fqns.
+	 * A fqn has the form schema.entityname.
+	 * This typically implies that the costing metadata has changed, but may also indicate
+	 * a view definition has changed.
+	 * @param vdbName
+	 * @param vdbVersion
+	 * @param fqns
+	 */
+	void schemaModification(String vdbName, int vdbVersion, String... fqns);
+	
+	/**
+	 * Notify that the table data has changed.
+	 * A table fqn has the form schema.tablename.
+	 * @param vdbName
+	 * @param vdbVersion
+	 * @param tableFqns
+	 */
+	void dataModification(String vdbName, int vdbVersion, String... tableFqns);
 }

Modified: trunk/cache-jbosscache/pom.xml
===================================================================
--- trunk/cache-jbosscache/pom.xml	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/cache-jbosscache/pom.xml	2011-04-13 20:34:39 UTC (rev 3089)
@@ -14,7 +14,12 @@
             <groupId>org.jboss.teiid</groupId>
             <artifactId>teiid-common-core</artifactId>
             <scope>provided</scope>
-        </dependency>    
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.teiid</groupId>
+            <artifactId>teiid-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
 		<dependency>
 			<groupId>org.jboss.teiid</groupId>
 			<artifactId>teiid-engine</artifactId>

Modified: trunk/console/src/main/resources/META-INF/rhq-plugin.xml
===================================================================
--- trunk/console/src/main/resources/META-INF/rhq-plugin.xml	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/console/src/main/resources/META-INF/rhq-plugin.xml	2011-04-13 20:34:39 UTC (rev 3089)
@@ -330,7 +330,11 @@
                 <c:simple-property name="RuntimeEngineDeployer.useDataRoles"
                     displayName="Data Roles Enabled"
                     description="Turn on role checking of resources based on the roles defined in VDB (default true)"
-                    required="false" readOnly="false" />                    
+                    required="false" readOnly="false" />
+                <c:simple-property name="RuntimeEngineDeployer.detectingChangeEvents"
+                    displayName="Detecting Change Events"
+                    description="Set to true for the engine to detect local change events. Should be disabled if using external change data capture tools. (default true)"
+                    required="false" readOnly="false" />
 			</c:group>
             
             <c:group name="ResultSetCacheConfig" displayName="ResultSet Cache Properties" hiddenByDefault="false">

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -171,7 +171,7 @@
 				}
 			} else if (o instanceof TempMetadataID) {
 				TempMetadataID tid = (TempMetadataID)o;
-				if (tid.getTableData().getLastModified() - modTime > this.creationTime) {
+				if ((data?tid.getTableData().getLastDataModification():tid.getTableData().getLastModified()) - modTime > this.creationTime) {
 					return false;
 				}
 			}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -55,6 +55,7 @@
 	private CacheConfiguration preparedPlanCacheConfig = new CacheConfiguration();
 	private int maxODBCLobSizeAllowed = 5*1024*1024; // 5 MB
     private int userRequestSourceConcurrency = DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY;
+    private boolean detectingChangeEvents = true;
     
     private transient AuthorizationValidator authorizationValidator;
     private transient MetadataProvider metadataProvider;
@@ -231,4 +232,13 @@
 		this.preparedPlanCacheConfig = preparedPlanCacheConfig;
 	}
 
+	public boolean isDetectingChangeEvents() {
+		return detectingChangeEvents;
+	}
+	
+	@ManagementProperty(description="Set to true for the engine to detect local change events. Should be disabled if using external change data capture tools. (default true)")
+	public void setDetectingChangeEvents(boolean detectingChangeEvents) {
+		this.detectingChangeEvents = detectingChangeEvents;
+	}
+
 }

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -715,7 +715,7 @@
         	matTables.setBufferManager(this.bufferManager);
         }
         
-        dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory); 
+        dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService, this.config.isDetectingChangeEvents()), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory); 
 	}
 	
 	public void setBufferService(BufferService service) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -52,6 +52,7 @@
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.RequestID;
 import org.teiid.dqp.service.BufferService;
+import org.teiid.events.EventDistributor;
 import org.teiid.metadata.AbstractMetadataRecord;
 import org.teiid.metadata.Column;
 import org.teiid.metadata.Datatype;
@@ -110,12 +111,27 @@
 	// Resources
 	private DQPCore requestMgr;
     private BufferService bufferService;
+    private EventDistributor eventDistributor;
+    private boolean detectChangeEvents;
 
-    public DataTierManagerImpl(DQPCore requestMgr, BufferService bufferService) {
+    public DataTierManagerImpl(DQPCore requestMgr, BufferService bufferService, boolean detectChangeEvents) {
 		this.requestMgr = requestMgr;
         this.bufferService = bufferService;
+        this.detectChangeEvents = detectChangeEvents;
 	}
     
+    public boolean detectChangeEvents() {
+		return detectChangeEvents;
+	}
+    
+    public void setEventDistributor(EventDistributor eventDistributor) {
+		this.eventDistributor = eventDistributor;
+	}
+    
+    public EventDistributor getEventDistributor() {
+		return eventDistributor;
+	}
+    
 	public TupleSource registerRequest(CommandContext context, Command command, String modelName, String connectorBindingId, int nodeID, int limit) throws TeiidComponentException, TeiidProcessingException {
 		RequestWorkItem workItem = requestMgr.getRequestWorkItem((RequestID)context.getProcessorID());
 		

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -23,6 +23,7 @@
 package org.teiid.dqp.internal.process;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -56,7 +57,13 @@
 import org.teiid.dqp.internal.process.DQPCore.FutureWork;
 import org.teiid.dqp.message.AtomicRequestMessage;
 import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.metadata.Table;
 import org.teiid.query.function.source.XMLSystemFunctions;
+import org.teiid.query.processor.relational.RelationalNodeUtil;
+import org.teiid.query.sql.lang.BatchedUpdateCommand;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.ProcedureContainer;
+import org.teiid.query.sql.symbol.GroupSymbol;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
@@ -133,7 +140,7 @@
 		}, this, 100);
 	}
 
-	private List correctTypes(List row) throws TransformationException {
+	private List<?> correctTypes(List<Object> row) throws TransformationException {
 		//TODO: add a proper intermediate schema
 		for (int i = 0; i < row.size(); i++) {
 			Object value = row.get(i);
@@ -213,6 +220,25 @@
 	    			} else {
 	    				results = getResults();
 	    			}
+	    			//check for update events
+	    			if (index == 0 && this.dtm.detectChangeEvents()) {
+	    				Command command = aqr.getCommand();
+	    				ArrayList<String> updates = new ArrayList<String>();
+	    				int commandIndex = 0;
+	    				if (RelationalNodeUtil.isUpdate(command)) {
+	    					long ts = System.currentTimeMillis();
+	    					checkForUpdates(results, command, updates, commandIndex, ts);
+	    				} else if (command instanceof BatchedUpdateCommand) {
+	    					long ts = System.currentTimeMillis();
+	    					BatchedUpdateCommand bac = (BatchedUpdateCommand)command;
+	    					for (Command uc : bac.getUpdateCommands()) {
+	    						checkForUpdates(results, uc, updates, commandIndex++, ts);
+	    					}
+	    				}
+	    				if (this.dtm.getEventDistributor() != null && !updates.isEmpty()) {
+	    					this.dtm.getEventDistributor().dataModification(this.workItem.getDqpWorkContext().getVdbName(), this.workItem.getDqpWorkContext().getVdbVersion(), updates.toArray(new String[updates.size()]));
+	    				}
+	    			}
     			} catch (TranslatorException e) {
     				results = exceptionOccurred(e, true);
     			} catch (DataNotAvailableException e) {
@@ -241,6 +267,29 @@
     	}
     }
 
+	/**
+	 * Records a data modification for the table targeted by the given update command.
+	 * Nothing is recorded if the command is not a table update or its update count is 0.
+	 */
+	private void checkForUpdates(AtomicResultsMessage results, Command command,
+			ArrayList<String> updates, int commandIndex, long ts) {
+		if (!RelationalNodeUtil.isUpdate(command) || !(command instanceof ProcedureContainer)) {
+			return;
+		}
+		//cast the command being checked - aqr.getCommand() is the BatchedUpdateCommand for batches
+		GroupSymbol gs = ((ProcedureContainer)command).getGroup();
+		if (results.getResults().length <= commandIndex || Integer.valueOf(0).equals(results.getResults()[commandIndex].get(0))) {
+			return;
+		}
+		Object metadataId = gs.getMetadataID();
+		if (!(metadataId instanceof Table)) { //instanceof is also false for a null id
+			return;
+		}
+		Table t = (Table)metadataId;
+		updates.add(t.getFullName());
+		t.setLastDataModification(ts);
+	}
+
 	private AtomicResultsMessage asynchGet()
 			throws BlockedException, TeiidProcessingException,
 			TeiidComponentException, TranslatorException {

Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -45,6 +45,9 @@
 			this.result = new ArrayList<Object>();
 		}
 		this.result.add(input);
+		if (this.result.size() > 1000) {
+			throw new AssertionError("Exceeded the max allowable array size of 1000."); //$NON-NLS-1$
+		}
 	}
 
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -33,7 +33,6 @@
 import org.teiid.query.sql.lang.CacheHint;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
 
-
 /**
  * This class represents a temporary metadata ID.  A temporary metadata ID 
  * does not exist in a real metadata source.  Rather, it is used temporarily 
@@ -48,6 +47,8 @@
 	private static final long serialVersionUID = -1879211827339120135L;
 	private static final int LOCAL_CACHE_SIZE = 8;
 	
+	private static final int MOD_COUNT_FOR_COST_UPDATE = 8;
+	
 	public static class TableData {
 		Collection<TempMetadataID> accessPatterns;
 		List<TempMetadataID> elements;
@@ -58,15 +59,31 @@
 		CacheHint cacheHint;
 		List<List<TempMetadataID>> keys;
 		List<List<TempMetadataID>> indexes;
+		long lastDataModification;
 		long lastModified;
+		int modCount;
 		
+		public long getLastDataModification() {
+			return lastDataModification;
+		}
+		
+		public void dataModified(int updateCount) {
+			if (updateCount == 0) {
+				return;
+			}
+			long ts = System.currentTimeMillis();
+			modCount += updateCount;
+			if (modCount > MOD_COUNT_FOR_COST_UPDATE) {
+				this.lastModified = ts;
+				modCount = 0;
+			}
+			this.lastDataModification = ts;
+		}
+		
 		public long getLastModified() {
 			return lastModified;
 		}
 		
-		public void setLastModified(long lastModified) {
-			this.lastModified = lastModified;
-		}
 	}
 	
 	private static TableData DUMMY_DATA = new TableData();

Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -262,9 +262,9 @@
     
     public boolean hasProcedure(String name) throws TeiidComponentException {
     	try {
-    		return getStoredProcedureInfoForProcedure(name) != null;
+    		return getStoredProcInfoDirect(name) != null;
     	} catch (QueryMetadataException e) {
-    		return false;
+    		return true;
     	}
     }
 

Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -26,6 +26,7 @@
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.events.EventDistributor;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.util.CommandContext;
 
@@ -47,5 +48,7 @@
                                            String keyElementName,
                                            Object keyValue) throws BlockedException,
                                                            TeiidComponentException, TeiidProcessingException;
+    
+    void setEventDistributor(EventDistributor ed);
 
 }

Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -23,10 +23,9 @@
 package org.teiid.query.sql.lang;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
+import org.teiid.query.sql.LanguageObject;
 import org.teiid.query.sql.LanguageVisitor;
 import org.teiid.query.sql.util.VariableContext;
 
@@ -38,42 +37,13 @@
  */
 public class BatchedUpdateCommand extends Command {
     
-    protected List commands;
+    protected List<Command> commands;
     private List<VariableContext> variableContexts; //processing state
     
-    /**
-     * Add sub command
-     * @param command Additional sub-command
-     */
-    public void addSubCommand(Command command) {
-        if(this.commands == null) {
-            this.commands = new ArrayList();
-        }
-        this.commands.add(command);
-    }
-
-    /**
-     * Add sub commands
-     * @param commands Additional sub-commands
-     */
-    public void addSubCommands(Collection commands) {
-        if(commands == null || commands.size() == 0) {
-            return;
-        }
-        
-        if(this.commands == null) {
-            this.commands = new ArrayList();
-        } 
-        this.commands.addAll(commands);    
-    }
-    
     /** 
      * @see org.teiid.query.sql.lang.Command#getSubCommands()
      */
-    public List getSubCommands() {
-        if(commands == null || commands.size() == 0) {
-            return Collections.EMPTY_LIST;
-        }
+    public List<Command> getSubCommands() {
         return commands;
     }
     
@@ -82,8 +52,8 @@
      * @param updateCommands
      * @since 4.2
      */
-    public BatchedUpdateCommand(List updateCommands) {
-        addSubCommands(updateCommands);
+    public BatchedUpdateCommand(List<? extends Command> updateCommands) {
+        this.commands = new ArrayList<Command>(updateCommands);
     }
     
     /**
@@ -91,7 +61,7 @@
      * @return
      * @since 4.2
      */
-    public List getUpdateCommands() {
+    public List<Command> getUpdateCommands() {
         return getSubCommands();
     }
 
@@ -123,10 +93,7 @@
      * @since 4.2
      */
     public Object clone() {
-        List clonedCommands = new ArrayList(commands.size());
-        for (int i = 0; i < commands.size(); i++) {
-            clonedCommands.add(((Command)commands.get(i)).clone());
-        }
+        List<Command> clonedCommands = LanguageObject.Util.deepClone(this.commands, Command.class);
         BatchedUpdateCommand copy = new BatchedUpdateCommand(clonedCommands);
         copyMetadataState(copy);
         return copy;
@@ -142,9 +109,9 @@
     public String toString() {
         StringBuffer val = new StringBuffer("BatchedUpdate{"); //$NON-NLS-1$
         if (commands != null && commands.size() > 0) {
-            val.append(getCommandToken((Command)commands.get(0)));
+            val.append(getCommandToken(commands.get(0)));
             for (int i = 1; i < commands.size(); i++) {
-                val.append(',').append(getCommandToken((Command)commands.get(i)));
+                val.append(',').append(getCommandToken(commands.get(i)));
             }
         }
         val.append('}');

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -482,7 +482,7 @@
 	}
 	
 	public int truncate() {
-		this.tid.getTableData().setLastModified(System.currentTimeMillis());
+		this.tid.getTableData().dataModified(tree.getRowCount());
 		return tree.truncate();
 	}
 	
@@ -513,7 +513,7 @@
         UpdateProcessor up = new InsertUpdateProcessor(tuples, rowId != null, shouldProject?indexes:null);
         int updateCount = up.process();
         tid.setCardinality(tree.getRowCount());
-        tid.getTableData().setLastModified(System.currentTimeMillis());
+        tid.getTableData().dataModified(updateCount);
         return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
     }
 	
@@ -579,9 +579,7 @@
 			
 		};
 		int updateCount = up.process();
-		if (updateCount > 0) {
-			tid.getTableData().setLastModified(System.currentTimeMillis());
-		}
+		tid.getTableData().dataModified(updateCount);
 		return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
 	}
 
@@ -614,9 +612,7 @@
 		};
 		int updateCount = up.process();
 		tid.setCardinality(tree.getRowCount());
-		if (updateCount > 0) {
-			tid.getTableData().setLastModified(System.currentTimeMillis());
-		}
+		tid.getTableData().dataModified(updateCount);
 		return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
 	}
 	
@@ -646,7 +642,7 @@
 						index.tree.remove(tuple);
 					}
 				}
-				tid.getTableData().setLastModified(System.currentTimeMillis());
+				tid.getTableData().dataModified(1);
 				return result;
 			} 
 			List<?> result = tree.insert(tuple, InsertMode.UPDATE, -1);
@@ -656,7 +652,7 @@
 					index.tree.insert(tuple, InsertMode.UPDATE, -1);
 				}
 			}
-			tid.getTableData().setLastModified(System.currentTimeMillis());
+			tid.getTableData().dataModified(1);
 			return result;
 		} finally {
 			lock.writeLock().unlock();

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -161,6 +161,7 @@
     
     public void setEventDistributor(EventDistributor eventDistributor) {
 		this.eventDistributor = eventDistributor;
+		this.processorDataManager.setEventDistributor(eventDistributor);
 	}
     
 	public TupleSource registerRequest(

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -37,6 +37,7 @@
 import org.mockito.Mockito;
 import org.teiid.CommandContext;
 import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.cache.CacheConfiguration;
 import org.teiid.cache.DefaultCacheFactory;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
@@ -87,8 +88,10 @@
         config = new DQPConfiguration();
         config.setMaxActivePlans(1);
         config.setUserRequestSourceConcurrency(2);
+        config.setResultsetCacheConfig(new CacheConfiguration());
         core.start(config);
         core.getPrepPlanCache().setModTime(1);
+        core.getRsCache().setModTime(1);
     }
     
     @After public void tearDown() throws Exception {
@@ -376,6 +379,21 @@
 
         Thread.sleep(100);
 
+        //perform a minor update, we should still use the cache
+        sql = "delete from #temp where a12345 = '11'"; //$NON-NLS-1$
+        reqMsg = exampleRequestMessage(sql);
+        rm = execute(userName, sessionid, reqMsg);
+        assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+
+        sql = "select * from #temp"; //$NON-NLS-1$
+        reqMsg = exampleRequestMessage(sql);
+        reqMsg.setStatementType(StatementType.PREPARED);
+        rm = execute(userName, sessionid, reqMsg);
+        assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+        
+        assertEquals(2, this.core.getPrepPlanCache().getCacheHitCount());
+
+        //perform a major update, we will purge the plan
         sql = "delete from #temp"; //$NON-NLS-1$
         reqMsg = exampleRequestMessage(sql);
         rm = execute(userName, sessionid, reqMsg);
@@ -387,9 +405,42 @@
         rm = execute(userName, sessionid, reqMsg);
         assertEquals(0, rm.getResults().length); //$NON-NLS-1$
         
-        assertEquals(1, this.core.getPrepPlanCache().getCacheHitCount());
+        assertEquals(2, this.core.getPrepPlanCache().getCacheHitCount());
     }
     
+    @Test public void testRsCacheInvalidation() throws Exception {
+        String sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+        String userName = "1"; //$NON-NLS-1$
+        int sessionid = 1; //$NON-NLS-1$
+        RequestMessage reqMsg = exampleRequestMessage(sql);
+        reqMsg.setUseResultSetCache(true);
+        ResultsMessage rm = execute(userName, sessionid, reqMsg);
+        assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+                
+        sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+        reqMsg = exampleRequestMessage(sql);
+        reqMsg.setUseResultSetCache(true);
+        rm = execute(userName, sessionid, reqMsg);
+        assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+        
+        assertEquals(1, this.core.getRsCache().getCacheHitCount());
+
+        Thread.sleep(100);
+
+        sql = "delete from bqt1.smalla"; //$NON-NLS-1$
+        reqMsg = exampleRequestMessage(sql);
+        rm = execute(userName, sessionid, reqMsg);
+        assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+        
+        sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+        reqMsg = exampleRequestMessage(sql);
+        reqMsg.setUseResultSetCache(true);
+        rm = execute(userName, sessionid, reqMsg);
+        assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+        
+        assertEquals(1, this.core.getRsCache().getCacheHitCount());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -87,7 +87,7 @@
         
         
         dtm = new DataTierManagerImpl(rm,
-                                  bs);
+                                  bs, true);
         command = helpGetCommand(sql, metadata);
         
         RequestMessage original = new RequestMessage();

Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -25,6 +25,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@
 import org.teiid.dqp.message.AtomicResultsMessage;
 import org.teiid.query.optimizer.TestOptimizer;
 import org.teiid.query.optimizer.capabilities.SourceCapabilities;
+import org.teiid.query.processor.relational.RelationalNodeUtil;
 import org.teiid.query.sql.symbol.SingleElementSymbol;
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
@@ -83,8 +85,11 @@
     @Override
     public ConnectorWork registerRequest(AtomicRequestMessage message)
     		throws TeiidComponentException {
-        List projectedSymbols = (message.getCommand()).getProjectedSymbols();               
+        List projectedSymbols = (message.getCommand()).getProjectedSymbols(); 
         List[] results = createResults(projectedSymbols);
+        if (RelationalNodeUtil.isUpdate(message.getCommand())) {
+        	results = new List[] {Arrays.asList(1)};
+        }
                 
         final AtomicResultsMessage msg = ConnectorWorkItem.createResultsMessage(results, projectedSymbols);
         msg.setFinalRow(rows);

Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -34,6 +34,7 @@
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.events.EventDistributor;
 import org.teiid.logging.LogManager;
 import org.teiid.query.eval.Evaluator;
 import org.teiid.query.metadata.QueryMetadataInterface;
@@ -388,5 +389,8 @@
 		this.registerTuples(group.getMetadataID(), elementSymbols, tuples);
 	}
 
+	@Override
+	public void setEventDistributor(EventDistributor ed) {
+	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -32,6 +32,7 @@
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.dqp.internal.datamgr.LanguageBridgeFactory;
+import org.teiid.events.EventDistributor;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.sql.lang.Command;
 import org.teiid.query.util.CommandContext;
@@ -167,5 +168,9 @@
 	public void clearCodeTables() {
 		
 	}
+	
+	@Override
+	public void setEventDistributor(EventDistributor ed) {
+	}
 
 }

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -36,6 +36,7 @@
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.events.EventDistributor;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.optimizer.TestBatchedUpdatePlanner;
 import org.teiid.query.processor.ProcessorDataManager;
@@ -219,6 +220,10 @@
             actualCommands.add(command);
             return new FakeTupleSource(numExecutedCommands);
         }
+    	@Override
+    	public void setEventDistributor(EventDistributor ed) {
+    	}
+
     }
     private static final class FakeTupleSource implements TupleSource {
         private int currentTuple = 0;

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -36,6 +36,7 @@
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.events.EventDistributor;
 import org.teiid.query.eval.Evaluator;
 import org.teiid.query.processor.FakeTupleSource;
 import org.teiid.query.processor.ProcessorDataManager;
@@ -184,6 +185,9 @@
             Object val = row.get(0);
             assertEquals(new Integer(value), val);
         }
+    	@Override
+    	public void setEventDistributor(EventDistributor ed) {
+    	}
     }
     
     private static final class FakeDataTupleSource implements TupleSource {

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -659,7 +659,7 @@
 	}
 	
 	@Override
-	public void dataModification(String vdbName, int vdbVersion, String tableFqn) {
+	public void dataModification(String vdbName, int vdbVersion, String... tableFqns) {
 		VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
 		if (vdb == null) {
 			return;
@@ -668,16 +668,18 @@
 		if (tm == null) {
 			return;
 		}
-		try {
-			Table table = tm.getGroupID(tableFqn);
-			table.setLastDataModification(System.currentTimeMillis());
-		} catch (TeiidException e) {
-			LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+		for (String tableFqn:tableFqns) {
+			try {
+				Table table = tm.getGroupID(tableFqn);
+				table.setLastDataModification(System.currentTimeMillis());
+			} catch (TeiidException e) {
+				LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+			}
 		}
 	}
 	
 	@Override
-	public void schemaModification(String vdbName, int vdbVersion, String fqn) {
+	public void schemaModification(String vdbName, int vdbVersion, String... fqns) {
 		VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
 		if (vdb == null) {
 			return;
@@ -686,11 +688,13 @@
 		if (tm == null) {
 			return;
 		}
-		try {
-			Table table = tm.getGroupID(fqn);
-			table.setLastModified(System.currentTimeMillis());
-		} catch (TeiidException e) {
-			LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+		for (String fqn:fqns) {
+			try {
+				Table table = tm.getGroupID(fqn);
+				table.setLastModified(System.currentTimeMillis());
+			} catch (TeiidException e) {
+				LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+			}
 		}
 	}
 }

Modified: trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java	2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java	2011-04-13 20:34:39 UTC (rev 3089)
@@ -113,7 +113,7 @@
         Mockito.stub(rwi.getDqpWorkContext()).toReturn(workContext);
         
         Mockito.stub(core.getRequestWorkItem((RequestID)Mockito.anyObject())).toReturn(rwi);
-        DataTierManagerImpl dataMgr = new DataTierManagerImpl(core, null);
+        DataTierManagerImpl dataMgr = new DataTierManagerImpl(core, null, true);
         doProcess(metadata,  
                 sql, 
                 finder, dataMgr , new List[] {Arrays.asList(new String(ObjectConverterUtil.convertToByteArray(new FileInputStream(UnitTestUtil.getTestDataFile("test-schema.xsd")))))}, DEBUG); //$NON-NLS-1$



More information about the teiid-commits mailing list