Author: shawkins
Date: 2011-04-13 16:34:39 -0400 (Wed, 13 Apr 2011)
New Revision: 3089
Added:
trunk/api/src/main/java/org/teiid/events/
Removed:
trunk/engine/src/main/java/org/teiid/events/
Modified:
trunk/api/src/main/java/org/teiid/events/EventDistributor.java
trunk/cache-jbosscache/pom.xml
trunk/console/src/main/resources/META-INF/rhq-plugin.xml
trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java
trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java
trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
Log:
TEIID-1507 refining previous check-in and adding detection of local data change events
Modified: trunk/api/src/main/java/org/teiid/events/EventDistributor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/events/EventDistributor.java 2011-04-12 23:20:51
UTC (rev 3084)
+++ trunk/api/src/main/java/org/teiid/events/EventDistributor.java 2011-04-13 20:34:39 UTC
(rev 3089)
@@ -24,8 +24,41 @@
import java.util.List;
+/**
+ * Distributes events across the Teiid cluster
+ */
public interface EventDistributor {
+
+ /**
+ * Update the given materialized view row using the internal mat view name #MAT_VIEWFQN.
+ * The tuple is expected to be in table order, which has the primary key first.
+ * Deletes need only send the key, not the entire row contents.
+ *
+ * @param vdbName
+ * @param vdbVersion
+ * @param matViewFqn
+ * @param tuple
+ * @param delete
+ */
void updateMatViewRow(String vdbName, int vdbVersion, String matViewFqn, List<?>
tuple, boolean delete);
- void schemaModification(String vdbName, int vdbVersion, String fqn);
- void dataModification(String vdbName, int vdbVersion, String tableFqn);
+
+ /**
+ * Notify that the metadata has been changed for the given fully qualified names (fqns).
+ * An fqn has the form schema.entityname.
+ * This typically implies that the costing metadata has changed, but may also indicate
+ * that a view definition has changed.
+ * @param vdbName
+ * @param vdbVersion
+ * @param fqns
+ */
+ void schemaModification(String vdbName, int vdbVersion, String... fqns);
+
+ /**
+ * Notify that the table data has changed.
+ * A table fqn has the form schema.tablename.
+ * @param vdbName
+ * @param vdbVersion
+ * @param tableFqns
+ */
+ void dataModification(String vdbName, int vdbVersion, String... tableFqns);
}
Modified: trunk/cache-jbosscache/pom.xml
===================================================================
--- trunk/cache-jbosscache/pom.xml 2011-04-13 18:47:33 UTC (rev 3088)
+++ trunk/cache-jbosscache/pom.xml 2011-04-13 20:34:39 UTC (rev 3089)
@@ -14,7 +14,12 @@
<groupId>org.jboss.teiid</groupId>
<artifactId>teiid-common-core</artifactId>
<scope>provided</scope>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.jboss.teiid</groupId>
<artifactId>teiid-engine</artifactId>
Modified: trunk/console/src/main/resources/META-INF/rhq-plugin.xml
===================================================================
--- trunk/console/src/main/resources/META-INF/rhq-plugin.xml 2011-04-13 18:47:33 UTC (rev
3088)
+++ trunk/console/src/main/resources/META-INF/rhq-plugin.xml 2011-04-13 20:34:39 UTC (rev
3089)
@@ -330,7 +330,11 @@
<c:simple-property
name="RuntimeEngineDeployer.useDataRoles"
displayName="Data Roles Enabled"
description="Turn on role checking of resources based on the
roles defined in VDB (default true)"
- required="false" readOnly="false" />
+ required="false" readOnly="false" />
+ <c:simple-property
name="RuntimeEngineDeployer.detectingChangeEvents"
+ displayName="Detecting Change Events"
+ description="Set to true for the engine to detect local change
events. Should be disabled if using external change data capture tools. (default
true)"
+ required="false" readOnly="false" />
</c:group>
<c:group name="ResultSetCacheConfig" displayName="ResultSet
Cache Properties" hiddenByDefault="false">
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/AccessInfo.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -171,7 +171,7 @@
}
} else if (o instanceof TempMetadataID) {
TempMetadataID tid = (TempMetadataID)o;
- if (tid.getTableData().getLastModified() - modTime > this.creationTime) {
+ if
((data?tid.getTableData().getLastDataModification():tid.getTableData().getLastModified())
- modTime > this.creationTime) {
return false;
}
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPConfiguration.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -55,6 +55,7 @@
private CacheConfiguration preparedPlanCacheConfig = new CacheConfiguration();
private int maxODBCLobSizeAllowed = 5*1024*1024; // 5 MB
private int userRequestSourceConcurrency = DEFAULT_USER_REQUEST_SOURCE_CONCURRENCY;
+ private boolean detectingChangeEvents = true;
private transient AuthorizationValidator authorizationValidator;
private transient MetadataProvider metadataProvider;
@@ -231,4 +232,13 @@
this.preparedPlanCacheConfig = preparedPlanCacheConfig;
}
+ public boolean isDetectingChangeEvents() {
+ return detectingChangeEvents;
+ }
+
+ @ManagementProperty(description="Set to true for the engine to detect local change
events. Should be disabled if using external change data capture tools. (default
true)")
+ public void setDetectingChangeEvents(boolean detectingChangeEvents) {
+ this.detectingChangeEvents = detectingChangeEvents;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -715,7 +715,7 @@
matTables.setBufferManager(this.bufferManager);
}
- dataTierMgr = new TempTableDataManager(new
DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool,
this.rsCache, this.matTables, this.cacheFactory);
+ dataTierMgr = new TempTableDataManager(new
DataTierManagerImpl(this,this.bufferService, this.config.isDetectingChangeEvents()),
this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables,
this.cacheFactory);
}
public void setBufferService(BufferService service) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -52,6 +52,7 @@
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.BufferService;
+import org.teiid.events.EventDistributor;
import org.teiid.metadata.AbstractMetadataRecord;
import org.teiid.metadata.Column;
import org.teiid.metadata.Datatype;
@@ -110,12 +111,27 @@
// Resources
private DQPCore requestMgr;
private BufferService bufferService;
+ private EventDistributor eventDistributor;
+ private boolean detectChangeEvents;
- public DataTierManagerImpl(DQPCore requestMgr, BufferService bufferService) {
+ public DataTierManagerImpl(DQPCore requestMgr, BufferService bufferService, boolean
detectChangeEvents) {
this.requestMgr = requestMgr;
this.bufferService = bufferService;
+ this.detectChangeEvents = detectChangeEvents;
}
+ public boolean detectChangeEvents() {
+ return detectChangeEvents;
+ }
+
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
+ public EventDistributor getEventDistributor() {
+ return eventDistributor;
+ }
+
public TupleSource registerRequest(CommandContext context, Command command, String
modelName, String connectorBindingId, int nodeID, int limit) throws
TeiidComponentException, TeiidProcessingException {
RequestWorkItem workItem =
requestMgr.getRequestWorkItem((RequestID)context.getProcessorID());
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -23,6 +23,7 @@
package org.teiid.dqp.internal.process;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
@@ -56,7 +57,13 @@
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
+import org.teiid.metadata.Table;
import org.teiid.query.function.source.XMLSystemFunctions;
+import org.teiid.query.processor.relational.RelationalNodeUtil;
+import org.teiid.query.sql.lang.BatchedUpdateCommand;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.ProcedureContainer;
+import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
@@ -133,7 +140,7 @@
}, this, 100);
}
- private List correctTypes(List row) throws TransformationException {
+ private List<?> correctTypes(List<Object> row) throws
TransformationException {
//TODO: add a proper intermediate schema
for (int i = 0; i < row.size(); i++) {
Object value = row.get(i);
@@ -213,6 +220,25 @@
} else {
results = getResults();
}
+ //check for update events
+ if (index == 0 && this.dtm.detectChangeEvents()) {
+ Command command = aqr.getCommand();
+ ArrayList<String> updates = new ArrayList<String>();
+ int commandIndex = 0;
+ if (RelationalNodeUtil.isUpdate(command)) {
+ long ts = System.currentTimeMillis();
+ checkForUpdates(results, command, updates, commandIndex, ts);
+ } else if (command instanceof BatchedUpdateCommand) {
+ long ts = System.currentTimeMillis();
+ BatchedUpdateCommand bac = (BatchedUpdateCommand)command;
+ for (Command uc : bac.getUpdateCommands()) {
+ checkForUpdates(results, uc, updates, commandIndex++, ts);
+ }
+ }
+ if (this.dtm.getEventDistributor() != null && !updates.isEmpty()) {
+
this.dtm.getEventDistributor().dataModification(this.workItem.getDqpWorkContext().getVdbName(),
this.workItem.getDqpWorkContext().getVdbVersion(), updates.toArray(new
String[updates.size()]));
+ }
+ }
} catch (TranslatorException e) {
results = exceptionOccurred(e, true);
} catch (DataNotAvailableException e) {
@@ -241,6 +267,29 @@
}
}
+ /**
+  * Records a data modification for the table targeted by a single update command.
+  * <p>
+  * Nothing is recorded unless all of the following hold: the command is an update that is
+  * also a {@link ProcedureContainer}; the results contain a row at <code>commandIndex</code>
+  * whose first value is not equal to <code>Integer</code> zero; and the metadata id of the
+  * command's group is a {@link Table}. When they do, the table's full name is appended to
+  * <code>updates</code> and its last data modification time is set to <code>ts</code>.
+  *
+  * @param results the source results; the first value of row <code>commandIndex</code> is
+  *        compared against zero (presumably the update count for the command)
+  * @param command the individual command to inspect - on the batched path this is one of the
+  *        batched commands, not the command of the atomic request
+  * @param updates collects the full names of the modified tables
+  * @param commandIndex index of the command's row in the results
+  * @param ts timestamp to apply as the table's last data modification
+  */
+ private void checkForUpdates(AtomicResultsMessage results, Command command,
+         ArrayList<String> updates, int commandIndex, long ts) {
+     if (!RelationalNodeUtil.isUpdate(command) || !(command instanceof ProcedureContainer)) {
+         return;
+     }
+     //cast the command that was just checked rather than aqr.getCommand(): for a batched
+     //update the request command is the enclosing BatchedUpdateCommand, which is not a
+     //ProcedureContainer, so casting it would fail with a ClassCastException
+     ProcedureContainer pc = (ProcedureContainer)command;
+     GroupSymbol gs = pc.getGroup();
+     Integer zero = Integer.valueOf(0);
+     if (results.getResults().length <= commandIndex || zero.equals(results.getResults()[commandIndex].get(0))) {
+         return;
+     }
+     Object metadataId = gs.getMetadataID();
+     //instanceof is false for null, so this also covers a missing metadata id
+     if (!(metadataId instanceof Table)) {
+         return;
+     }
+     Table t = (Table)metadataId;
+     updates.add(t.getFullName());
+     t.setLastDataModification(ts);
+ }
+
+
private AtomicResultsMessage asynchGet()
throws BlockedException, TeiidProcessingException,
TeiidComponentException, TranslatorException {
Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/ArrayAgg.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -45,6 +45,9 @@
this.result = new ArrayList<Object>();
}
this.result.add(input);
+ if (this.result.size() > 1000) {
+ throw new AssertionError("Exceeded the max allowable array size of 1000.");
//$NON-NLS-1$
+ }
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TempMetadataID.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -33,7 +33,6 @@
import org.teiid.query.sql.lang.CacheHint;
import org.teiid.query.sql.symbol.SingleElementSymbol;
-
/**
* This class represents a temporary metadata ID. A temporary metadata ID
* does not exist in a real metadata source. Rather, it is used temporarily
@@ -48,6 +47,8 @@
private static final long serialVersionUID = -1879211827339120135L;
private static final int LOCAL_CACHE_SIZE = 8;
+ private static final int MOD_COUNT_FOR_COST_UPDATE = 8;
+
public static class TableData {
Collection<TempMetadataID> accessPatterns;
List<TempMetadataID> elements;
@@ -58,15 +59,31 @@
CacheHint cacheHint;
List<List<TempMetadataID>> keys;
List<List<TempMetadataID>> indexes;
+ long lastDataModification;
long lastModified;
+ int modCount;
+ public long getLastDataModification() {
+ return lastDataModification;
+ }
+
+ public void dataModified(int updateCount) {
+ if (updateCount == 0) {
+ return;
+ }
+ long ts = System.currentTimeMillis();
+ modCount += updateCount;
+ if (modCount > MOD_COUNT_FOR_COST_UPDATE) {
+ this.lastModified = ts;
+ modCount = 0;
+ }
+ this.lastDataModification = ts;
+ }
+
public long getLastModified() {
return lastModified;
}
- public void setLastModified(long lastModified) {
- this.lastModified = lastModified;
- }
}
private static TableData DUMMY_DATA = new TableData();
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -262,9 +262,9 @@
public boolean hasProcedure(String name) throws TeiidComponentException {
try {
- return getStoredProcedureInfoForProcedure(name) != null;
+ return getStoredProcInfoDirect(name) != null;
} catch (QueryMetadataException e) {
- return false;
+ return true;
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/query/processor/ProcessorDataManager.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -26,6 +26,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.events.EventDistributor;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.util.CommandContext;
@@ -47,5 +48,7 @@
String keyElementName,
Object keyValue) throws BlockedException,
TeiidComponentException,
TeiidProcessingException;
+
+ void setEventDistributor(EventDistributor ed);
}
Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/query/sql/lang/BatchedUpdateCommand.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -23,10 +23,9 @@
package org.teiid.query.sql.lang;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
+import org.teiid.query.sql.LanguageObject;
import org.teiid.query.sql.LanguageVisitor;
import org.teiid.query.sql.util.VariableContext;
@@ -38,42 +37,13 @@
*/
public class BatchedUpdateCommand extends Command {
- protected List commands;
+ protected List<Command> commands;
private List<VariableContext> variableContexts; //processing state
- /**
- * Add sub command
- * @param command Additional sub-command
- */
- public void addSubCommand(Command command) {
- if(this.commands == null) {
- this.commands = new ArrayList();
- }
- this.commands.add(command);
- }
-
- /**
- * Add sub commands
- * @param commands Additional sub-commands
- */
- public void addSubCommands(Collection commands) {
- if(commands == null || commands.size() == 0) {
- return;
- }
-
- if(this.commands == null) {
- this.commands = new ArrayList();
- }
- this.commands.addAll(commands);
- }
-
/**
* @see org.teiid.query.sql.lang.Command#getSubCommands()
*/
- public List getSubCommands() {
- if(commands == null || commands.size() == 0) {
- return Collections.EMPTY_LIST;
- }
+ public List<Command> getSubCommands() {
return commands;
}
@@ -82,8 +52,8 @@
* @param updateCommands
* @since 4.2
*/
- public BatchedUpdateCommand(List updateCommands) {
- addSubCommands(updateCommands);
+ public BatchedUpdateCommand(List<? extends Command> updateCommands) {
+ this.commands = new ArrayList<Command>(updateCommands);
}
/**
@@ -91,7 +61,7 @@
* @return
* @since 4.2
*/
- public List getUpdateCommands() {
+ public List<Command> getUpdateCommands() {
return getSubCommands();
}
@@ -123,10 +93,7 @@
* @since 4.2
*/
public Object clone() {
- List clonedCommands = new ArrayList(commands.size());
- for (int i = 0; i < commands.size(); i++) {
- clonedCommands.add(((Command)commands.get(i)).clone());
- }
+ List<Command> clonedCommands = LanguageObject.Util.deepClone(this.commands,
Command.class);
BatchedUpdateCommand copy = new BatchedUpdateCommand(clonedCommands);
copyMetadataState(copy);
return copy;
@@ -142,9 +109,9 @@
public String toString() {
StringBuffer val = new StringBuffer("BatchedUpdate{"); //$NON-NLS-1$
if (commands != null && commands.size() > 0) {
- val.append(getCommandToken((Command)commands.get(0)));
+ val.append(getCommandToken(commands.get(0)));
for (int i = 1; i < commands.size(); i++) {
-
val.append(',').append(getCommandToken((Command)commands.get(i)));
+ val.append(',').append(getCommandToken(commands.get(i)));
}
}
val.append('}');
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-04-13 18:47:33
UTC (rev 3088)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-04-13 20:34:39
UTC (rev 3089)
@@ -482,7 +482,7 @@
}
public int truncate() {
- this.tid.getTableData().setLastModified(System.currentTimeMillis());
+ this.tid.getTableData().dataModified(tree.getRowCount());
return tree.truncate();
}
@@ -513,7 +513,7 @@
UpdateProcessor up = new InsertUpdateProcessor(tuples, rowId != null,
shouldProject?indexes:null);
int updateCount = up.process();
tid.setCardinality(tree.getRowCount());
- tid.getTableData().setLastModified(System.currentTimeMillis());
+ tid.getTableData().dataModified(updateCount);
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -579,9 +579,7 @@
};
int updateCount = up.process();
- if (updateCount > 0) {
- tid.getTableData().setLastModified(System.currentTimeMillis());
- }
+ tid.getTableData().dataModified(updateCount);
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -614,9 +612,7 @@
};
int updateCount = up.process();
tid.setCardinality(tree.getRowCount());
- if (updateCount > 0) {
- tid.getTableData().setLastModified(System.currentTimeMillis());
- }
+ tid.getTableData().dataModified(updateCount);
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
}
@@ -646,7 +642,7 @@
index.tree.remove(tuple);
}
}
- tid.getTableData().setLastModified(System.currentTimeMillis());
+ tid.getTableData().dataModified(1);
return result;
}
List<?> result = tree.insert(tuple, InsertMode.UPDATE, -1);
@@ -656,7 +652,7 @@
index.tree.insert(tuple, InsertMode.UPDATE, -1);
}
}
- tid.getTableData().setLastModified(System.currentTimeMillis());
+ tid.getTableData().dataModified(1);
return result;
} finally {
lock.writeLock().unlock();
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -161,6 +161,7 @@
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
+ this.processorDataManager.setEventDistributor(eventDistributor);
}
public TupleSource registerRequest(
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -37,6 +37,7 @@
import org.mockito.Mockito;
import org.teiid.CommandContext;
import org.teiid.api.exception.query.QueryResolverException;
+import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
@@ -87,8 +88,10 @@
config = new DQPConfiguration();
config.setMaxActivePlans(1);
config.setUserRequestSourceConcurrency(2);
+ config.setResultsetCacheConfig(new CacheConfiguration());
core.start(config);
core.getPrepPlanCache().setModTime(1);
+ core.getRsCache().setModTime(1);
}
@After public void tearDown() throws Exception {
@@ -376,6 +379,21 @@
Thread.sleep(100);
+ //perform a minor update, we should still use the cache
+ sql = "delete from #temp where a12345 = '11'"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * from #temp"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setStatementType(StatementType.PREPARED);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ assertEquals(2, this.core.getPrepPlanCache().getCacheHitCount());
+
+ //perform a major update, we will purge the plan
sql = "delete from #temp"; //$NON-NLS-1$
reqMsg = exampleRequestMessage(sql);
rm = execute(userName, sessionid, reqMsg);
@@ -387,9 +405,42 @@
rm = execute(userName, sessionid, reqMsg);
assertEquals(0, rm.getResults().length); //$NON-NLS-1$
- assertEquals(1, this.core.getPrepPlanCache().getCacheHitCount());
+ assertEquals(2, this.core.getPrepPlanCache().getCacheHitCount());
}
+ @Test public void testRsCacheInvalidation() throws Exception {
+ String sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+ String userName = "1"; //$NON-NLS-1$
+ int sessionid = 1; //$NON-NLS-1$
+ RequestMessage reqMsg = exampleRequestMessage(sql);
+ reqMsg.setUseResultSetCache(true);
+ ResultsMessage rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setUseResultSetCache(true);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ assertEquals(1, this.core.getRsCache().getCacheHitCount());
+
+ Thread.sleep(100);
+
+ sql = "delete from bqt1.smalla"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(1, rm.getResults().length); //$NON-NLS-1$
+
+ sql = "select * FROM vqt.SmallB"; //$NON-NLS-1$
+ reqMsg = exampleRequestMessage(sql);
+ reqMsg.setUseResultSetCache(true);
+ rm = execute(userName, sessionid, reqMsg);
+ assertEquals(10, rm.getResults().length); //$NON-NLS-1$
+
+ assertEquals(1, this.core.getRsCache().getCacheHitCount());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -87,7 +87,7 @@
dtm = new DataTierManagerImpl(rm,
- bs);
+ bs, true);
command = helpGetCommand(sql, metadata);
RequestMessage original = new RequestMessage();
Modified: trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/dqp/service/AutoGenDataService.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -25,6 +25,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.query.optimizer.TestOptimizer;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
+import org.teiid.query.processor.relational.RelationalNodeUtil;
import org.teiid.query.sql.symbol.SingleElementSymbol;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
@@ -83,8 +85,11 @@
@Override
public ConnectorWork registerRequest(AtomicRequestMessage message)
throws TeiidComponentException {
- List projectedSymbols = (message.getCommand()).getProjectedSymbols();
+ List projectedSymbols = (message.getCommand()).getProjectedSymbols();
List[] results = createResults(projectedSymbols);
+ if (RelationalNodeUtil.isUpdate(message.getCommand())) {
+ results = new List[] {Arrays.asList(1)};
+ }
final AtomicResultsMessage msg = ConnectorWorkItem.createResultsMessage(results,
projectedSymbols);
msg.setFinalRow(rows);
Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java 2011-04-13
18:47:33 UTC (rev 3088)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeDataManager.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -34,6 +34,7 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
+import org.teiid.events.EventDistributor;
import org.teiid.logging.LogManager;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.metadata.QueryMetadataInterface;
@@ -388,5 +389,8 @@
this.registerTuples(group.getMetadataID(), elementSymbols, tuples);
}
+ @Override
+ public void setEventDistributor(EventDistributor ed) {
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/test/java/org/teiid/query/processor/HardcodedDataManager.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -32,6 +32,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.dqp.internal.datamgr.LanguageBridgeFactory;
+import org.teiid.events.EventDistributor;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.util.CommandContext;
@@ -167,5 +168,9 @@
public void clearCodeTables() {
}
+
+ @Override
+ public void setEventDistributor(EventDistributor ed) {
+ }
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -36,6 +36,7 @@
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
+import org.teiid.events.EventDistributor;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.TestBatchedUpdatePlanner;
import org.teiid.query.processor.ProcessorDataManager;
@@ -219,6 +220,10 @@
actualCommands.add(command);
return new FakeTupleSource(numExecutedCommands);
}
+ @Override
+ public void setEventDistributor(EventDistributor ed) {
+ }
+
}
private static final class FakeTupleSource implements TupleSource {
private int currentTuple = 0;
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -36,6 +36,7 @@
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.events.EventDistributor;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.processor.FakeTupleSource;
import org.teiid.query.processor.ProcessorDataManager;
@@ -184,6 +185,9 @@
Object val = row.get(0);
assertEquals(new Integer(value), val);
}
+ @Override
+ public void setEventDistributor(EventDistributor ed) {
+ }
}
private static final class FakeDataTupleSource implements TupleSource {
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -659,7 +659,7 @@
}
@Override
- public void dataModification(String vdbName, int vdbVersion, String tableFqn) {
+ public void dataModification(String vdbName, int vdbVersion, String... tableFqns) {
VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
if (vdb == null) {
return;
@@ -668,16 +668,18 @@
if (tm == null) {
return;
}
- try {
- Table table = tm.getGroupID(tableFqn);
- table.setLastDataModification(System.currentTimeMillis());
- } catch (TeiidException e) {
- LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ for (String tableFqn:tableFqns) {
+ try {
+ Table table = tm.getGroupID(tableFqn);
+ table.setLastDataModification(System.currentTimeMillis());
+ } catch (TeiidException e) {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ }
}
}
@Override
- public void schemaModification(String vdbName, int vdbVersion, String fqn) {
+ public void schemaModification(String vdbName, int vdbVersion, String... fqns) {
VDBMetaData vdb = this.vdbRepository.getVDB(vdbName, vdbVersion);
if (vdb == null) {
return;
@@ -686,11 +688,13 @@
if (tm == null) {
return;
}
- try {
- Table table = tm.getGroupID(fqn);
- table.setLastModified(System.currentTimeMillis());
- } catch (TeiidException e) {
- LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ for (String fqn:fqns) {
+ try {
+ Table table = tm.getGroupID(fqn);
+ table.setLastModified(System.currentTimeMillis());
+ } catch (TeiidException e) {
+ LogManager.logError(LogConstants.CTX_DQP, e,
QueryPlugin.Util.getString("DQPCore.unable_to_process_event")); //$NON-NLS-1$
+ }
}
}
}
Modified:
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java 2011-04-13
18:47:33 UTC (rev 3088)
+++
trunk/test-integration/common/src/test/java/org/teiid/dqp/internal/process/TestXMLTypeTranslations.java 2011-04-13
20:34:39 UTC (rev 3089)
@@ -113,7 +113,7 @@
Mockito.stub(rwi.getDqpWorkContext()).toReturn(workContext);
Mockito.stub(core.getRequestWorkItem((RequestID)Mockito.anyObject())).toReturn(rwi);
- DataTierManagerImpl dataMgr = new DataTierManagerImpl(core, null);
+ DataTierManagerImpl dataMgr = new DataTierManagerImpl(core, null, true);
doProcess(metadata,
sql,
finder, dataMgr , new List[] {Arrays.asList(new
String(ObjectConverterUtil.convertToByteArray(new
FileInputStream(UnitTestUtil.getTestDataFile("test-schema.xsd")))))}, DEBUG);
//$NON-NLS-1$