teiid SVN: r3499 - branches.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-19 19:45:18 -0400 (Mon, 19 Sep 2011)
New Revision: 3499
Added:
branches/7.1.1.CP3_SOA-3362/
Log:
One-off patch branch for SOA-3362 based on 7.1.1.CP3 branch
13 years, 6 months
teiid SVN: r3498 - in branches/as7: build/kits/jboss-as7/standalone/configuration and 3 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2011-09-14 17:03:48 -0400 (Wed, 14 Sep 2011)
New Revision: 3498
Added:
branches/as7/build/kits/jboss-as7/docs/teiid/datasources/file.xml
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/DynamicVDBRootMountDeployer.java
Removed:
branches/as7/build/kits/jboss-as7/docs/teiid/datasources/all-drivers.xml
Modified:
branches/as7/build/kits/jboss-as7/docs/teiid/datasources/h2.xml
branches/as7/build/kits/jboss-as7/docs/teiid/datasources/oracle.xml
…
[View More]branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidBootServicesAdd.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBParserDeployer.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBStructureDeployer.java
branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
branches/as7/jboss-integration/src/test/java/org/teiid/jboss/TestTeiidConfiguration.java
Log:
TEIID-1720: Changes for dynamic vdb deployment
Deleted: branches/as7/build/kits/jboss-as7/docs/teiid/datasources/all-drivers.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/docs/teiid/datasources/all-drivers.xml 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/build/kits/jboss-as7/docs/teiid/datasources/all-drivers.xml 2011-09-14 21:03:48 UTC (rev 3498)
@@ -1,8 +0,0 @@
-<drivers>
- <driver name="h2" module="com.h2database.h2">
- <xa-datasource-class>org.h2.jdbcx.JdbcDataSource</xa-datasource-class>
- </driver>
- <driver name="oracle" module="com.oracle">
- <xa-datasource-class>oracle.jdbc.xa.client.OracleXADataSource</xa-datasource-class>
- </driver>
-</drivers>
\ No newline at end of file
Added: branches/as7/build/kits/jboss-as7/docs/teiid/datasources/file.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/docs/teiid/datasources/file.xml (rev 0)
+++ branches/as7/build/kits/jboss-as7/docs/teiid/datasources/file.xml 2011-09-14 21:03:48 UTC (rev 3498)
@@ -0,0 +1,31 @@
+<!-- If susbsytem is already defined, only copy the contents under it and edit to suit your needs -->
+<subsystem xmlns="urn:jboss:domain:resource-adapters:1.0">
+ <resource-adapters>
+ <resource-adapter>
+ <archive>teiid-connector-file.rar</archive>
+ <transaction-support>NoTransaction</transaction-support>
+ <connection-definitions>
+ <connection-definition class-name="org.teiid.resource.adapter.file.FileManagedConnectionFactory"
+ jndi-name="java:/fileDS"
+ enabled="true"
+ use-java-context="true"
+ pool-name="teiid-file-ds">
+
+ <!-- Directory where the data files are stored -->
+ <config-property name="ParentDirectory">/home/rareddy/testing/</config-property>
+
+ <!-- Optional properties -->
+
+ <!-- Set FileMapping to redirect specific relative paths (case sensitive) to alternative locations.
+ The string value specifies a map in the format key=value(,key=value)*
+ -->
+ <!-- <config-property name="FileMapping">file1.txt=fileX.txt,file2.txt=fileY.txt</config-property> -->
+
+ <!-- Set AllowParentPaths to false to disallow .. in paths.
+ This prevent requesting files that are not contained in the parent directory -->
+ <config-property name="AllowParentPaths">true</config-property>
+ </connection-definition>
+ </connection-definitions>
+ </resource-adapter>
+ </resource-adapters>
+</subsystem>
Property changes on: branches/as7/build/kits/jboss-as7/docs/teiid/datasources/file.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: branches/as7/build/kits/jboss-as7/docs/teiid/datasources/h2.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/docs/teiid/datasources/h2.xml 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/build/kits/jboss-as7/docs/teiid/datasources/h2.xml 2011-09-14 21:03:48 UTC (rev 3498)
@@ -1,14 +1,21 @@
-<datasource jndi-name="java:jboss/datasources/ExampleDS" pool-name="H2DS" enabled="true" jta="true" use-java-context="true" use-ccm="true">
- <connection-url>jdbc:h2:mem:test;DB_CLOSE_DELAY=-1</connection-url>
- <driver>h2</driver>
- <pool>
- <prefill>false</prefill>
- <use-strict-min>false</use-strict-min>
- <flush-strategy>FailingConnectionOnly</flush-strategy>
- </pool>
- <security>
- <user-name>sa</user-name>
- <password>sa</password>
- </security>
-</datasource>
+<!-- If susbsytem is already defined, only copy the contents under it and edit to suit your needs -->
+<subsystem xmlns="urn:jboss:domain:datasources:1.0">
+ <datasources>
+ <datasource jndi-name="java:jboss/datasources/ExampleDS" enabled="true" use-java-context="true" pool-name="H2DS">
+ <connection-url>jdbc:h2:mem:test;DB_CLOSE_DELAY=-1</connection-url>
+ <driver>h2</driver>
+ <pool></pool>
+ <security>
+ <user-name>sa</user-name>
+ <password>sa</password>
+ </security>
+ </datasource>
+ <drivers>
+ <driver name="h2" module="com.h2database.h2">
+ <xa-datasource-class>org.h2.jdbcx.JdbcDataSource</xa-datasource-class>
+ </driver>
+ </drivers>
+ </datasources>
+</subsystem>
+
Modified: branches/as7/build/kits/jboss-as7/docs/teiid/datasources/oracle.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/docs/teiid/datasources/oracle.xml 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/build/kits/jboss-as7/docs/teiid/datasources/oracle.xml 2011-09-14 21:03:48 UTC (rev 3498)
@@ -1,13 +1,23 @@
-<datasource jndi-name="java:/Oracle11_PushDS" pool-name="OracleDS" enabled="true" jta="true" use-java-context="true" use-ccm="true">
- <connection-url>jdbc:oracle:thin:@englxdbs11.mw.lab.eng.bos.redhat.com:1521:orcl</connection-url>
- <driver>oracle</driver>
- <pool>
- <prefill>false</prefill>
- <use-strict-min>false</use-strict-min>
- <flush-strategy>FailingConnectionOnly</flush-strategy>
- </pool>
- <security>
- <user-name>bqt2_ro</user-name>
- <password>mm</password>
- </security>
-</datasource>
\ No newline at end of file
+<!-- If susbsytem is already defined, only copy the contents under it and edit to suit your needs -->
+<subsystem xmlns="urn:jboss:domain:datasources:1.0">
+ <datasources>
+ <datasource jndi-name="java:/Oracle11_PushDS" pool-name="OracleDS" enabled="true" jta="true" use-java-context="true" use-ccm="true">
+ <connection-url>jdbc:oracle:thin:{host}:1521:orcl</connection-url>
+ <driver>oracle</driver>
+ <pool>
+ <prefill>false</prefill>
+ <use-strict-min>false</use-strict-min>
+ <flush-strategy>FailingConnectionOnly</flush-strategy>
+ </pool>
+ <security>
+ <user-name>{user}</user-name>
+ <password>{password}</password>
+ </security>
+ </datasource>
+ <drivers>
+ <driver name="oracle" module="com.oracle">
+ <xa-datasource-class>oracle.jdbc.xa.client.OracleXADataSource</xa-datasource-class>
+ </driver>
+ </drivers>
+ </datasources>
+</subsystem>
Modified: branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2011-09-14 21:03:48 UTC (rev 3498)
@@ -34,6 +34,7 @@
<extension module="org.jboss.as.logging"/>
<extension module="org.jboss.as.naming"/>
<extension module="org.jboss.as.osgi"/>
+ <extension module="org.jboss.as.pojo"/>
<extension module="org.jboss.as.remoting"/>
<extension module="org.jboss.as.sar"/>
<extension module="org.jboss.as.security"/>
@@ -196,6 +197,7 @@
<jpa default-datasource=""/>
</subsystem>
<subsystem xmlns="urn:jboss:domain:naming:1.0" />
+ <subsystem xmlns="urn:jboss:domain:pojo:1.0" />
<subsystem xmlns="urn:jboss:domain:osgi:1.0" activation="lazy">
<configuration pid="org.apache.felix.webconsole.internal.servlet.OsgiManager">
<property name="manager.root">jboss-osgi</property>
@@ -360,20 +362,20 @@
<interfaces>
<interface name="management">
- <inet-address value="127.0.0.1"/>
+ <inet-address value="${jboss.bind.address.management:127.0.0.1}"/>
</interface>
<interface name="public">
- <inet-address value="127.0.0.1"/>
+ <inet-address value="${jboss.bind.address.public:127.0.0.1}"/>
</interface>
</interfaces>
<socket-binding-group name="standard-sockets" default-interface="public">
<socket-binding name="http" port="8080"/>
<socket-binding name="https" port="8443"/>
- <socket-binding name="jmx-connector-registry" port="1090"/>
- <socket-binding name="jmx-connector-server" port="1091"/>
+ <socket-binding name="jmx-connector-registry" interface="management" port="1090"/>
+ <socket-binding name="jmx-connector-server" interface="management" port="1091"/>
<socket-binding name="jndi" port="1099"/>
- <socket-binding name="osgi-http" port="8090"/>
+ <socket-binding name="osgi-http" interface="management" port="8090"/>
<socket-binding name="remoting" port="4447"/>
<socket-binding name="txn-recovery-environment" port="4712"/>
<socket-binding name="txn-status-manager" port="4713"/>
Added: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/DynamicVDBRootMountDeployer.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/DynamicVDBRootMountDeployer.java (rev 0)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/DynamicVDBRootMountDeployer.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.jboss;
+
+import java.io.Closeable;
+
+import org.jboss.as.server.deployment.*;
+import org.jboss.as.server.deployment.module.ModuleRootMarker;
+import org.jboss.as.server.deployment.module.ModuleSpecification;
+import org.jboss.as.server.deployment.module.ResourceRoot;
+import org.jboss.vfs.VFSUtils;
+import org.jboss.vfs.VirtualFile;
+
+
+
+class DynamicVDBRootMountDeployer implements DeploymentUnitProcessor {
+ private static final String DYNAMIC_VDB_STRUCTURE = "-vdb.xml"; //$NON-NLS-1$
+
+ public void deploy(DeploymentPhaseContext phaseContext) throws DeploymentUnitProcessingException {
+ final DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit();
+
+ if(deploymentUnit.getAttachment(Attachments.DEPLOYMENT_ROOT) != null) {
+ return;
+ }
+
+ final String deploymentName = deploymentUnit.getName();
+ final VirtualFile deploymentContents = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_CONTENTS);
+
+ // internal deployments do not have any contents, so there is nothing to mount
+ if (deploymentContents == null)
+ return;
+
+ if (deploymentName.endsWith(DYNAMIC_VDB_STRUCTURE)) {
+ // use the contents directly
+ // nothing was mounted
+ final ResourceRoot resourceRoot = new ResourceRoot(deploymentContents, null);
+ ModuleRootMarker.mark(resourceRoot);
+ deploymentUnit.putAttachment(Attachments.DEPLOYMENT_ROOT, resourceRoot);
+ deploymentUnit.putAttachment(Attachments.MODULE_SPECIFICATION, new ModuleSpecification());
+ }
+ }
+
+ public void undeploy(DeploymentUnit context) {
+ final ResourceRoot resourceRoot = context.removeAttachment(Attachments.DEPLOYMENT_ROOT);
+ if (resourceRoot != null) {
+ final Closeable mountHandle = resourceRoot.getMountHandle();
+ VFSUtils.safeClose(mountHandle);
+ }
+ }
+}
Property changes on: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/DynamicVDBRootMountDeployer.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidBootServicesAdd.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidBootServicesAdd.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidBootServicesAdd.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -259,6 +259,7 @@
context.addStep(new AbstractDeploymentChainStep() {
@Override
public void execute(DeploymentProcessorTarget processorTarget) {
+ processorTarget.addDeploymentProcessor(Phase.STRUCTURE, Phase.STRUCTURE_WAR_DEPLOYMENT_INIT,new DynamicVDBRootMountDeployer());
processorTarget.addDeploymentProcessor(Phase.STRUCTURE, Phase.STRUCTURE_WAR_DEPLOYMENT_INIT|0x0001,new VDBStructureDeployer());
processorTarget.addDeploymentProcessor(Phase.PARSE, Phase.PARSE_WEB_DEPLOYMENT|0x0001, new VDBParserDeployer());
processorTarget.addDeploymentProcessor(Phase.DEPENDENCIES, Phase.DEPENDENCIES_WAR_MODULE|0x0001, new VDBDependencyDeployer());
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -21,14 +21,17 @@
*/
package org.teiid.jboss;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
-import org.jboss.as.server.deployment.*;
+import org.jboss.as.server.deployment.DeploymentPhaseContext;
+import org.jboss.as.server.deployment.DeploymentUnit;
+import org.jboss.as.server.deployment.DeploymentUnitProcessingException;
+import org.jboss.as.server.deployment.DeploymentUnitProcessor;
import org.jboss.msc.service.*;
import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.service.ServiceController.Mode;
-import org.jboss.vfs.VirtualFile;
import org.teiid.adminapi.Translator;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
@@ -60,7 +63,7 @@
if (!TeiidAttachments.isVDBDeployment(deploymentUnit)) {
return;
}
- VirtualFile file = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_ROOT).getRoot();
+ final String deploymentName = deploymentUnit.getName();
VDBMetaData deployment = deploymentUnit.getAttachment(TeiidAttachments.VDB_METADATA);
// check to see if there is old vdb already deployed.
@@ -85,7 +88,7 @@
String type = data.getType();
Translator parent = this.translatorRepository.getTranslatorMetaData(type);
if ( parent == null) {
- throw new DeploymentUnitProcessingException(RuntimePlugin.Util.getString("translator_type_not_found", file.getName())); //$NON-NLS-1$
+ throw new DeploymentUnitProcessingException(RuntimePlugin.Util.getString("translator_type_not_found", deploymentName)); //$NON-NLS-1$
}
}
@@ -105,13 +108,17 @@
deploymentUnit.removeAttachment(TeiidAttachments.UDF_METADATA);
// build a VDB service
+ ArrayList<String> unAvailableDS = new ArrayList<String>();
VDBService vdb = new VDBService(deployment);
ServiceBuilder<VDBMetaData> vdbService = context.getServiceTarget().addService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()), vdb);
- for (ModelMetaData model:deployment.getModelMetaDatas().values()) {
- for (String sourceName:model.getSourceNames()) {
- vdbService.addDependency(DependencyType.OPTIONAL, ServiceName.JBOSS.append("data-source", model.getSourceConnectionJndiName(sourceName))); //$NON-NLS-1$
- }
- }
+// for (ModelMetaData model:deployment.getModelMetaDatas().values()) {
+// for (String sourceName:model.getSourceNames()) {
+// vdbService.addDependency(ServiceName.JBOSS.append("data-source", model.getSourceConnectionJndiName(sourceName))); //$NON-NLS-1$
+// if (context.getServiceRegistry().getService(ServiceName.JBOSS.append("data-source", model.getSourceConnectionJndiName(sourceName))) == null) { //$NON-NLS-1$
+// unAvailableDS.add(model.getSourceConnectionJndiName(sourceName));
+// }
+// }
+// }
// adding the translator services is redundant, however if one is removed then it is an issue.
for (Translator t: deployment.getOverrideTranslators()) {
@@ -127,6 +134,10 @@
vdbService.addDependency(TeiidServiceNames.BUFFER_MGR, BufferServiceImpl.class, vdb.getBufferServiceInjector());
vdbService.addDependency(DependencyType.OPTIONAL, TeiidServiceNames.OBJECT_REPLICATOR, ObjectReplicator.class, vdb.getObjectReplicatorInjector());
vdbService.setInitialMode(Mode.PASSIVE).install();
+
+ if (!unAvailableDS.isEmpty()) {
+ LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("vdb-inactive", deployment.getName(), deployment.getVersion(), unAvailableDS)); //$NON-NLS-1$
+ }
}
@@ -140,7 +151,7 @@
final ServiceController<?> controller = deploymentUnit.getServiceRegistry().getService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()));
if (controller != null) {
VDBService vdbService = (VDBService)controller.getService();
- vdbService.undeployinProgress();
+ vdbService.undeployInProgress();
controller.setMode(ServiceController.Mode.REMOVE);
}
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBParserDeployer.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBParserDeployer.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBParserDeployer.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -58,7 +58,7 @@
VirtualFile file = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_ROOT).getRoot();
if (TeiidAttachments.isDynamicVDB(deploymentUnit)) {
- parseVDBXML(file, deploymentUnit);
+ parseVDBXML(file, deploymentUnit).setDynamic(true);
}
else {
// scan for different files
@@ -101,11 +101,12 @@
}
}
- private void parseVDBXML(VirtualFile file, DeploymentUnit deploymentUnit) throws DeploymentUnitProcessingException {
+ private VDBMetaData parseVDBXML(VirtualFile file, DeploymentUnit deploymentUnit) throws DeploymentUnitProcessingException {
try {
VDBMetaData vdb = VDBMetadataParser.unmarshell(file.openStream());
deploymentUnit.putAttachment(TeiidAttachments.VDB_METADATA, vdb);
LogManager.logDetail(LogConstants.CTX_RUNTIME,"VDB "+file.getName()+" has been parsed."); //$NON-NLS-1$ //$NON-NLS-2$
+ return vdb;
} catch (XMLStreamException e) {
throw new DeploymentUnitProcessingException(e);
} catch (IOException e) {
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -454,7 +454,7 @@
return objectReplicatorInjector;
}
- public void undeployinProgress() {
+ public void undeployInProgress() {
this.undeployInProgress = true;
}
}
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBStructureDeployer.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBStructureDeployer.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBStructureDeployer.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -37,12 +37,13 @@
DeploymentUnit deploymentUnit = phaseContext.getDeploymentUnit();
+ String deploymentName = deploymentUnit.getName();
VirtualFile file = deploymentUnit.getAttachment(Attachments.DEPLOYMENT_ROOT).getRoot();
if (file == null) {
return;
}
- if(file.getName().toLowerCase().endsWith(VDB_EXTENSION)) {
+ if(deploymentName.endsWith(VDB_EXTENSION)) {
VirtualFile metainf = file.getChild("META-INF"); //$NON-NLS-1$
if (metainf == null) {
return;
@@ -54,7 +55,7 @@
// adds a TYPE attachment.
TeiidAttachments.setAsVDBDeployment(deploymentUnit);
}
- else if (file.getName().toLowerCase().endsWith(DYNAMIC_VDB_STRUCTURE)) {
+ else if (deploymentName.endsWith(DYNAMIC_VDB_STRUCTURE)) {
TeiidAttachments.setAsDynamicVDBDeployment(deploymentUnit);
}
}
Modified: branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-09-14 21:03:48 UTC (rev 3498)
@@ -235,3 +235,6 @@
translator.removed = Translator "{0}" removed
translator.failed-to-load = Translator "{0}" not found in the module "{1}"
cache-container-name-required=container-name required for the resultset cache configuration
+
+
+vdb-inactive=VDB {0}.{1} deployed in inactive state due to unavailability of data sources {2}
\ No newline at end of file
Modified: branches/as7/jboss-integration/src/test/java/org/teiid/jboss/TestTeiidConfiguration.java
===================================================================
--- branches/as7/jboss-integration/src/test/java/org/teiid/jboss/TestTeiidConfiguration.java 2011-09-14 20:18:16 UTC (rev 3497)
+++ branches/as7/jboss-integration/src/test/java/org/teiid/jboss/TestTeiidConfiguration.java 2011-09-14 21:03:48 UTC (rev 3498)
@@ -32,7 +32,10 @@
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
-import javax.xml.stream.*;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
@@ -54,19 +57,12 @@
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;
import org.jboss.dmr.Property;
-import org.jboss.modules.Module;
-import org.jboss.modules.ModuleClassLoader;
-import org.jboss.modules.ModuleIdentifier;
-import org.jboss.modules.ModuleLoader;
import org.jboss.msc.service.*;
import org.jboss.staxmapper.XMLElementWriter;
import org.jboss.staxmapper.XMLMapper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations.Mock;
-import org.mockito.internal.stubbing.Stubber;
import org.teiid.core.util.ObjectConverterUtil;
import org.xml.sax.ErrorHandler;
import org.xml.sax.SAXException;
[View Less]
13 years, 6 months
teiid SVN: r3497 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 2 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-09-14 16:18:16 -0400 (Wed, 14 Sep 2011)
New Revision: 3497
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/…
[View More]MemoryStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
Log:
TEIID-1750 TEIID-1753 converting compaction logic to in place and adding tail tracking
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -176,6 +176,13 @@
public synchronized long getLength() {
return len;
}
+
+ public synchronized void truncate(long length) throws TeiidComponentException {
+ truncateDirect(length);
+ len = length;
+ }
+
+ protected abstract void truncateDirect(long length) throws TeiidComponentException;
public int read(long fileOffset, byte[] b, int offSet, int length)
throws TeiidComponentException {
@@ -199,21 +206,21 @@
} while (n < length);
}
- public void write(byte[] bytes) throws TeiidComponentException {
- write(bytes, 0, bytes.length);
+ public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException {
+ return write(len, bytes, offset, length);
}
-
- public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException {
+
+ public synchronized long write(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
if (removed) {
throw new TeiidComponentException("already removed"); //$NON-NLS-1$
}
- writeDirect(bytes, offset, length);
+ writeDirect(start, bytes, offset, length);
long result = len;
- len += length;
+ len = Math.max(len, start + length);
return result;
}
- protected abstract void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException;
+ protected abstract void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException;
public synchronized void remove() {
if (!this.removed) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -23,25 +23,24 @@
package org.teiid.common.buffer.impl;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -113,13 +112,15 @@
}
private final class BatchManagerImpl implements BatchManager {
- private final String id;
- private volatile FileStore store;
- private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
- private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
- private AtomicLong unusedSpace = new AtomicLong();
+ final String id;
+ volatile FileStore store;
+ Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+ long tail;
+ ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>();
+ ReadWriteLock compactionLock = new ReentrantReadWriteLock();
+ AtomicLong unusedSpace = new AtomicLong();
private int[] lobIndexes;
- private SizeUtility sizeUtility;
+ SizeUtility sizeUtility;
private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
private BatchManagerImpl(String newID, int[] lobIndexes) {
@@ -130,6 +131,16 @@
this.sizeUtility = new SizeUtility();
}
+ private void freeBatch(Long batch) {
+ long[] info = physicalMapping.remove(batch);
+ if (info != null) {
+ unusedSpace.addAndGet(info[1]);
+ if (info[0] + info[1] == tail) {
+ tail -= info[1];
+ }
+ }
+ }
+
public FileStore createStorage(String prefix) {
return createFileStore(id+prefix);
}
@@ -143,55 +154,84 @@
return mbi;
}
- private boolean shouldCompact(long offset) {
- return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
- }
-
private long getOffset() throws TeiidComponentException {
- long offset = store.getLength();
- if (!shouldCompact(offset)) {
- return offset;
+ if (store.getLength() <= compactionThreshold || unusedSpace.get() * 4 <= store.getLength() * 3) {
+ return tail;
}
- try {
- this.compactionLock.writeLock().lock();
- offset = store.getLength();
- //retest the condition to ensure that compaction is still needed
- if (!shouldCompact(offset)) {
- return offset;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on", id); //$NON-NLS-1$
+ }
+ byte[] buffer = new byte[IO_BUFFER_SIZE];
+ TreeSet<long[]> bySize = new TreeSet<long[]>(new Comparator<long[]>() {
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ int signum = Long.signum(o1[1] - o2[1]);
+ if (signum == 0) {
+ //take the upper address first
+ return Long.signum(o2[0] - o1[0]);
+ }
+ return signum;
}
- FileStore newStore = createFileStore(id);
- newStore.setCleanupReference(this);
- byte[] buffer = new byte[IO_BUFFER_SIZE];
- List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
- Collections.sort(values, new Comparator<long[]>() {
- @Override
- public int compare(long[] o1, long[] o2) {
- return Long.signum(o1[0] - o2[0]);
+ });
+ TreeSet<long[]> byAddress = new TreeSet<long[]>(new Comparator<long[]>() {
+
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ return Long.signum(o1[0] - o2[0]);
+ }
+ });
+ bySize.addAll(physicalMapping.values());
+ byAddress.addAll(physicalMapping.values());
+ long lastEndAddress = 0;
+ unusedSpace.set(0);
+ long minFreeSpace = 1 << 11;
+ while (!byAddress.isEmpty()) {
+ long[] info = byAddress.pollFirst();
+ bySize.remove(info);
+
+ long currentOffset = info[0];
+ long space = currentOffset - lastEndAddress;
+ while (space > 0 && !bySize.isEmpty()) {
+ long[] smallest = bySize.first();
+ if (smallest[1] > space) {
+ break;
}
- });
- for (long[] info : values) {
- long oldOffset = info[0];
- info[0] = newStore.getLength();
- int size = (int)info[1];
- while (size > 0) {
- int toWrite = Math.min(IO_BUFFER_SIZE, size);
- store.readFully(oldOffset, buffer, 0, toWrite);
- newStore.write(buffer, 0, toWrite);
- size -= toWrite;
- }
+ bySize.pollFirst();
+ byAddress.remove(smallest);
+ move(smallest, lastEndAddress, buffer);
+ space -= smallest[1];
+ lastEndAddress += smallest[1];
}
- store.remove();
- store = newStore;
- long oldOffset = offset;
- offset = store.getLength();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+
+ if (space <= minFreeSpace) {
+ unusedSpace.addAndGet(space);
+ } else {
+ move(info, lastEndAddress, buffer);
}
- return offset;
- } finally {
- this.compactionLock.writeLock().unlock();
+ lastEndAddress = info[0] + info[1];
}
+ long oldLength = store.getLength();
+ store.truncate(lastEndAddress);
+ tail = lastEndAddress;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldLength, "post-size", store.getLength()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
+ return tail;
}
+
+ private void move(long[] toMove, long newOffset, byte[] buffer) throws TeiidComponentException {
+ long oldOffset = toMove[0];
+ toMove[0] = newOffset;
+ int size = (int)toMove[1];
+ while (size > 0) {
+ int toWrite = Math.min(IO_BUFFER_SIZE, size);
+ store.readFully(oldOffset, buffer, 0, toWrite);
+ store.write(newOffset, buffer, 0, toWrite);
+ size -= toWrite;
+ oldOffset += toWrite;
+ newOffset += toWrite;
+ }
+ }
@Override
public void remove() {
@@ -350,7 +390,6 @@
try {
batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
- Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
batch = new TupleBatch();
batch.setRowOffset(ois.readInt());
@@ -378,7 +417,7 @@
}
public synchronized void persist() throws TeiidComponentException {
- BatchManagerImpl batchManager = managerRef.get();
+ final BatchManagerImpl batchManager = managerRef.get();
if (batchManager == null) {
remove();
return;
@@ -392,23 +431,37 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing batch to disk, total writes: ", count); //$NON-NLS-1$
}
- long offset = 0;
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
lobManager.updateReferences(batchManager.lobIndexes, tuple);
}
}
- synchronized (batchManager.store) {
- offset = batchManager.getOffset();
- OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- oos.writeInt(batch.getBeginRow());
- batch.writeExternal(oos);
- oos.close();
- long size = batchManager.store.getLength() - offset;
- long[] info = new long[] {offset, size};
- batchManager.physicalMapping.put(this.id, info);
+ batchManager.compactionLock.writeLock().lock();
+ Long free = null;
+ while ((free = batchManager.freed.pollFirst()) != null) {
+ batchManager.freeBatch(free);
}
+ lockheld = true;
+ final long offset = batchManager.getOffset();
+ ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream(new byte[IO_BUFFER_SIZE]) {
+
+ @Override
+ protected void flushDirect() throws IOException {
+ try {
+ batchManager.store.write(offset + bytesWritten, buf, 0, count);
+ } catch (TeiidComponentException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(batch.getBeginRow());
+ batch.writeExternal(oos);
+ oos.close();
+ long size = fsos.getBytesWritten();
+ long[] info = new long[] {offset, size};
+ batchManager.physicalMapping.put(this.id, info);
+ batchManager.tail = Math.max(batchManager.tail, offset + size);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
}
@@ -436,7 +489,12 @@
}
public void remove() {
- cleanupManagedBatch(managerRef.get(), id);
+ activeBatch = null;
+ batchReference = null;
+ BatchManagerImpl batchManager = managerRef.get();
+ if (batchManager != null) {
+ cleanupManagedBatch(batchManager, id);
+ }
}
@Override
@@ -475,6 +533,7 @@
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
+ private int compactionThreshold = COMPACTION_THRESHOLD;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -581,9 +640,15 @@
}
}
}
- long[] info = batchManager.physicalMapping.remove(id);
- if (info != null) {
- batchManager.unusedSpace.addAndGet(info[1]);
+
+ if (batchManager.compactionLock.writeLock().tryLock()) {
+ try {
+ batchManager.freeBatch(id);
+ } finally {
+ batchManager.compactionLock.writeLock().unlock();
+ }
+ } else {
+ batchManager.freed.add(id);
}
}
@@ -730,7 +795,7 @@
try {
mb.persist();
} catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
}
}
}
@@ -743,8 +808,6 @@
private int[] getSizeEstimates(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- //we make a assumption that the average column size under 64bits is approximately 128bytes
- //this includes alignment, row/array, and reference overhead
for (Expression element : elements) {
Class<?> type = element.getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -829,4 +892,8 @@
this.inlineLobs = inlineLobs;
}
+ public void setCompactionThreshold(int compactionThreshold) {
+ this.compactionThreshold = compactionThreshold;
+ }
+
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class ExtensibleBufferedOutputStream extends OutputStream {
+
+ protected int bytesWritten;
+ protected byte buf[];
+ protected int count;
+
+ public ExtensibleBufferedOutputStream(byte[] buf) {
+ this.buf = buf;
+ }
+
+ public void write(int b) throws IOException {
+ if (count >= buf.length) {
+ flush();
+ }
+ buf[count++] = (byte)b;
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ while (true) {
+ int toCopy = Math.min(buf.length - count, len);
+ System.arraycopy(b, off, buf, count, toCopy);
+ count += toCopy;
+ len -= toCopy;
+ off += toCopy;
+ if (count < buf.length) {
+ break;
+ }
+ flush();
+ }
+ }
+
+ public void flush() throws IOException {
+ if (count > 0) {
+ flushDirect();
+ }
+ bytesWritten += count;
+ count = 0;
+ }
+
+ protected abstract void flushDirect() throws IOException;
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ }
+
+ public int getBytesWritten() {
+ return bytesWritten;
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,9 +100,6 @@
this.name = name;
}
- /**
- * Concurrent reads are possible, but only after writing is complete.
- */
public synchronized int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
try {
RandomAccessFile fileAccess = fileInfo.open();
@@ -118,39 +115,45 @@
/**
* Concurrent writes are prevented by FileStore, but in general should not happen since processing is single threaded.
*/
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
+ public void writeDirect(long fileOffset, byte[] bytes, int offset, int length) throws TeiidComponentException {
long used = usedBufferSpace.addAndGet(length);
if (used > maxBufferSpace) {
usedBufferSpace.addAndGet(-length);
+ //TODO: trigger a compaction before this is thrown
throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
}
- long fileOffset = 0;
if (fileInfo == null) {
fileInfo = new FileInfo(createFile(name));
- if (fileInfo != null) {
- fileOffset += fileInfo.file.length();
- }
}
- synchronized (this) {
- try {
- RandomAccessFile fileAccess = fileInfo.open();
- long pointer = fileAccess.length();
- fileAccess.setLength(pointer + length);
- fileAccess.seek(pointer);
- fileAccess.write(bytes, offset, length);
- } catch(IOException e) {
- throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
- } finally {
- fileInfo.close();
- }
- }
+ try {
+ RandomAccessFile fileAccess = fileInfo.open();
+ fileAccess.setLength(fileOffset + length);
+ fileAccess.seek(fileOffset);
+ fileAccess.write(bytes, offset, length);
+ } catch(IOException e) {
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+ } finally {
+ fileInfo.close();
+ }
}
- public synchronized void removeDirect() {
+ public void removeDirect() {
usedBufferSpace.addAndGet(-len);
- fileInfo.delete();
+ if (fileInfo != null){
+ fileInfo.delete();
+ }
}
+ @Override
+ protected void truncateDirect(long length) throws TeiidComponentException {
+ try {
+ RandomAccessFile raf = fileInfo.open();
+ raf.setLength(length);
+ } catch (IOException e) {
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+ }
+ }
+
}
// Initialization
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -45,19 +45,20 @@
private ByteBuffer buffer = ByteBuffer.allocate(1 << 16);
@Override
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
- if (getLength() + length > buffer.capacity()) {
+ public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
+ buffer.position((int)start);
+ if (buffer.position() + length > buffer.capacity()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
buffer.position(0);
newBuffer.put(buffer);
buffer = newBuffer;
+ buffer.position((int)start);
}
- buffer.position((int)getLength());
buffer.put(bytes, offset, length);
}
@Override
- public synchronized void removeDirect() {
+ public void removeDirect() {
removed.incrementAndGet();
buffer = ByteBuffer.allocate(0);
}
@@ -74,6 +75,15 @@
buffer.get(b, offset, length);
return length;
}
+
+ @Override
+ protected void truncateDirect(long length) {
+ ByteBuffer newBuffer = ByteBuffer.allocate((int)length);
+ buffer.position(0);
+ buffer.limit(newBuffer.capacity());
+ newBuffer.put(buffer);
+ buffer = newBuffer;
+ }
};
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -22,6 +22,7 @@
package org.teiid.common.buffer.impl;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -57,14 +58,16 @@
this.name = name;
}
+ @Override
public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
Map.Entry<Long, FileStore> entry = storageFiles.floorEntry(fileOffset);
FileStore fileInfo = entry.getValue();
return fileInfo.read(fileOffset - entry.getKey(), b, offSet, length);
}
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
- Map.Entry<Long, FileStore> entry = this.storageFiles.lastEntry();
+ @Override
+ public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
+ Map.Entry<Long, FileStore> entry = this.storageFiles.floorEntry(start);
boolean createNew = false;
FileStore fileInfo = null;
long fileOffset = 0;
@@ -73,7 +76,10 @@
} else {
fileInfo = entry.getValue();
fileOffset = entry.getKey();
- createNew = entry.getValue().getLength() + length > getMaxFileSize();
+ if (start > entry.getValue().getLength() + fileOffset) {
+ throw new AssertionError("invalid write start location"); //$NON-NLS-1$
+ }
+ createNew = start + length > getMaxFileSize();
}
if (createNew) {
FileStore newFileInfo = storageManager.createFileStore(name + "_" + storageFiles.size()); //$NON-NLS-1$
@@ -83,15 +89,36 @@
storageFiles.put(fileOffset, newFileInfo);
fileInfo = newFileInfo;
}
- fileInfo.write(bytes, offset, length);
+ fileInfo.write(start - fileOffset, bytes, offset, length);
}
public void removeDirect() {
for (FileStore info : storageFiles.values()) {
info.remove();
}
+ storageFiles.clear();
}
+ @Override
+ protected void truncateDirect(long length)
+ throws TeiidComponentException {
+ Map.Entry<Long, FileStore> start = storageFiles.floorEntry(length);
+ if (start == null) {
+ return;
+ }
+ if (start.getKey().longValue() == length) {
+ start.getValue().remove();
+ storageFiles.remove(start.getKey());
+ } else {
+ start.getValue().truncate(length - start.getKey());
+ }
+ for (Iterator<FileStore> iter = storageFiles.tailMap(length, false).values().iterator(); iter.hasNext();) {
+ iter.next().remove();
+ iter.remove();
+ }
+
+ }
+
}
public long getMaxFileSize() {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,5 +100,34 @@
}
}
-
+
+ /**
+ * Forces the logic through several compaction cycles by using large strings
+ * @throws TeiidComponentException
+ */
+ @Test public void testCompaction() throws TeiidComponentException {
+ BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+ bm.setProcessorBatchSize(32);
+ bm.setMaxReserveKB(0);//force all to disk
+ bm.setCompactionThreshold(0);
+ bm.initialize();
+
+ ElementSymbol e1 = new ElementSymbol("x");
+ e1.setType(String.class);
+ List<ElementSymbol> elements = Arrays.asList(e1);
+ STree map = bm.createSTree(elements, "1", 1);
+
+ int size = 1000;
+
+ for (int i = 0; i < size; i++) {
+ assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+ assertEquals(i + 1, map.getRowCount());
+ }
+
+ for (int i = 0; i < size; i++) {
+ assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+ }
+
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -48,5 +48,26 @@
assertEquals(2, msm.getRemoved());
}
+
+ @Test public void testTruncate() throws Exception {
+ MemoryStorageManager msm = new MemoryStorageManager();
+ SplittableStorageManager ssm = new SplittableStorageManager(msm);
+ ssm.setMaxFileSizeDirect(2048);
+ String tsID = "0"; //$NON-NLS-1$
+ // Add one batch
+ FileStore store = ssm.createFileStore(tsID);
+ TestFileStorageManager.writeBytes(store);
+
+ assertEquals(1, msm.getCreated());
+ TestFileStorageManager.writeBytes(store);
+
+ assertEquals(2, msm.getCreated());
+
+ store.truncate(100);
+
+ assertEquals(1, msm.getRemoved());
+
+ }
+
}
[View Less]
13 years, 6 months
teiid SVN: r3496 - branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-09-14 15:19:11 -0400 (Wed, 14 Sep 2011)
New Revision: 3496
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
Log:
TEIID-1755 preventing an npe if not in a clustering profile
Modified: branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/…
[View More]TempTableDataManager.java 2011-09-14 18:41:30 UTC (rev 3495)
+++ branches/7.4.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-14 19:19:11 UTC (rev 3496)
@@ -341,7 +341,7 @@
}
}
boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
- if (invalidate) {
+ if (invalidate && distributedCache != null) {
touchTable(context, matTableName, false, System.currentTimeMillis());
}
MatState oldState = info.setState(MatState.NEEDS_LOADING, invalidate?Boolean.FALSE:null, null);
[View Less]
13 years, 6 months
teiid SVN: r3495 - branches/as7/build/kits/jboss-as7/standalone/configuration.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2011-09-14 14:41:30 -0400 (Wed, 14 Sep 2011)
New Revision: 3495
Modified:
branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
Log:
TEIID-1720: adding a sample connection factory
Modified: branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
===================================================================
--- branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2011-09-14 17:57:13 UTC (…
[View More]rev 3494)
+++ branches/as7/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2011-09-14 18:41:30 UTC (rev 3495)
@@ -259,7 +259,24 @@
</modules>
</subsystem>
<subsystem xmlns="urn:jboss:domain:remoting:1.0"/>
- <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0" />
+ <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0">
+ <resource-adapters>
+ <resource-adapter>
+ <archive>teiid-connector-file.rar</archive>
+ <transaction-support>NoTransaction</transaction-support>
+ <connection-definitions>
+ <connection-definition class-name="org.teiid.resource.adapter.file.FileManagedConnectionFactory"
+ jndi-name="java:/fileDS"
+ enabled="true"
+ use-java-context="true"
+ pool-name="teiid-file-ds">
+ <config-property name="ParentDirectory">/home/rareddy/testing/</config-property>
+ <config-property name="AllowParentPaths">true</config-property>
+ </connection-definition>
+ </connection-definitions>
+ </resource-adapter>
+ </resource-adapters>
+ </subsystem>
<subsystem xmlns="urn:jboss:domain:sar:1.0"/>
<subsystem xmlns="urn:jboss:domain:security:1.0">
<security-domains>
[View Less]
13 years, 6 months
teiid SVN: r3494 - in branches/7.4.x/engine/src: test/java/org/teiid/common/buffer and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-09-14 13:57:13 -0400 (Wed, 14 Sep 2011)
New Revision: 3494
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
Log:
TEIID-1753 addressing compaction issues
Modified: branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.4.x/…
[View More]engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 17:57:13 UTC (rev 3494)
@@ -36,12 +36,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -95,7 +97,8 @@
private final class BatchManagerImpl implements BatchManager {
private final String id;
private volatile FileStore store;
- private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
+ private Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+ private ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>();
private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
private AtomicLong unusedSpace = new AtomicLong();
private int[] lobIndexes;
@@ -109,6 +112,13 @@
this.sizeUtility = new SizeUtility();
}
+ private void freeBatch(Long id) {
+ long[] info = physicalMapping.remove(id);
+ if (info != null) {
+ unusedSpace.addAndGet(info[1]);
+ }
+ }
+
public FileStore createStorage(String prefix) {
return createFileStore(id+prefix);
}
@@ -123,7 +133,7 @@
}
private boolean shouldCompact(long offset) {
- return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
+ return offset > compactionThreshold && unusedSpace.get() * 4 > offset * 3;
}
private long getOffset() throws TeiidComponentException {
@@ -131,45 +141,38 @@
if (!shouldCompact(offset)) {
return offset;
}
- try {
- this.compactionLock.writeLock().lock();
- offset = store.getLength();
- //retest the condition to ensure that compaction is still needed
- if (!shouldCompact(offset)) {
- return offset;
+ offset = store.getLength();
+ FileStore newStore = createFileStore(id);
+ newStore.setCleanupReference(this);
+ byte[] buffer = new byte[IO_BUFFER_SIZE];
+ List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
+ Collections.sort(values, new Comparator<long[]>() {
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ return Long.signum(o1[0] - o2[0]);
}
- FileStore newStore = createFileStore(id);
- newStore.setCleanupReference(this);
- byte[] buffer = new byte[IO_BUFFER_SIZE];
- List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
- Collections.sort(values, new Comparator<long[]>() {
- @Override
- public int compare(long[] o1, long[] o2) {
- return Long.signum(o1[0] - o2[0]);
- }
- });
- for (long[] info : values) {
- long oldOffset = info[0];
- info[0] = newStore.getLength();
- int size = (int)info[1];
- while (size > 0) {
- int toWrite = Math.min(IO_BUFFER_SIZE, size);
- store.readFully(oldOffset, buffer, 0, toWrite);
- newStore.write(buffer, 0, toWrite);
- size -= toWrite;
- }
+ });
+ for (long[] info : values) {
+ long oldOffset = info[0];
+ info[0] = newStore.getLength();
+ int size = (int)info[1];
+ while (size > 0) {
+ int toWrite = Math.min(IO_BUFFER_SIZE, size);
+ store.readFully(oldOffset, buffer, 0, toWrite);
+ newStore.write(buffer, 0, toWrite);
+ size -= toWrite;
+ oldOffset += toWrite;
}
- store.remove();
- store = newStore;
- long oldOffset = offset;
- offset = store.getLength();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
- return offset;
- } finally {
- this.compactionLock.writeLock().unlock();
}
+ store.remove();
+ store = newStore;
+ long oldOffset = offset;
+ offset = store.getLength();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
+ unusedSpace.set(0);
+ return offset;
}
@Override
@@ -298,7 +301,6 @@
try {
this.batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
- Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
batch = new TupleBatch();
batch.setDataTypes(types);
@@ -341,16 +343,20 @@
lobManager.updateReferences(batchManager.lobIndexes, tuple);
}
}
- synchronized (batchManager.store) {
- offset = batchManager.getOffset();
- OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- batch.writeExternal(oos);
- oos.close();
- long size = batchManager.store.getLength() - offset;
- long[] info = new long[] {offset, size};
- batchManager.physicalMapping.put(this.id, info);
+ batchManager.compactionLock.writeLock().lock();
+ lockheld = true;
+ Long free = null;
+ while ((free = batchManager.freed.pollFirst()) != null) {
+ batchManager.freeBatch(free);
}
+ offset = batchManager.getOffset();
+ OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ batch.writeExternal(oos);
+ oos.close();
+ long size = batchManager.store.getLength() - offset;
+ long[] info = new long[] {offset, size};
+ batchManager.physicalMapping.put(this.id, info);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
}
@@ -383,12 +389,17 @@
}
}
}
- long[] info = batchManager.physicalMapping.remove(id);
- if (info != null) {
- batchManager.unusedSpace.addAndGet(info[1]);
- }
activeBatch = null;
batchReference = null;
+ if (batchManager.compactionLock.writeLock().tryLock()) {
+ try {
+ batchManager.freeBatch(id);
+ } finally {
+ batchManager.compactionLock.writeLock().unlock();
+ }
+ } else {
+ batchManager.freed.add(id);
+ }
}
@Override
@@ -407,6 +418,7 @@
private volatile int reserveBatchKB;
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
+ private int compactionThreshold = COMPACTION_THRESHOLD;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -642,7 +654,7 @@
try {
mb.persist();
} catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
}
}
}
@@ -651,8 +663,6 @@
public int getSchemaSize(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- //we make a assumption that the average column size under 64bits is approximately 128bytes
- //this includes alignment, row/array, and reference overhead
for (Expression element : elements) {
Class<?> type = element.getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -703,4 +713,8 @@
this.useWeakReferences = useWeakReferences;
}
+ public void setCompactionThreshold(int compactionThreshold) {
+ this.compactionThreshold = compactionThreshold;
+ }
+
}
Modified: branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 17:57:13 UTC (rev 3494)
@@ -100,5 +100,34 @@
}
}
+
+ /**
+ * Forces the logic through several compaction cycles by using large strings
+ * @throws TeiidComponentException
+ */
+ @Test public void testCompaction() throws TeiidComponentException {
+ BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+ bm.setProcessorBatchSize(32);
+ bm.setMaxReserveKB(0);//force all to disk
+ bm.setCompactionThreshold(0);
+ bm.initialize();
+
+ ElementSymbol e1 = new ElementSymbol("x");
+ e1.setType(String.class);
+ List<ElementSymbol> elements = Arrays.asList(e1);
+ STree map = bm.createSTree(elements, "1", 1);
+
+ int size = 1000;
+
+ for (int i = 0; i < size; i++) {
+ assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+ assertEquals(i + 1, map.getRowCount());
+ }
+
+ for (int i = 0; i < size; i++) {
+ assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+ }
+
+ }
}
[View Less]
13 years, 6 months
teiid SVN: r3493 - in branches/7.1.x/engine/src/main/java/org/teiid/query: tempdata and 1 other directory.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:22:55 -0400 (Tue, 13 Sep 2011)
New Revision: 3493
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
Log:
TEIID-1657: making view name available to the mattable request such that it can be proactively loaded at other nodes in the cluster
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/…
[View More]relational/RelationalPlanner.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-09-13 23:22:43 UTC (rev 3492)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/RelationalPlanner.java 2011-09-13 23:22:55 UTC (rev 3493)
@@ -1004,6 +1004,7 @@
id = store.addTempGroup(matTableName, ResolverUtil.resolveElementsInGroup(table, metadata), false, true);
id.setQueryNode(metadata.getVirtualPlan(table.getMetadataID()));
id.setCardinality(metadata.getCardinality(table.getMetadataID()));
+ id.setOriginalMetadataID(table.getMetadataID());
Object pk = metadata.getPrimaryKey(table.getMetadataID());
if (pk != null) {
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:22:43 UTC (rev 3492)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:22:55 UTC (rev 3493)
@@ -420,8 +420,13 @@
if (!group.isTempGroupSymbol()) {
return null;
}
+ String viewName = null;
final String tableName = group.getNonCorrelationName().toUpperCase();
boolean remapColumns = !tableName.equalsIgnoreCase(group.getName());
+ TempMetadataID groupID = (TempMetadataID)group.getMetadataID();
+ if (groupID.getOriginalMetadataID() != null) {
+ viewName = context.getMetadata().getFullName(groupID.getOriginalMetadataID());
+ }
TempTable table = null;
if (group.isGlobalTable()) {
final TempTableStore globalStore = context.getGlobalTableStore();
@@ -445,9 +450,9 @@
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, null, globalStore, info, loadTime, true);
+ loadGlobalTable(context, group, tableName, viewName, globalStore, info, loadTime, true);
} else {
- loadAsynch(context, group, tableName, null, globalStore, info, loadTime);
+ loadAsynch(context, group, tableName, viewName, globalStore, info, loadTime);
}
}
table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
@@ -611,8 +616,8 @@
matTableEntry.lastUpdate = System.currentTimeMillis();
MatTableEntry entry = refreshJob.put(key, matTableEntry, null);
if (entry == null) {
- // in the case of refreshjob, cacheCreate are not being notified correctly due to nature of how Teiid uses the cache
- // so, in order to get a cacheModified event insert again.
+ // Due to nature how the JBoss Cache being used as flat keys with nodes, node creation event is
+ // hard to capture, this is way to insert and later do update to capture the node modification.
refreshJob.put(key, matTableEntry, null);
}
}
[View Less]
13 years, 6 months
teiid SVN: r3492 - in branches/7.1.x: engine/src/main/java/org/teiid/cache and 7 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:22:43 -0400 (Tue, 13 Sep 2011)
New Revision: 3492
Modified:
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java
branches/7.1.x/engine/src/main/java/org/teiid/cache/…
[View More]CacheListener.java
branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCache.java
branches/7.1.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/7.1.x/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
Log:
TEIID-1657: Implemented distributed refreshMatView call based on the JBoss Cache Listener mechanism. When user issues a refreshMatView call on one node, then all the nodes gets notified about the change in the original node after load of the mat view table is finished. The proactive load on the passive nodes are done in a separate thread. Also fixed a memory leak with the TupleBufferCacheLoader in loading batches.
Modified: branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -112,11 +112,13 @@
}
}
- public synchronized void addListener(CacheListener listener) {
- this.cacheListener = new JBossCacheListener(this.rootFqn, listener);
+ @Override
+ public synchronized void setListener(CacheListener listener) {
+ this.cacheListener = new JBossCacheListener(this.rootFqn, this.cacheStore, this, listener);
this.cacheStore.addCacheListener(this.cacheListener);
}
+ @Override
public synchronized void removeListener() {
this.cacheStore.removeCacheListener(this.cacheListener);
this.cacheListener = null;
Modified: branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java
===================================================================
--- branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheListener.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -21,7 +21,11 @@
*/
package org.teiid.cache.jboss;
+import java.util.Set;
+
import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.eviction.ExpirationAlgorithmConfig;
import org.jboss.cache.notifications.annotation.NodeCreated;
import org.jboss.cache.notifications.annotation.NodeEvicted;
import org.jboss.cache.notifications.annotation.NodeLoaded;
@@ -33,14 +37,18 @@
@org.jboss.cache.notifications.annotation.CacheListener
-public class JBossCacheListener {
+public class JBossCacheListener<K,V> {
private CacheListener listener;
private Fqn rootFqn;
+ private JBossCache cache;
+ private org.jboss.cache.Cache<K,V> cacheStore;
- public JBossCacheListener(Fqn fqn, CacheListener listener) {
+ public JBossCacheListener(Fqn fqn, org.jboss.cache.Cache cacheStore, JBossCache cache, CacheListener listener) {
this.rootFqn = fqn;
this.listener = listener;
+ this.cache = cache;
+ this.cacheStore = cacheStore;
}
@NodeCreated
@@ -55,4 +63,61 @@
listener.cacheChanged();
}
}
+
+ @NodeCreated
+ public synchronized void cacheCreated(NodeEvent ne) {
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ Fqn fqn = ne.getFqn();
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheCreated(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
+
+ @NodeRemoved
+ public synchronized void cacheRemoved(NodeEvent ne) {
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ Fqn fqn = ne.getFqn();
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheRemoved(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
+
+ @NodeModified
+ public synchronized void cacheModified(NodeEvent ne) {
+ Fqn fqn = ne.getFqn();
+ if (!ne.isPre() && !ne.isOriginLocal()) {
+ if (fqn.isChildOrEquals(rootFqn)) {
+ Node<K,V> node = this.cacheStore.getNode(fqn);
+ if (node != null) {
+ Set<K> keys = node.getKeys();
+ for (K key:keys) {
+ if ((key instanceof String) && (key.equals(ExpirationAlgorithmConfig.EXPIRATION_KEY))) {
+ continue;
+ }
+ listener.cacheModified(key, cache.get(key));
+ }
+ }
+ }
+ }
+ }
}
Modified: branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java
===================================================================
--- branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/TupleBatchCacheLoader.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -62,9 +62,8 @@
map.put(id, b);
return map;
}
- return super.get(fqn);
}
- return null;
+ return super.get(fqn);
}
@Override
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/cache/Cache.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -82,4 +82,14 @@
* @return
*/
Set<K> keys();
+
+ /**
+ * set cache listener
+ * @param listener
+ */
+ void setListener(CacheListener<K, V> listener);
+
+
+ void removeListener();
+
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheListener.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheListener.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/cache/CacheListener.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -25,6 +25,9 @@
/**
* Listener for the cache events like add,update delete
*/
-public interface CacheListener {
+public interface CacheListener<K, V> {
void cacheChanged();
+ void cacheCreated(K key, V value);
+ void cacheRemoved(K key, V value);
+ void cacheModified(K key, V value);
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCache.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCache.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/cache/DefaultCache.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -95,9 +95,11 @@
this.ttl = ttl;
}
- public void addListener(CacheListener listener) {
- throw new UnsupportedOperationException();
+ public void setListener(CacheListener listener) {
}
+
+ public void removeListener() {
+ }
public void clear() {
synchronized (map) {
Modified: branches/7.1.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -95,8 +95,12 @@
this.uuid = java.util.UUID.randomUUID().toString();
}
return this.uuid;
- }
+ }
+ public void setId(String uuid) {
+ this.uuid = uuid;
+ }
+
public boolean isLobs() {
return lobIndexes != null;
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -130,6 +130,7 @@
}
buffer = bufferManager.createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
buffer.setBatchSize(this.batchSize);
+ buffer.setId(this.uuid);
if (this.hint != null) {
buffer.setPrefersMemory(this.hint.getPrefersMemory());
}
@@ -142,6 +143,7 @@
return false;
}
buffer.addTupleBatch(batch, true);
+ cache.remove(uuid+","+row); //$NON-NLS-1$
}
this.results = buffer;
bufferManager.addTupleBuffer(this.results);
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -22,6 +22,10 @@
package org.teiid.dqp.internal.process;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -46,6 +50,7 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
@@ -212,6 +217,7 @@
private CacheFactory cacheFactory;
private SessionAwareCache<CachedResults> matTables;
+ private CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> matTableListener;
/**
* perform a full shutdown and wait for 10 seconds for all threads to finish
@@ -711,11 +717,11 @@
}
if (cacheFactory.isReplicated()) {
- matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterilizationTables")); //$NON-NLS-1$
+ matTables = new SessionAwareCache<CachedResults>(this.cacheFactory, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.EXPIRATION, -1, -1, "MaterializationTables")); //$NON-NLS-1$
matTables.setBufferManager(this.bufferManager);
}
- dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory);
+ dataTierMgr = new TempTableDataManager(new DataTierManagerImpl(this,this.bufferService), this.bufferManager, this.processWorkerPool, this.rsCache, this.matTables, this.cacheFactory, this.matTableListener);
}
public void setBufferService(BufferService service) {
@@ -726,6 +732,29 @@
this.transactionService = service;
}
+ public void setMatTableListener(final CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> listener) {
+ this.matTableListener = (CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry>)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {CacheListener.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
+ addWork(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ method.invoke(listener, args);
+ } catch (IllegalArgumentException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ } catch (IllegalAccessException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ } catch (InvocationTargetException e) {
+ LogManager.logDetail(LogConstants.CTX_DQP, e);
+ }
+ }
+ });
+ return null;
+ }
+ });
+ }
+
@Override
public boolean cancelRequest(long requestID)
throws TeiidProcessingException, TeiidComponentException {
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -32,6 +32,7 @@
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.DefaultCache;
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.cache.CacheConfiguration.Policy;
@@ -307,4 +308,14 @@
public void setBufferManager(BufferManager bufferManager) {
this.bufferManager = bufferManager;
}
+
+ public void setListener(CacheListener<CacheID, T> listener) {
+ this.localCache.setListener(listener);
+ this.distributedCache.setListener(listener);
+ }
+
+ public void removeListener() {
+ this.localCache.removeListener();
+ this.distributedCache.removeListener();
+ }
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -40,6 +40,7 @@
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
@@ -113,7 +114,7 @@
private SessionAwareCache<CachedResults> cache;
private Executor executor;
- private static class MatTableKey implements Serializable {
+ public static class MatTableKey implements Serializable {
private static final long serialVersionUID = 5481692896572663992L;
String name;
VDBKey vdb;
@@ -134,19 +135,37 @@
MatTableKey other = (MatTableKey)obj;
return this.name.equals(other.name) && this.vdb.equals(other.vdb);
}
+
+ public String getVDBName() {
+ return vdb.getName();
+ }
+
+ public int getVDBVersion() {
+ return vdb.getVersion();
+ }
}
- private static class MatTableEntry implements Serializable {
+ public static class MatTableEntry implements Serializable {
private static final long serialVersionUID = 8559613701442751579L;
long lastUpdate = System.currentTimeMillis();
boolean valid;
+ String viewName;
+
+ public String getViewName() {
+ return viewName;
+ }
+
+ public boolean allowsUpdate() {
+ return valid && viewName != null;
+ }
}
private Cache<MatTableKey, MatTableEntry> tables;
+ private Cache<MatTableKey, MatTableEntry> refreshJob;
private SessionAwareCache<CachedResults> distributedCache;
public TempTableDataManager(ProcessorDataManager processorDataManager, BufferManager bufferManager,
- Executor executor, SessionAwareCache<CachedResults> cache, SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory){
+ Executor executor, SessionAwareCache<CachedResults> cache, SessionAwareCache<CachedResults> distibutedCache, CacheFactory cacheFactory, CacheListener<MatTableKey, MatTableEntry> listener){
this.processorDataManager = processorDataManager;
this.bufferManager = bufferManager;
this.executor = executor;
@@ -155,6 +174,10 @@
if (distibutedCache != null) {
CacheConfiguration cc = new CacheConfiguration(Policy.LRU, -1, -1, "MaterializationUpdates"); //$NON-NLS-1$
tables = cacheFactory.get(cc.getLocation(), cc);
+
+ cc = new CacheConfiguration(Policy.LRU, -1, -1, "MaterializationRefresh"); //$NON-NLS-1$
+ refreshJob = cacheFactory.get(cc.getLocation(), cc);
+ refreshJob.setListener(listener);
}
}
@@ -289,7 +312,7 @@
context.setDeterminismLevel(determinismLevel);
return tb.createIndexedTupleSource();
}
-
+
private TupleSource handleSystemProcedures(CommandContext context, StoredProcedure proc)
throws TeiidComponentException, QueryMetadataException,
QueryProcessingException, QueryResolverException,
@@ -303,9 +326,23 @@
String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
+
+ Long loadTime = null;
+ boolean useCache = false;
+ if (this.distributedCache != null) {
+ MatTableKey key = new MatTableKey();
+ key.name = matTableName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+ MatTableEntry entry = this.tables.get(key);
+ useCache = (entry != null && entry.valid && entry.lastUpdate > info.getUpdateTime());
+ if (useCache) {
+ loadTime = entry.lastUpdate;
+ }
+ }
+
boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
if (invalidate) {
- touchTable(context, matTableName, false);
+ touchTable(context, matTableName, matViewName, false, System.currentTimeMillis());
}
MatState oldState = info.setState(MatState.NEEDS_LOADING, invalidate?Boolean.FALSE:null, null);
if (oldState == MatState.LOADING) {
@@ -316,7 +353,7 @@
Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group, matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, matViewName, globalStore, info, invalidate?null:loadTime, !invalidate && useCache);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -408,9 +445,9 @@
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
+ loadGlobalTable(context, group, tableName, null, globalStore, info, loadTime, true);
} else {
- loadAsynch(context, group, tableName, globalStore, info, loadTime);
+ loadAsynch(context, group, tableName, null, globalStore, info, loadTime);
}
}
table = globalStore.getOrCreateTempTable(tableName, query, bufferManager, false);
@@ -435,13 +472,13 @@
}
private void loadAsynch(final CommandContext context,
- final GroupSymbol group, final String tableName,
+ final GroupSymbol group, final String tableName,final String viewName,
final TempTableStore globalStore, final MatTableInfo info,
final Long loadTime) {
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
+ return loadGlobalTable(context, group, tableName, viewName, globalStore, info, loadTime, true);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,9 +486,9 @@
}
private int loadGlobalTable(CommandContext context,
- GroupSymbol group, final String tableName,
+ GroupSymbol group, final String tableName, final String viewName,
TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
- throws TeiidComponentException, TeiidProcessingException {
+ throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
Create create = new Create();
@@ -477,9 +514,11 @@
}
}
int rowCount = -1;
+ boolean tableUpdated = false;
+ String fullName = null;
try {
- String fullName = metadata.getFullName(group.getMetadataID());
- TupleSource ts = null;
+ fullName = metadata.getFullName(group.getMetadataID());
+ TupleSource ts = null;
CacheID cid = null;
if (distributedCache != null) {
cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
@@ -487,6 +526,7 @@
if (useCache) {
CachedResults cr = this.distributedCache.get(cid);
if (cr != null) {
+ LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.cache_load", tableName)); //$NON-NLS-1$
ts = cr.getResults().createIndexedTupleSource();
}
}
@@ -501,14 +541,15 @@
String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
QueryProcessor qp = context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName, context);
qp.setNonBlocking(true);
-
+
if (distributedCache != null) {
CachedResults cr = new CachedResults();
BatchCollector bc = qp.createBatchCollector();
TupleBuffer tb = bc.collectTuples();
cr.setResults(tb);
- touchTable(context, fullName, true);
- this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr, info.getTtl());
+ touchTable(context, fullName, viewName, true, info.getUpdateTime());
+ this.distributedCache.put(cid, FunctionMethod.VDB_DETERMINISTIC, cr, info.getTtl());
+ tableUpdated = true;
ts = tb.createIndexedTupleSource();
} else {
ts = new BatchCollector.BatchProducerTupleSource(qp);
@@ -541,19 +582,40 @@
globalStore.swapTempTable(tableName, table);
info.setState(MatState.LOADED, true, loadTime);
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loaded", tableName, rowCount)); //$NON-NLS-1$
+ if (tableUpdated) {
+ initiateRefreshAcrossCluster(context, fullName, viewName);
+ }
}
}
return rowCount;
}
- private void touchTable(CommandContext context, String fullName, boolean valid) {
+ private void touchTable(CommandContext context, String fullName, String viewName, boolean valid, long loadtime) {
MatTableKey key = new MatTableKey();
key.name = fullName;
key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
MatTableEntry matTableEntry = new MatTableEntry();
matTableEntry.valid = valid;
+ matTableEntry.viewName = viewName;
+ matTableEntry.lastUpdate = loadtime;
tables.put(key, matTableEntry, null);
}
+
+ private void initiateRefreshAcrossCluster(CommandContext context, String fullName, String viewName) {
+ MatTableKey key = new MatTableKey();
+ key.name = fullName;
+ key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
+ MatTableEntry matTableEntry = new MatTableEntry();
+ matTableEntry.valid = true;
+ matTableEntry.viewName = viewName;
+ matTableEntry.lastUpdate = System.currentTimeMillis();
+ MatTableEntry entry = refreshJob.put(key, matTableEntry, null);
+ if (entry == null) {
+ // in the case of refreshjob, cacheCreate are not being notified correctly due to nature of how Teiid uses the cache
+ // so, in order to get a cacheModified event insert again.
+ refreshJob.put(key, matTableEntry, null);
+ }
+ }
/**
* Return a list of ElementSymbols for the given index/key object
Modified: branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:22:43 UTC (rev 3492)
@@ -783,6 +783,7 @@
TempTableDataManager.failed_load=Failed to load materialized view table {0}.
TempTableDataManager.loaded=Loaded materialized view table {0} with row count {1}.
+TempTableDataManager.cache_load=Loaded materialized view table {0} from cached contents from another clustered node.
TempTableDataManager.loading=Loading materialized view table {0}
TempTableDataManager.not_implicit_matview={0} does not target an internal materialized view.
TempTableDataManager.row_refresh_pk=Materialized view {0} cannot have a row refreshed since there is no primary key.
Modified: branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestMaterialization.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -74,7 +74,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new DefaultCacheFactory());
+ dataManager = new TempTableDataManager(hdm, bm, executor, cache, cache, new DefaultCacheFactory(), null);
}
private void execute(String sql, List<?>... expectedResults) throws Exception {
Modified: branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -245,7 +245,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache, null, null);
+ dataManager = new TempTableDataManager(dataManager, bufferMgr, executor, cache, null, null, null);
}
if (context.getQueryProcessorFactory() == null) {
context.setQueryProcessorFactory(new QueryProcessorFactoryImpl(bufferMgr, dataManager, new DefaultCapabilitiesFinder(), null, context.getMetadata()));
Modified: branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -76,7 +76,7 @@
command.run();
}
};
- dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null);
+ dataManager = new TempTableDataManager(fdm, bm, executor, cache, null, null, null);
}
@Test public void testInsertWithQueryExpression() throws Exception {
Modified: branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:22:43 UTC (rev 3492)
@@ -70,6 +70,7 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.adminapi.jboss.AdminProvider;
import org.teiid.cache.CacheFactory;
+import org.teiid.cache.CacheListener;
import org.teiid.client.DQP;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
@@ -97,6 +98,9 @@
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.net.TeiidURL;
+import org.teiid.query.tempdata.TempTableDataManager;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableEntry;
+import org.teiid.query.tempdata.TempTableDataManager.MatTableKey;
import org.teiid.security.SecurityHelper;
import org.teiid.transport.ClientServiceRegistry;
import org.teiid.transport.ClientServiceRegistryImpl;
@@ -147,7 +151,8 @@
public void start() {
dqpCore.setTransactionService((TransactionService)LogManager.createLoggingProxy(LogConstants.CTX_TXN_LOG, transactionServerImpl, new Class[] {TransactionService.class}, MessageLevel.DETAIL));
-
+ dqpCore.setMatTableListener(getMatTableListener());
+
// create the necessary services
createClientServices();
@@ -497,7 +502,7 @@
@Override
@ManagementOperation(description="Execute a sql query", params={@ManagementParameter(name="vdbName"),@ManagementParameter(name="vdbVersion"), @ManagementParameter(name="command"), @ManagementParameter(name="timoutInMilli")})
- public List<List> executeQuery(final String vdbName, final int version, final String command, final long timoutInMilli) throws AdminException {
+ public List<List> executeQuery(final String vdbName, final int version, final String command, final long timoutInMilli) throws AdminException {
Properties properties = new Properties();
properties.setProperty(TeiidURL.JDBC.VDB_NAME, vdbName);
properties.setProperty(TeiidURL.JDBC.VDB_VERSION, String.valueOf(version));
@@ -530,7 +535,14 @@
request.setExecutionId(0L);
request.setRowLimit(getMaxRowsFetchSize()); // this would limit the number of rows that are returned.
Future<ResultsMessage> message = dqpCore.executeRequest(requestID, request);
- ResultsMessage rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+
+ ResultsMessage rm = null;
+ if (timoutInMilli < 0) {
+ rm = message.get();
+ }
+ else {
+ rm = message.get(timoutInMilli, TimeUnit.MILLISECONDS);
+ }
if (rm.getException() != null) {
throw new AdminProcessingException(rm.getException());
@@ -607,4 +619,39 @@
}
return newResults;
}
+
+ private CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry> getMatTableListener() {
+ return new CacheListener<TempTableDataManager.MatTableKey, TempTableDataManager.MatTableEntry>() {
+
+ @Override
+ public void cacheChanged() {
+ }
+
+ @Override
+ public void cacheCreated(MatTableKey key, MatTableEntry value) {
+ refreshMatView(key, value);
+ }
+
+ @Override
+ public void cacheModified(MatTableKey key, MatTableEntry value) {
+ refreshMatView(key, value);
+ }
+
+ private void refreshMatView(MatTableKey key, MatTableEntry value) {
+ if (value != null) {
+ try {
+ if (value.allowsUpdate()) {
+ executeQuery(key.getVDBName(), key.getVDBVersion(), "execute SYSADMIN.refreshmatview(viewname=>'"+value.getViewName()+"',invalidate=>false)", -1); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ } catch (AdminException e) {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, e, IntegrationPlugin.Util.getString("error_refresh", value.getViewName() )); //$NON-NLS-1$
+ }
+ }
+ }
+ @Override
+ public void cacheRemoved(MatTableKey key, MatTableEntry value) {
+
+ }
+ };
+ }
}
Modified: branches/7.1.x/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- branches/7.1.x/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-09-13 23:22:06 UTC (rev 3491)
+++ branches/7.1.x/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-09-13 23:22:43 UTC (rev 3492)
@@ -46,3 +46,4 @@
distribute_failed=Deploy of the archive failed {0}
template_not_found=Template not found for {0}
admin_executing=JOPR admin {0} is executing command {1}
+error_refresh=error occurred during refreshing the materialized view entries for view {0}
\ No newline at end of file
[View Less]
13 years, 6 months
teiid SVN: r3491 - in branches/7.1.x: build/kits/jboss-container/deploy/teiid and 1 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:22:06 -0400 (Tue, 13 Sep 2011)
New Revision: 3491
Modified:
branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
branches/7.1.x/pom.xml
Log:
TEIID-1493: When a node rejoins the cluster after the initial cache has been populated, during the join time the state has been set to not transfer. This need be set to …
[View More]transfer, also JBoss cache only transfers the state on "active" regions.
Modified: branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
===================================================================
--- branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-09-13 23:22:06 UTC (rev 3491)
@@ -85,7 +85,7 @@
<!-- Hibernate 2LC can replicate custom types, so we use marshalling -->
<property name="useRegionBasedMarshalling">true</property>
<!-- Must match the value of "useRegionBasedMarshalling" -->
- <property name="inactiveOnStartup">true</property>
+ <property name="inactiveOnStartup">false</property>
<!-- Disable asynchronous RPC marshalling/sending -->
<property name="serializationExecutorPoolSize">0</property>
Modified: branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
--- branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-09-13 23:22:06 UTC (rev 3491)
@@ -59,6 +59,7 @@
if (!this.cacheStore.getCacheStatus().allowInvocations()) {
this.cacheStore.start();
+ this.cacheStore.getRegion(this.cacheStore.getRoot().getFqn(), true).activate();
}
Node cacheRoot = this.cacheStore.getRoot().addChild(Fqn.fromString("Teiid")); //$NON-NLS-1$
@@ -67,6 +68,7 @@
Region cacheRegion = this.cacheStore.getRegion(node.getFqn(), true);
cacheRegion.setEvictionRegionConfig(buildEvictionConfig(node.getFqn(), config));
+ cacheRegion.activate();
JBossCache jc = null;
if (config != null && config.getPolicy().equals(Policy.EXPIRATION)) {
Modified: branches/7.1.x/pom.xml
===================================================================
--- branches/7.1.x/pom.xml 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/pom.xml 2011-09-13 23:22:06 UTC (rev 3491)
@@ -365,7 +365,7 @@
<dependency>
<groupId>org.jboss.cache</groupId>
<artifactId>jbosscache-core</artifactId>
- <version>3.1.0.GA</version>
+ <version>3.2.5.GA</version>
<exclusions>
<exclusion>
<groupId>javax.transaction</groupId>
[View Less]
13 years, 6 months