teiid SVN: r4461 - trunk/build/kits/jboss-as7/docs/teiid.
by teiid-commits@lists.jboss.org
Author: van.halbert
Date: 2012-09-21 09:21:02 -0400 (Fri, 21 Sep 2012)
New Revision: 4461
Modified:
trunk/build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html
Log:
TEIID-2214 - updated release notes
Modified: trunk/build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html 2012-09-21 13:06:52 UTC (rev 4460)
+++ trunk/build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html 2012-09-21 13:21:02 UTC (rev 4461)
@@ -37,6 +37,7 @@
the corresponding function based index automatically. In any other circumstance, the metadata is not currently used.
<li>TEIID-2181 <b>System query performance</b> - system queries are now indexed on case-insensitive schema/table name columns.
<li>TEIID-2086 <b>Transactional results caching</b> - the result set cache is now transactional by default.
+ <li>TEIID-2210 <b>Object Translator</b> - the translator-object can support Infinispan Cache and other Map type caches. See OBJECTTABLE for executing queries against this translator.
</ul>
<h2><a name="Compatibility">Compatibility Issues</a></h2>
12 years, 5 months
teiid SVN: r4460 - in trunk/build: kits/jboss-as7/bin/scripts and 4 other directories.
by teiid-commits@lists.jboss.org
Author: van.halbert
Date: 2012-09-21 09:06:52 -0400 (Fri, 21 Sep 2012)
New Revision: 4460
Added:
trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/
trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/main/
trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/main/module.xml
Modified:
trunk/build/assembly/jboss-as7/dist.xml
trunk/build/kits/jboss-as7/bin/scripts/teiid-domain-mode-install.cli
trunk/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
Log:
TEIID-2213 adding translator-object to the kits
Modified: trunk/build/assembly/jboss-as7/dist.xml
===================================================================
--- trunk/build/assembly/jboss-as7/dist.xml 2012-09-21 13:01:05 UTC (rev 4459)
+++ trunk/build/assembly/jboss-as7/dist.xml 2012-09-21 13:06:52 UTC (rev 4460)
@@ -419,6 +419,27 @@
</binaries>
</moduleSet>
+ <moduleSet>
+ <includeSubModules>true</includeSubModules>
+ <useAllReactorProjects>true</useAllReactorProjects>
+ <includes>
+ <include>org.jboss.teiid.connectors:translator-object</include>
+ </includes>
+ <binaries>
+ <includeDependencies>true</includeDependencies>
+ <unpack>false</unpack>
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>false</unpack>
+ <useTransitiveDependencies>false</useTransitiveDependencies>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </dependencySet>
+ </dependencySets>
+ <outputDirectory>modules/org/jboss/teiid/translator/object/main</outputDirectory>
+ <fileMode>0644</fileMode>
+ </binaries>
+ </moduleSet>
<!-- Include the JOPR plugin
<moduleSet>
<includeSubModules>true</includeSubModules>
Modified: trunk/build/kits/jboss-as7/bin/scripts/teiid-domain-mode-install.cli
===================================================================
--- trunk/build/kits/jboss-as7/bin/scripts/teiid-domain-mode-install.cli 2012-09-21 13:01:05 UTC (rev 4459)
+++ trunk/build/kits/jboss-as7/bin/scripts/teiid-domain-mode-install.cli 2012-09-21 13:06:52 UTC (rev 4460)
@@ -62,7 +62,11 @@
/profile=ha/subsystem=teiid/translator=ws:add(module=org.jboss.teiid.translator.ws)
/profile=ha/subsystem=teiid/translator=salesforce:add(module=org.jboss.teiid.translator.salesforce)
/profile=ha/subsystem=teiid/translator=hive:add(module=org.jboss.teiid.translator.hive)
+/profile=ha/subsystem=teiid/translator=infinispan-cache:add(module=org.jboss.teiid.translator.object)
+/profile=ha/subsystem=teiid/translator=infinispanremote-cache:add(module=org.jboss.teiid.translator.object)
+/profile=ha/subsystem=teiid/translator=map-cache:add(module=org.jboss.teiid.translator.object)
+
/profile=ha/subsystem=datasources/jdbc-driver=teiid:add(driver-name=teiid, driver-module-name=org.jboss.teiid.client, driver-class-name=org.teiid.jdbc.TeiidDriver, driver-xa-datasource-class-name=org.teiid.jdbc.TeiidDataSource)
/profile=ha/subsystem=datasources/jdbc-driver=teiid-local:add(driver-name=teiid-local, driver-module-name=org.jboss.teiid, driver-class-name=org.teiid.jdbc.TeiidDriver, driver-xa-datasource-class-name=org.teiid.jdbc.TeiidDataSource)
Added: trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/main/module.xml
===================================================================
--- trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/main/module.xml (rev 0)
+++ trunk/build/kits/jboss-as7/modules/org/jboss/teiid/translator/object/main/module.xml 2012-09-21 13:06:52 UTC (rev 4460)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module xmlns="urn:jboss:module:1.0" name="org.jboss.teiid.translator.object">
+ <resources>
+ <resource-root path="translator-object-${project.version}.jar" />
+ <!-- Insert resources here -->
+ </resources>
+
+ <dependencies>
+ <module name="javax.api"/>
+ <module name="javax.resource.api"/>
+ <module name="org.jboss.teiid.common-core" />
+ <module name="org.jboss.teiid.api" />
+ <module name="javax.persistence.api"/>
+ <module name="org.hibernate"/>
+ <module name="org.infinispan"/>
+ <module name="org.infinispan.cachestore.remote"/>
+ <module name="org.infinispan.client.hotrod"/>
+ </dependencies>
+</module>
Modified: trunk/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml
===================================================================
--- trunk/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2012-09-21 13:01:05 UTC (rev 4459)
+++ trunk/build/kits/jboss-as7/standalone/configuration/standalone-teiid.xml 2012-09-21 13:06:52 UTC (rev 4460)
@@ -319,7 +319,11 @@
<translator name="salesforce" module="org.jboss.teiid.translator.salesforce"/>
<translator name="hive" module="org.jboss.teiid.translator.hive"/>
<translator name="jpa2" module="org.jboss.teiid.translator.jpa"/>
+ <translator name="infinispan-cache" module="org.jboss.teiid.translator.object"/>
+ <translator name="infinispanremote-cache" module="org.jboss.teiid.translator.object"/>
+ <translator name="map-cache" module="org.jboss.teiid.translator.object"/>
+
</subsystem>
<subsystem xmlns="urn:jboss:domain:threads:1.1">
<bounded-queue-thread-pool name="teiid-async">
12 years, 5 months
teiid SVN: r4459 - in trunk/connectors/translator-object: .settings and 13 other directories.
by teiid-commits@lists.jboss.org
Author: van.halbert
Date: 2012-09-21 09:01:05 -0400 (Fri, 21 Sep 2012)
New Revision: 4459
Added:
trunk/connectors/translator-object/examples
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectConnection.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanBaseExecutionFactory.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanConnectionImpl.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheConnection.java
trunk/connectors/translator-object/src/test/example_vdbs/
trunk/connectors/translator-object/src/test/example_vdbs/infinispancache-vdb.xml
trunk/connectors/translator-object/src/test/example_vdbs/infinispanremotecache-vdb.xml
trunk/connectors/translator-object/src/test/example_vdbs/mapcache-vdb.xml
Removed:
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheSearchByKey.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/FakeStrategy.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestSelectProjections.java
Modified:
trunk/connectors/translator-object/.settings/org.maven.ide.eclipse.prefs
trunk/connectors/translator-object/pom.xml
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecution.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecutionFactory.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanExecutionFactory.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanRemoteExecutionFactory.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/LuceneSearch.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/SearchByKey.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheExecutionFactory.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/BasicKeySearchCriteria.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/SearchCriterion.java
trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/util/ObjectUtil.java
trunk/connectors/translator-object/src/main/resources/org/teiid/translator/object/i18n.properties
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecution.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecutionFactory.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/RemoteInfinispanTestHelper.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanConfigFileKeySearch.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiILuceneSearch.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiKeySearch.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanRemoteJndiKeySearch.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/mapcache/TestMapCacheKeySearch.java
trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/search/TestBasicKeySearchCriteria.java
Log:
TEIID-2211 and TEIID-2213, adding translator to the dist kit and related changes to the translator to fix integration. Also, removed unneeded code with the advent of the OBJECTTABLE which removed the need to parse the whole SELECT command. Also, included a couple of dynamic vdb examples for each of the translator types.
Modified: trunk/connectors/translator-object/.settings/org.maven.ide.eclipse.prefs
===================================================================
--- trunk/connectors/translator-object/.settings/org.maven.ide.eclipse.prefs 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/.settings/org.maven.ide.eclipse.prefs 2012-09-21 13:01:05 UTC (rev 4459)
@@ -1,9 +1,6 @@
-#Wed Jan 25 12:40:16 CST 2012
-activeProfiles=
+#Wed Jan 25 12:40:49 CST 2012
eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
Added: trunk/connectors/translator-object/examples
===================================================================
--- trunk/connectors/translator-object/examples (rev 0)
+++ trunk/connectors/translator-object/examples 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,9 @@
+
+
+Chaining objecttables using JDG Team example:
+
+SELECT o.TeamName, c.Name
+FROM object, OBJECTTABLE('x' PASSING ocol AS x COLUMNS TeamName varchar(255) 'teiid_row.foo', members object 'teiid_row.teamMembers') as o,
+OBJECTABLE('m' PASSING o.members as m COLUMNS Name string 'teiid_row.name') as c
+
+Would produce a list of every team/member name.
\ No newline at end of file
Modified: trunk/connectors/translator-object/pom.xml
===================================================================
--- trunk/connectors/translator-object/pom.xml 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/pom.xml 2012-09-21 13:01:05 UTC (rev 4459)
@@ -54,22 +54,26 @@
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
<version>${version.org.infinispan}</version>
- </dependency>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-query</artifactId>
<version>${version.org.infinispan}</version>
- </dependency>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-search</artifactId>
<version>${version.hibernate.search}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-client-hotrod</artifactId>
<version>${version.org.infinispan}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
@@ -87,10 +91,12 @@
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Added: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectConnection.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectConnection.java (rev 0)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectConnection.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.translator.object;
+
+import java.util.List;
+
+import org.teiid.language.Select;
+import org.teiid.translator.TranslatorException;
+
+/**
+ * Each ObjectConnection implementation represents a connection instance and is responsible for
+ * implementing the data source search language specifics for searching its cache.
+ *
+ * @author vhalbert
+ *
+ */
+public interface ObjectConnection {
+
+ /**
+ * Call to perform the search on the cache identified by this connection instance
+ * @param command
+ * @return List of Objects
+ * @throws TranslatorException
+ */
+ public List<Object> performSearch(Select command) throws TranslatorException;
+
+}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecution.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecution.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecution.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -41,16 +41,14 @@
public class ObjectExecution implements ResultSetExecution {
private Select query;
- private Object connection;
- private ObjectExecutionFactory config;
+ private ObjectConnection connection;
private Iterator<Object> resultsIt = null;
public ObjectExecution(Select query, RuntimeMetadata metadata,
- ObjectExecutionFactory factory, Object connection) {
+ ObjectExecutionFactory factory, ObjectConnection connection) {
this.query = query;
this.connection = connection;
- this.config = factory;
}
@Override
@@ -59,11 +57,8 @@
LogManager.logTrace(LogConstants.CTX_CONNECTOR,
"ObjectExecution command: " + query.toString()); //$NON-NLS-1$
- SelectProjections projections = SelectProjections.create(config);
- projections.parse(query);
+ List<Object> results = executeQuery();
- List<Object> results = executeQuery(projections);
-
if (results != null && results.size() > 0) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR,
"ObjectExecution number of returned objects is : " + results.size()); //$NON-NLS-1$
@@ -78,17 +73,15 @@
this.resultsIt = results.iterator();
}
- protected List<Object> executeQuery(SelectProjections projections)
+ protected List<Object> executeQuery()
throws TranslatorException {
- SearchStrategy is = this.config.getSearchStrategy();
LogManager
.logTrace(
LogConstants.CTX_CONNECTOR,
- "ObjectExecution calling search strategy : " + is.getClass().getName()); //$NON-NLS-1$
+ "ObjectExecution calling search strategy : " + connection.getClass().getName()); //$NON-NLS-1$
- return is.performSearch((Select) query, projections, this.config,
- this.connection);
+ return connection.performSearch((Select) query);
}
@@ -108,7 +101,6 @@
public void close() {
this.query = null;
this.connection = null;
- this.config = null;
this.resultsIt = null;
}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecutionFactory.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/ObjectExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -24,7 +24,11 @@
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
import javax.resource.cci.ConnectionFactory;
import org.teiid.language.QueryExpression;
@@ -35,46 +39,38 @@
import org.teiid.translator.ResultSetExecution;
import org.teiid.translator.TranslatorException;
import org.teiid.translator.TranslatorProperty;
-import org.teiid.translator.object.infinispan.search.SearchByKey;
-import org.teiid.translator.object.util.ObjectUtil;
+
/**
* The ObjectExecutionFactory is a base implementation for connecting to an
- * Object data source that's stored in a cache.
+ * Object cache. It provides the core features and behavior common to all implementations.
*
- *
* @author vhalbert
*
*/
public abstract class ObjectExecutionFactory extends
- ExecutionFactory<ConnectionFactory, Object> {
+ ExecutionFactory<ConnectionFactory, ObjectConnection> {
public static final int MAX_SET_SIZE = 1000;
- /*
- * SearchStrategy is the implementation that will perform a specific cache
- * lookup algorithm
- */
- private SearchStrategy searchStrategy = null;
- private String searchStrategyClassName = null;
-
// rootClassName identifies the name of the class that is identified by the
// unique key in the cache
private String rootClassName = null;
private Class<?> rootClass = null;
+ private String cacheJndiName;
+
public ObjectExecutionFactory() {
- super();
- this.setSourceRequiredForMetadata(false);
- this.setMaxInCriteriaSize(MAX_SET_SIZE);
- this.setMaxDependentInPredicates(1);
+ setSourceRequiredForMetadata(false);
+ setMaxInCriteriaSize(MAX_SET_SIZE);
+ setMaxDependentInPredicates(1);
- this.setSupportsOrderBy(false);
- this.setSupportsSelectDistinct(false);
- this.setSupportsInnerJoins(false);
- this.setSupportsFullOuterJoins(false);
- this.setSupportsOuterJoins(false);
+ setSupportsOrderBy(false);
+ setSupportsSelectDistinct(false);
+ setSupportsInnerJoins(false);
+ setSupportsFullOuterJoins(false);
+ setSupportsOuterJoins(false);
}
@@ -94,10 +90,6 @@
rootClass = Class.forName(rootClassName.trim(), true, getClass()
.getClassLoader());
- searchStrategy = (SearchStrategy) ObjectUtil.createObject(
- searchStrategyClassName, Collections.EMPTY_LIST, getClass()
- .getClassLoader());
-
} catch (ClassNotFoundException e) {
String msg = ObjectPlugin.Util.getString(
"ObjectExecutionFactory.rootClassNotFound",
@@ -110,12 +102,15 @@
@Override
public ResultSetExecution createResultSetExecution(QueryExpression command,
ExecutionContext executionContext, RuntimeMetadata metadata,
- Object connection) throws TranslatorException {
+ ObjectConnection connection) throws TranslatorException {
- return new ObjectExecution((Select) command, metadata, this, connection);
+
+ return new ObjectExecution((Select) command, metadata, this, (connection == null ? getConnection(null, executionContext) : connection) );
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public List getSupportedFunctions() {
return Collections.EMPTY_LIST;
}
@@ -172,38 +167,8 @@
public boolean supportsOrderBy() {
return false;
}
-
+
/**
- * Get the class name for the search strategy that will be used to perform
- * object lookups in the cache.
- *
- * @return String class name
- * @see #setSearchStrategyClassName(String)
- */
- public String getSearchStrategyClassName() {
- return this.searchStrategyClassName;
- }
-
- /**
- * Set the class name for the search strategy that will be used to perform
- * object lookups in the cache.
- * <p>
- * Default is {@link SearchByKey}
- *
- * @param searchStrategyClassName
- * @see #getSearchStrategyClassName()
- */
-
- public void setSearchStrategyClassName(String searchStrategyClassName) {
- this.searchStrategyClassName = searchStrategyClassName;
- }
-
- protected SearchStrategy getSearchStrategy() {
- return this.searchStrategy;
- }
-
-
- /**
* Call to get the class name of the root object in the cache. This
* identifies the name of the class that is identified by the unique key in
* the cache
@@ -230,7 +195,30 @@
}
}
+
+ /**
+ * Get the JNDI name for the {@link Map cache} instance that should be used as the data source.
+ *
+ * @return the JNDI name of the {@link Map cache} instance that should be used,
+ * @see #setCacheJndiName(String)
+ */
+ @TranslatorProperty(display = "CacheJndiName", advanced = true)
+ public String getCacheJndiName() {
+ return cacheJndiName;
+ }
+ /**
+ * Set the JNDI name to a {@link Map cache} instance that should be used as this source.
+ *
+ * @param jndiName the JNDI name of the {@link Map cache} instance that should be used
+ * @see #setCacheJndiName(String)
+ */
+ public synchronized void setCacheJndiName( String jndiName ) {
+ if (this.cacheJndiName == jndiName || this.cacheJndiName != null
+ && this.cacheJndiName.equals(jndiName)) return; // unchanged
+ this.cacheJndiName = jndiName;
+ }
+
/**
* Call to get the class specified by calling
* {@link #setRootClassName(String)}
@@ -240,5 +228,41 @@
public Class<?> getRootClass() {
return this.rootClass;
}
+
+ /**
+ * Utility method available to all implementations to find the Cache via JNDI.
+ * @return Object located via JNDI
+ * @throws TranslatorException
+ */
+ protected Object findCacheUsingJNDIName() throws TranslatorException {
+
+ Object cache = null;
+ String jndiName = getCacheJndiName();
+ if (jndiName != null && jndiName.trim().length() != 0) {
+ try {
+ Context context = null;
+ if (context == null) {
+ try {
+ context = new InitialContext();
+ } catch (NamingException err) {
+ throw new TranslatorException(err);
+ }
+ }
+ cache = context.lookup(jndiName);
+
+ if (cache == null) {
+ String msg = ObjectPlugin.Util.getString(
+ "ObjectExecutionFactory.cacheNotFoundinJNDI",
+ new Object[] { jndiName });
+ throw new TranslatorException(msg); //$NON-NLS-1$
+
+ }
+ } catch (Throwable err) {
+ if (err instanceof RuntimeException) throw (RuntimeException)err;
+ throw new TranslatorException(err);
+ }
+ }
+ return cache;
+ }
}
Added: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanBaseExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanBaseExecutionFactory.java (rev 0)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanBaseExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,198 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.translator.object.infinispan;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.resource.cci.ConnectionFactory;
+
+import org.infinispan.Cache;
+import org.infinispan.api.BasicCache;
+import org.infinispan.api.BasicCacheContainer;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
+import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.TranslatorProperty;
+import org.teiid.translator.object.ObjectConnection;
+import org.teiid.translator.object.ObjectExecutionFactory;
+import org.teiid.translator.object.ObjectPlugin;
+
+/**
+ *
+ * @author vhalbert
+ *
+ */
+public abstract class InfinispanBaseExecutionFactory extends ObjectExecutionFactory {
+ public static final String PROPERTIES_FILE = "META-INF" + File.separator
+ + "datagrid.properties";
+
+ private String cacheName = null;
+ private String configurationFileName = null;
+
+ public InfinispanBaseExecutionFactory() {
+ super();
+ }
+
+ @Override
+ public void start() throws TranslatorException {
+ super.start();
+
+ if (this.getCacheName() == null || this.getCacheName().isEmpty()) {
+ String msg = ObjectPlugin.Util.getString(
+ "InfinispanBaseExecutionFactory.cacheNameNotDefined",
+ new Object[] {});
+ throw new TranslatorException(msg); //$NON-NLS-1$
+ }
+
+ }
+
+ /**
+ * Will return <code>true</code> if FullText searching is supported for this implementation.
+ *
+ * @return True if full text searching is reported.
+ */
+ public abstract boolean isFullTextSearchingSupported();
+
+ /**
+ * Will return <code>true</code> if access to the cache is still allowed.
+ * @return True if access is alive
+ */
+ public abstract boolean isAlive();
+
+ /**
+ * Method for obtaining the CacheContainer by {@link InfinispanConnectionImpl#getCache() the connection}.
+ *
+ * @return BasicCacheContainer
+ * @throws TranslatorException
+ * if there an issue obtaining the cache
+ * @see #getCache()
+ */
+ protected abstract BasicCacheContainer getCacheContainer() throws TranslatorException;
+
+
+
+ /**
+ * Get the cacheName that will be used by this factory instance to access
+ * the named cache. However, if not specified a default configuration will
+ * be created.
+ *
+ * @return
+ * @see #setCacheName(String)
+ */
+ @TranslatorProperty(display = "CacheName", advanced = true)
+ public String getCacheName() {
+ return this.cacheName;
+ }
+
+ /**
+ * Set the cacheName that will be used to find the named cache.
+ *
+ * @param cacheName
+ * @see #getCacheName()
+ */
+
+ public void setCacheName(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+
+ /**
+ * Get the name of the configuration resource or file that should be used if
+ * a {@link Cache cache} is to be created using the
+ * {@link DefaultCacheManager}. If not specified, a default configuration
+ * will be used.
+ *
+ * @return the name of the resource or file configuration that should be
+ * passed to the {@link CacheContainer}, or null if the default
+ * configuration should be used
+ * @see #setConfigurationFileName(String)
+ */
+ @TranslatorProperty(display = "ConfigurationFileName", advanced = true)
+ public String getConfigurationFileName() {
+ return configurationFileName;
+ }
+
+ /**
+ * Set the name of the configuration that should be used if a {@link Cache
+ * cache} is to be created using the {@link DefaultCacheManager}.
+ *
+ * @param configurationFileName
+ * the name of the configuration file that should be used to load
+ * the {@link CacheContainer}, or null if the default
+ * configuration should be used
+ * @see #getConfigurationFileName()
+ */
+ public synchronized void setConfigurationFileName(
+ String configurationFileName) {
+ if (this.configurationFileName == configurationFileName
+ || this.configurationFileName != null
+ && this.configurationFileName.equals(configurationFileName))
+ return; // unchanged
+ this.configurationFileName = configurationFileName;
+ }
+
+ public BasicCache<String, Object> getCache() throws TranslatorException {
+ BasicCache<String, Object> cache = null;
+ BasicCacheContainer container = getCacheContainer();
+ if (getCacheName() != null) {
+ cache = container.getCache(getCacheName());
+ } else {
+ cache = container.getCache();
+ }
+
+ if (cache == null) {
+ String msg = ObjectPlugin.Util.getString(
+ "InfinispanBaseExecutionFactory.cacheNotFound", new Object[] { (getCacheName() != null ? getCacheName()
+ : "DefaultCache") });
+ throw new TranslatorException(msg); //$NON-NLS-1$
+ }
+
+ return cache;
+
+ }
+
+
+ @Override
+ public ObjectConnection getConnection(ConnectionFactory factory,
+ ExecutionContext executionContext) throws TranslatorException {
+
+ return new InfinispanConnectionImpl(this);
+ }
+
+ protected String jdgProperty(String name) {
+ Properties props = new Properties();
+ try {
+ props.load(this.getClass().getClassLoader()
+ .getResourceAsStream(PROPERTIES_FILE));
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ return props.getProperty(name);
+ }
+
+ public void cleanUp() {
+
+ }
+}
Added: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanConnectionImpl.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanConnectionImpl.java (rev 0)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanConnectionImpl.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.translator.object.infinispan;
+
+import java.util.List;
+
+import org.infinispan.api.BasicCache;
+import org.teiid.language.Select;
+import org.teiid.resource.spi.BasicConnection;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.object.ObjectConnection;
+import org.teiid.translator.object.infinispan.search.LuceneSearch;
+import org.teiid.translator.object.infinispan.search.SearchByKey;
+
+
+/**
+ * Represents an instance of a connection to an Infinispan cache. More than one connection can
+ * be created to query to the cache.
+ */
+public class InfinispanConnectionImpl extends BasicConnection implements ObjectConnection {
+
+ private InfinispanBaseExecutionFactory factory = null;
+
+ protected InfinispanConnectionImpl() {
+ super();
+ }
+ public InfinispanConnectionImpl(InfinispanBaseExecutionFactory factory) {
+ super();
+ this.factory = factory;
+ }
+
+
+ /**
+ * Close the connection, if a connection requires closing.
+ * (non-Javadoc)
+ */
+ @Override
+ public void close() {
+ factory = null;
+ }
+
+ /**
+ * Currently, this method always returns alive. We assume the connection is alive,
+ * and rely on proper timeout values to automatically clean up connections before
+ * any server-side timeout occurs. Rather than incur overhead by rebinding,
+ * we'll assume the connection is always alive, and throw an error when it is actually used,
+ * if the connection fails. This may be a more efficient way of handling failed connections,
+ * with the one tradeoff that stale connections will not be detected until execution time. In
+ * practice, there is no benefit to detecting stale connections before execution time.
+ *
+ * One possible extension is to implement a UnsolicitedNotificationListener.
+ * (non-Javadoc)
+ */
+ public boolean isAlive() {
+ return factory.isAlive();
+ }
+
+
+ @Override
+ public void cleanUp() {
+ factory = null;
+
+ }
+
+ public InfinispanBaseExecutionFactory getFactory() {
+ return this.factory;
+ }
+
+ @Override
+ public List<Object> performSearch(Select command)
+ throws TranslatorException {
+ if (factory.isFullTextSearchingSupported()) {
+ return LuceneSearch.performSearch(command, this);
+ } else {
+ return SearchByKey.performSearch(command, this);
+ }
+
+ }
+
+ public BasicCache<String, Object> getCache()
+ throws TranslatorException {
+ return factory.getCache();
+
+ }
+
+
+}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanExecutionFactory.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -25,193 +25,96 @@
import java.io.IOException;
import java.util.Properties;
-import org.infinispan.Cache;
-import org.infinispan.api.BasicCache;
import org.infinispan.api.BasicCacheContainer;
-import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
-import org.teiid.language.QueryExpression;
-import org.teiid.language.Select;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
-import org.teiid.metadata.RuntimeMetadata;
-import org.teiid.translator.ExecutionContext;
-import org.teiid.translator.ResultSetExecution;
import org.teiid.translator.Translator;
import org.teiid.translator.TranslatorException;
import org.teiid.translator.TranslatorProperty;
-import org.teiid.translator.object.ObjectExecution;
-import org.teiid.translator.object.ObjectExecutionFactory;
import org.teiid.translator.object.ObjectPlugin;
-import org.teiid.translator.object.infinispan.search.LuceneSearch;
-import org.teiid.translator.object.infinispan.search.SearchByKey;
+/**
+ * InfinispanExecutionFactory is the translator that will access an Infinispan local cache.
+ * <p>
+ * The default settings are:
+ * <li>{@link #supportsLuceneSearching dynamic Searching} - will be set to <code>false</code>, supporting only Key searching.
+ * This is because you must have your objects in your cache annotated before Hibernate/Lucene searching will work.
+ * </li>
+ * <p>
+ * The required settings are:
+ * <li>{@link #setCacheJndiName(String) jndiName} OR {@link #setConfigurationFileName(String) configFileName} -
+ * must be specified to indicate how the Infinispan container will be obtained</li>
+ * <li>{@link #setCacheName(String) cacheName} - identifies the cache located in the Infinispan container</li>
+ * <p>
+ * Optional settings are:
+ * <li>{@link #setSupportsLuceneSearching(boolean) dynamic Searching} - when <code>true</code>, will use the
+ * Hibernate/Lucene searching to locate objects in the cache</li>
+ *
+ * @author vhalbert
+ *
+ */
@Translator(name = "infinispan-cache", description = "The Infinispan Cache Translator")
-public class InfinispanExecutionFactory extends ObjectExecutionFactory {
+public class InfinispanExecutionFactory extends InfinispanBaseExecutionFactory {
public static final String PROPERTIES_FILE = "META-INF" + File.separator
+ "datagrid.properties";
- private boolean useLeceneSearching = false;
- private String cacheName = null;
- private String configurationFileName = null;
+ private boolean supportsLuceneSearching = false;
- private BasicCacheContainer cacheContainer = null;
-
+ protected BasicCacheContainer cacheContainer = null;
+ private boolean useJndi = true;
+
public InfinispanExecutionFactory() {
super();
-
- this.setSearchStrategyClassName(SearchByKey.class.getName());
}
-
+
@Override
public void start() throws TranslatorException {
-
- if (this.getCacheName() == null || this.getCacheName().isEmpty()) {
- String msg = ObjectPlugin.Util.getString(
- "InfinispanExecutionFactory.cacheNameNotDefined",
- new Object[] {});
- throw new TranslatorException(msg); //$NON-NLS-1$
- }
-
super.start();
+
+ String configFile = this.getConfigurationFileName();
+ String jndiName = getCacheJndiName();
+ if ( jndiName == null || jndiName.trim().length() == 0) {
+ if (configFile == null || configFile.trim().length() == 0) {
+ String msg = ObjectPlugin.Util
+ .getString(
+ "InfinispanExecutionFactory.undefinedHowToGetCache");
+ throw new TranslatorException(msg); //$NON-NLS-1$
+ }
+ useJndi = false;
+
+ } else {
+ useJndi = true;
+ }
- if (createCacheContainer()) {
- cacheContainer = this.getCacheContainer();
- }
}
-
- @Override
- public ResultSetExecution createResultSetExecution(QueryExpression command,
- ExecutionContext executionContext, RuntimeMetadata metadata,
- Object connection) throws TranslatorException {
-
- return new ObjectExecution((Select) command, metadata, this,
- (cacheContainer != null ? cacheContainer : connection));
-
+
+ public boolean isAlive() {
+ return (cacheContainer != null ? true : false);
}
-
- protected boolean createCacheContainer() {
- if (this.getConfigurationFileName() != null) {
- return true;
- }
- return false;
-
+
+ public boolean isFullTextSearchingSupported() {
+ return this.supportsLuceneSearching;
}
- /**
- * Get the cacheName that will be used by this factory instance to access
- * the named cache. However, if not specified a default configuration will
- * be created.
- *
- * @return
- * @see #setCacheName(String)
- */
- @TranslatorProperty(display = "CacheName", advanced = true)
- public String getCacheName() {
- return this.cacheName;
- }
/**
- * Set the cacheName that will be used to find the named cache.
- *
- * @param cacheName
- * @see #getCacheName()
- */
-
- public void setCacheName(String cacheName) {
- this.cacheName = cacheName;
- }
-
- /**
* Indicates if Hibernate Search and Apache Lucene are used to index and
* search objects
*
* @since 6.1.0
*/
- @TranslatorProperty(display = "Use Lucene Searching", description = "True, assumes objects have Hibernate Search annotations", advanced = true)
- public boolean useLuceneSearching() {
- return useLeceneSearching;
+ @TranslatorProperty(display = "Support Using Lucene Searching", description = "True, assumes objects have Hibernate Search annotations", advanced = true)
+ public boolean supportsLuceneSearching() {
+ return this.supportsLuceneSearching;
}
- public void setUseLeceneSearching(boolean useLeceneSearching) {
- this.useLeceneSearching = useLeceneSearching;
- if (useLeceneSearching) {
- this.setSearchStrategyClassName(LuceneSearch.class.getName());
- } else {
- this.setSearchStrategyClassName(SearchByKey.class.getName());
- }
+ public void setSupportsLuceneSearching(boolean supportsLuceneSearching) {
+ this.supportsLuceneSearching = supportsLuceneSearching;
}
- /**
- * Get the name of the configuration resource or file that should be used if
- * a {@link Cache cache} is to be created using the
- * {@link DefaultCacheManager}. If not specified, a default configuration
- * will be used.
- *
- * @return the name of the resource or file configuration that should be
- * passed to the {@link CacheContainer}, or null if the default
- * configuration should be used
- * @see #setConfigurationFileName(String)
- */
- @TranslatorProperty(display = "ConfigurationFileName", advanced = true)
- public String getConfigurationFileName() {
- return configurationFileName;
- }
/**
- * Set the name of the configuration that should be used if a {@link Cache
- * cache} is to be created using the {@link DefaultCacheManager}.
- *
- * @param configurationFileName
- * the name of the configuration file that should be used to load
- * the {@link CacheContainer}, or null if the default
- * configuration should be used
- * @see #getConfigurationFileName()
- */
- public synchronized void setConfigurationFileName(
- String configurationFileName) {
- if (this.configurationFileName == configurationFileName
- || this.configurationFileName != null
- && this.configurationFileName.equals(configurationFileName))
- return; // unchanged
- this.configurationFileName = configurationFileName;
- }
-
- public BasicCache<String, Object> getCache(Object connection)
- throws TranslatorException {
- BasicCache<String, Object> cache = null;
- if (connection instanceof BasicCacheContainer) {
- BasicCacheContainer bc = (BasicCacheContainer) connection;
-
- if (this.getCacheName() != null) {
- cache = bc.getCache(this.getCacheName());
- } else {
- cache = bc.getCache();
- }
- } else if (connection instanceof BasicCache) {
- cache = (BasicCache) connection;
- } else {
- String msg = ObjectPlugin.Util.getString(
- "InfinispanExecutionFactory.unsupportedConnectionType",
- new Object[] { connection.getClass().getName(),
- "either BasicCache or BasicCacheContainer" });
- throw new TranslatorException(msg); //$NON-NLS-1$
-
- }
-
- if (cache == null) {
- String msg = ObjectPlugin.Util.getString(
- "InfinispanExecutionFactory.noCache", new Object[] { (this
- .getCacheName() != null ? this.getCacheName()
- : "DefaultCache") });
- throw new TranslatorException(msg); //$NON-NLS-1$
- }
-
- return cache;
-
- }
-
- /**
* Method for obtaining the CacheContainer. This method will be called to
* create a container based on the <code>configurationFileName</code>
* specified.
@@ -223,8 +126,34 @@
*/
protected synchronized BasicCacheContainer getCacheContainer()
throws TranslatorException {
+ if (this.cacheContainer != null) return this.cacheContainer;
+
+ this.cacheContainer = createCacheContainer();
+
+ return this.cacheContainer;
+
+ }
+
+ private BasicCacheContainer createCacheContainer() throws TranslatorException {
BasicCacheContainer container = null;
+ if (useJndi) {
+ Object object = findCacheUsingJNDIName();
+ if (object instanceof BasicCacheContainer) {
+ LogManager
+ .logInfo(LogConstants.CTX_CONNECTOR,
+ "=== Using CacheContainer (loaded from Jndi) ==="); //$NON-NLS-1$
+
+ return (BasicCacheContainer) object;
+ }
+ String msg = ObjectPlugin.Util.getString(
+ "InfinispanExecutionFactory.unsupportedContainerType",
+ new Object[] { object.getClass().getName(),
+ "BasicCacheContainer" });
+ throw new TranslatorException(msg); //$NON-NLS-1$
+
+
+ }
try {
container = new DefaultCacheManager(
this.getConfigurationFileName());
@@ -249,4 +178,7 @@
return props.getProperty(name);
}
+ public void cleanUp() {
+ this.cacheContainer = null;
+ }
}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanRemoteExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanRemoteExecutionFactory.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/InfinispanRemoteExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -27,6 +27,7 @@
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.translator.Translator;
+import org.teiid.translator.TranslatorProperty;
/**
* The InfinispanRemoteExecutionFactory is used to obtain a remote
@@ -40,38 +41,28 @@
@Translator(name = "infinispanremote-cache", description = "The Execution Factory for Remote Infinispan Cache")
public class InfinispanRemoteExecutionFactory extends
- InfinispanExecutionFactory {
- // public static final String DATAGRID_HOST = "datagrid.host";
- // public static final String HOTROD_PORT = "datagrid.hotrod.port";
+ InfinispanBaseExecutionFactory {
- private BasicCacheContainer manager;
+ private RemoteCacheManager manager;
- private volatile String remoteServerList;
+ private String remoteServerList;
public InfinispanRemoteExecutionFactory() {
super();
- this.setSourceRequired(false);
}
-
-
- protected boolean createCacheContainer() {
- if (this.getConfigurationFileName() != null) {
- return true;
- }
-
- if (this.getRemoteServerList() != null
- || this.getRemoteServerList().length() > 0) {
- return true;
- }
-
- return false;
-
- }
-
+
@Override
public boolean isSourceRequired() {
- return true;
+ return false;
}
+
+ public boolean isAlive() {
+ return (manager.isStarted());
+ }
+
+ public boolean isFullTextSearchingSupported() {
+ return false;
+ }
/**
* Get the list of remote servers that make up the Infinispan cluster. The
@@ -82,6 +73,7 @@
*
* @return the names of the remote servers
*/
+ @TranslatorProperty(display = "Server List", description = "Server List (host:port[;host:port...]) to connect to", advanced = true)
public String getRemoteServerList() {
return remoteServerList;
}
@@ -99,7 +91,7 @@
*
* @see #getRemoteServerList()
*/
- public synchronized void setRemoteServerList(
+ public void setRemoteServerList(
String remoteInfinispanServerList) {
if (this.remoteServerList == remoteInfinispanServerList
|| this.remoteServerList != null
@@ -110,6 +102,8 @@
@Override
protected synchronized BasicCacheContainer getCacheContainer() {
+ if (this.manager != null) return this.manager;
+
RemoteCacheManager container = null;
if (this.getConfigurationFileName() != null) {
container = new RemoteCacheManager(this.getConfigurationFileName());
@@ -123,27 +117,30 @@
|| this.getRemoteServerList().isEmpty()
|| this.getRemoteServerList().equals("")) {
container = new RemoteCacheManager();
-
+
LogManager
.logInfo(LogConstants.CTX_CONNECTOR,
"=== Using RemoteCacheManager (no serverlist defined) ==="); //$NON-NLS-1$
} else {
container = new RemoteCacheManager(this.getRemoteServerList());
-
LogManager
.logInfo(LogConstants.CTX_CONNECTOR,
"=== Using RemoteCacheManager (loaded by serverlist) ==="); //$NON-NLS-1$
}
}
+
- return container;
+ this.manager = container;
+ return this.manager;
}
public void cleanUp() {
- manager.stop();
+ if (this.manager != null) {
+ manager.stop();
+ }
manager = null;
}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/LuceneSearch.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/LuceneSearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/LuceneSearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -21,7 +21,6 @@
*/
package org.teiid.translator.object.infinispan.search;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -46,47 +45,34 @@
import org.teiid.logging.LogManager;
import org.teiid.metadata.Column;
import org.teiid.translator.TranslatorException;
-import org.teiid.translator.object.ObjectExecutionFactory;
import org.teiid.translator.object.ObjectPlugin;
-import org.teiid.translator.object.SearchStrategy;
-import org.teiid.translator.object.SelectProjections;
-import org.teiid.translator.object.infinispan.InfinispanExecutionFactory;
+import org.teiid.translator.object.infinispan.InfinispanConnectionImpl;
/**
* LuceneSearch will parse the WHERE criteria and build the search query(s)
* that's used to retrieve the results from an Infinispan cache.
*
+ * Note: As of Infinispan 5.x, it doesn't support fulltext searching the RemoteCache
+ *
* @author vhalbert
*
*/
-public class LuceneSearch implements SearchStrategy {
- protected List<String> exceptionMessages = new ArrayList<String>(2);
+public final class LuceneSearch {
- private QueryBuilder queryBuilder;
- public LuceneSearch() {
- }
-
- public List<Object> performSearch(Select command,
- SelectProjections projections,
- ObjectExecutionFactory objectFactory, Object connection)
+ public static List<Object> performSearch(Select command, InfinispanConnectionImpl connection)
throws TranslatorException {
-
- InfinispanExecutionFactory factory = (InfinispanExecutionFactory) objectFactory;
-
+
SearchManager searchManager = Search
- .getSearchManager((Cache<?, ?>) factory.getCache(connection));
+ .getSearchManager((Cache) connection.getCache() );
- queryBuilder = searchManager.buildQueryBuilderForClass(
- factory.getRootClass()).get();
+ QueryBuilder queryBuilder = searchManager.buildQueryBuilderForClass(
+ connection.getFactory().getRootClass()).get();
BooleanJunction<BooleanJunction> junction = queryBuilder.bool();
boolean createdQueries = buildQueryFromWhereClause(command.getWhere(),
- junction);
+ junction, queryBuilder);
- // check for errors
- this.throwExceptionIfFound();
-
Query query = null;
if (createdQueries) {
query = junction.createQuery();
@@ -95,7 +81,7 @@
}
CacheQuery cacheQuery = searchManager.getQuery(query,
- factory.getRootClass()); // rootNodeType
+ connection.getFactory().getRootClass()); // rootNodeType
List<Object> results = cacheQuery.list();
if (results == null || results.isEmpty()) {
@@ -105,8 +91,8 @@
return results;
}
- private boolean buildQueryFromWhereClause(Condition criteria,
- BooleanJunction<BooleanJunction> junction)
+ private static boolean buildQueryFromWhereClause(Condition criteria,
+ BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder)
throws TranslatorException {
boolean createdQueries = false;
BooleanJunction<BooleanJunction> inUse = junction;
@@ -120,15 +106,15 @@
switch (op) {
case AND:
- BooleanJunction<BooleanJunction> leftAnd = this.queryBuilder
+ BooleanJunction<BooleanJunction> leftAnd = queryBuilder
.bool();
boolean andLeftHasQueries = buildQueryFromWhereClause(
- crit.getLeftCondition(), leftAnd);
+ crit.getLeftCondition(), leftAnd, queryBuilder);
- BooleanJunction<BooleanJunction> rightAnd = this.queryBuilder
+ BooleanJunction<BooleanJunction> rightAnd = queryBuilder
.bool();
boolean andRightHasQueries = buildQueryFromWhereClause(
- crit.getRightCondition(), rightAnd);
+ crit.getRightCondition(), rightAnd, queryBuilder);
if (andLeftHasQueries && andRightHasQueries) {
leftAnd.must(rightAnd.createQuery());
@@ -148,9 +134,9 @@
case OR:
boolean orLeftHasQueries = buildQueryFromWhereClause(
- crit.getLeftCondition(), inUse);
+ crit.getLeftCondition(), inUse, queryBuilder);
boolean orRightHasQueries = buildQueryFromWhereClause(
- crit.getRightCondition(), inUse);
+ crit.getRightCondition(), inUse, queryBuilder);
createdQueries = (orLeftHasQueries ? orLeftHasQueries
: orRightHasQueries);
@@ -164,7 +150,7 @@
}
} else if (criteria instanceof Comparison) {
- createdQueries = visit((Comparison) criteria, inUse);
+ createdQueries = visit((Comparison) criteria, inUse, queryBuilder);
} else if (criteria instanceof Exists) {
LogManager.logTrace(LogConstants.CTX_CONNECTOR,
@@ -172,10 +158,10 @@
// TODO Exists should be supported in a future release.
} else if (criteria instanceof Like) {
- createdQueries = visit((Like) criteria, inUse);
+ createdQueries = visit((Like) criteria, inUse, queryBuilder);
} else if (criteria instanceof In) {
- createdQueries = visit((In) criteria, inUse);
+ createdQueries = visit((In) criteria, inUse, queryBuilder);
}
// else if (criteria instanceof Not) {
@@ -188,8 +174,8 @@
return createdQueries;
}
- public boolean visit(Comparison obj,
- BooleanJunction<BooleanJunction> junction) {
+ public static boolean visit(Comparison obj,
+ BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) throws TranslatorException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR,
"Parsing Comparison criteria."); //$NON-NLS-1$
@@ -222,39 +208,37 @@
if (value == null) {
final String msg = ObjectPlugin.Util
.getString("LuceneSearch.unsupportedComparingByNull"); //$NON-NLS-1$
- addException(msg);
- return false;
+ throw new TranslatorException(msg);
}
value = escapeReservedChars(value);
switch (op) {
case NE:
- createEqualsQuery(mdIDElement, value, false, true, junction);
+ createEqualsQuery(mdIDElement, value, false, true, junction, queryBuilder);
break;
case EQ:
- createEqualsQuery(mdIDElement, value, true, false, junction);
+ createEqualsQuery(mdIDElement, value, true, false, junction, queryBuilder);
break;
case GT:
- createRangeAboveQuery(mdIDElement, value, junction);
+ createRangeAboveQuery(mdIDElement, value, junction, queryBuilder);
break;
case LT:
- createRangeBelowQuery(mdIDElement, value, junction);
+ createRangeBelowQuery(mdIDElement, value, junction, queryBuilder);
break;
default:
final String msg = ObjectPlugin.Util
- .getString("LuceneSearch.unsupportedComparisonOperator"); //$NON-NLS-1$
- addException(msg);
- return false;
+ .getString("LuceneSearch.invalidOperator", new Object[] { op, "NE, EQ, GT, LT" }); //$NON-NLS-1$
+ throw new TranslatorException(msg);
}
return true;
}
- public boolean visit(In obj, BooleanJunction<BooleanJunction> junction) {
+ public static boolean visit(In obj, BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) throws TranslatorException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR, "Parsing IN criteria."); //$NON-NLS-1$
Expression lhs = ((In) obj).getLeftExpression();
@@ -271,19 +255,19 @@
// add these as OR queries
createEqualsQuery(mdIDElement,
escapeReservedChars(literal.getValue()), false, false,
- junction);
+ junction, queryBuilder);
createdQuery = true;
} else {
String msg = ObjectPlugin.Util.getString(
"LuceneSearch.Unsupported_expression",
new Object[] { expr, "IN" });
- this.addException(msg);
+ throw new TranslatorException(msg);
}
}
return createdQuery;
}
- public boolean visit(Like obj, BooleanJunction<BooleanJunction> junction) {
+ public static boolean visit(Like obj, BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) throws TranslatorException {
LogManager.logTrace(LogConstants.CTX_CONNECTOR,
"Parsing LIKE criteria."); //$NON-NLS-1$
@@ -305,17 +289,15 @@
value = (String) escapeReservedChars(((Literal) literalExp)
.getValue());
- createLikeQuery(c, value.replaceAll("%", ""), junction); // "*"
- return true;
+ createLikeQuery(c, value.replaceAll("%", ""), junction, queryBuilder); // "*"
} else {
final String msg = ObjectPlugin.Util.getString(
"LuceneSearch.Unsupported_expression",
new Object[] { literalExp.toString(), "LIKE" });
- this.addException(msg);
- return false;
-
+ throw new TranslatorException(msg);
}
+ return true;
}
protected static Object escapeReservedChars(final Object value) {
@@ -352,8 +334,8 @@
return sb.toString();
}
- private Query createEqualsQuery(Column column, Object value, boolean and,
- boolean not, BooleanJunction<BooleanJunction> junction) {
+ private static Query createEqualsQuery(Column column, Object value, boolean and,
+ boolean not, BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) {
Query queryKey = queryBuilder.keyword()
.onField(getNameInSourceFromColumn(column))
// .matching(value.toString() + "*")
@@ -369,8 +351,8 @@
return queryKey;
}
- private Query createRangeAboveQuery(Column column, Object value,
- BooleanJunction<BooleanJunction> junction) {
+ private static Query createRangeAboveQuery(Column column, Object value,
+ BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) {
Query queryKey = queryBuilder.range()
.onField(getNameInSourceFromColumn(column))
@@ -379,8 +361,8 @@
return queryKey;
}
- private Query createRangeBelowQuery(Column column, Object value,
- BooleanJunction<BooleanJunction> junction) {
+ private static Query createRangeBelowQuery(Column column, Object value,
+ BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) {
Query queryKey = queryBuilder.range()
.onField(getNameInSourceFromColumn(column))
@@ -389,8 +371,8 @@
return queryKey;
}
- private Query createLikeQuery(Column column, String value,
- BooleanJunction<BooleanJunction> junction) {
+ private static Query createLikeQuery(Column column, String value,
+ BooleanJunction<BooleanJunction> junction, QueryBuilder queryBuilder) {
Query queryKey = queryBuilder.phrase()
.onField(getNameInSourceFromColumn(column)).sentence(value)
.createQuery();
@@ -398,7 +380,7 @@
return queryKey;
}
- private String getNameInSourceFromColumn(Column c) {
+ private static String getNameInSourceFromColumn(Column c) {
String name = c.getNameInSource();
if (name == null || name.trim().equals("")) { //$NON-NLS-1$
return c.getName();
@@ -406,16 +388,4 @@
return name;
}
- private void addException(String message) {
-
- exceptionMessages.add(message);
-
- }
-
- protected void throwExceptionIfFound() throws TranslatorException {
- if (!exceptionMessages.isEmpty())
- throw new TranslatorException("LuceneSearch Exception(s): "
- + exceptionMessages.toString());
- }
-
}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/SearchByKey.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/SearchByKey.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/infinispan/search/SearchByKey.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -25,69 +25,72 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.infinispan.api.BasicCache;
+import org.infinispan.client.hotrod.RemoteCache;
import org.teiid.language.Select;
import org.teiid.translator.TranslatorException;
-import org.teiid.translator.object.ObjectExecutionFactory;
-import org.teiid.translator.object.SearchStrategy;
-import org.teiid.translator.object.SelectProjections;
-import org.teiid.translator.object.infinispan.InfinispanExecutionFactory;
+import org.teiid.translator.object.infinispan.InfinispanConnectionImpl;
import org.teiid.translator.object.search.BasicKeySearchCriteria;
import org.teiid.translator.object.search.SearchCriterion;
/**
- * SearchByKey is a simple SearchStrategy that enables querying the cache by
+ * SearchByKey is a simple ObjectConnection that enables querying the cache by
* the key, using EQUI and IN clauses on the SELECT statement.
*/
-public class SearchByKey implements SearchStrategy {
+public final class SearchByKey {
- public List<Object> performSearch(Select command,
- SelectProjections projections,
- ObjectExecutionFactory objectFactory, Object connection)
+ public static List<Object> performSearch(Select command, InfinispanConnectionImpl connection)
throws TranslatorException {
- InfinispanExecutionFactory factory = (InfinispanExecutionFactory) objectFactory;
- BasicCache<String, Object> cache = factory.getCache(connection);
+ BasicCache<String, Object> cache = connection.getCache();
BasicKeySearchCriteria bksc = BasicKeySearchCriteria.getInstance(
- factory, projections, command);
+ connection.getFactory(), command);
- return get(bksc.getCriterion(), cache, factory);
+ return get(bksc.getCriterion(), cache, connection.getFactory().getRootClass());
}
- private List<Object> get(SearchCriterion criterion,
- BasicCache<String, Object> cache, InfinispanExecutionFactory factory)
+ private static List<Object> get(SearchCriterion criterion,
+ BasicCache<String, Object> cache, Class rootClass)
throws TranslatorException {
List<Object> results = null;
- if (!criterion.isRootTableInSelect()) {
- return Collections.EMPTY_LIST;
- }
+ if (criterion.getOperator() == SearchCriterion.Operator.ALL) {
- if (criterion.getOperator() == SearchCriterion.Operator.ALL) {
- Set keys = cache.keySet();
- results = new ArrayList<Object>();
- for (Iterator it = keys.iterator(); it.hasNext();) {
- Object v = cache.get(it.next());
- addValue(v, results, factory.getRootClass());
+ if (cache instanceof RemoteCache) {
+ RemoteCache<?, ?> rc = (RemoteCache<?, ?>) cache;
+ Map<?, ?> c = rc.getBulk();
+ results = new ArrayList<Object>();
+ for (Iterator it = c.keySet().iterator(); it.hasNext();) {
+ Object v = cache.get(it.next());
+ addValue(v, results, rootClass);
+ }
+
+ } else {
+ Set keys = cache.keySet();
+ results = new ArrayList<Object>();
+ for (Iterator it = keys.iterator(); it.hasNext();) {
+ Object v = cache.get(it.next());
+ addValue(v, results, rootClass);
+ }
}
return results;
}
if (criterion.getCriterion() != null) {
- results = get(criterion.getCriterion(), cache, factory);
+ results = get(criterion.getCriterion(), cache, rootClass);
}
if (results == null) {
results = new ArrayList<Object>();
}
+
if (criterion.getOperator().equals(SearchCriterion.Operator.EQUALS)) {
@@ -96,7 +99,7 @@
Object v = cache.get(value instanceof String ? value : value
.toString());
if (v != null) {
- addValue(v, results, factory.getRootClass());
+ addValue(v, results, rootClass);
}
} else if (criterion.getOperator().equals(SearchCriterion.Operator.IN)) {
@@ -107,7 +110,7 @@
Object v = cache.get(arg instanceof String ? arg : arg
.toString());
if (v != null) {
- addValue(v, results, factory.getRootClass());
+ addValue(v, results, rootClass);
}
}
@@ -117,7 +120,7 @@
}
- private void addValue(Object value, List<Object> results, Class rootNodeType) {
+ private static void addValue(Object value, List<Object> results, Class rootNodeType) {
if (value != null && value.getClass().equals(rootNodeType)) {
if (value.getClass().isArray()) {
Added: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheConnection.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheConnection.java (rev 0)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheConnection.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.translator.object.mapcache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.resource.ResourceException;
+
+import org.teiid.language.Select;
+import org.teiid.resource.spi.BasicConnection;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.object.ObjectConnection;
+import org.teiid.translator.object.ObjectPlugin;
+import org.teiid.translator.object.search.BasicKeySearchCriteria;
+import org.teiid.translator.object.search.SearchCriterion;
+
+/**
+ * The MapCacheConnection provides simple key searches of the cache.
+ *
+ * @author vhalbert
+ *
+ */
+public class MapCacheConnection extends BasicConnection implements ObjectConnection {
+
+ private MapCacheExecutionFactory factory = null;
+ private BasicKeySearchCriteria visitor = null;
+
+ public MapCacheConnection(MapCacheExecutionFactory factory) {
+ super();
+ this.factory = factory;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return true;
+ }
+
+
+ @Override
+ public void cleanUp() {
+ factory = null;
+ visitor = null;
+ }
+
+
+ @Override
+ public void close() throws ResourceException {
+ cleanUp();
+
+ }
+
+ public List<Object> performSearch(Select command) throws TranslatorException {
+
+ visitor = BasicKeySearchCriteria.getInstance(factory,command);
+
+ return get(visitor.getCriterion(), factory.getCache(), factory.getRootClass());
+ }
+
+ private List<Object> get(SearchCriterion criterion, Map<?, ?> cache,
+ Class<?> rootClass) throws TranslatorException {
+ List<Object> results = null;
+ if (criterion.getOperator() == SearchCriterion.Operator.ALL) {
+ results = new ArrayList<Object>();
+ for (Iterator<?> it = cache.keySet().iterator(); it.hasNext();) {
+ Object v = cache.get(it.next());
+ addValue(v, results, rootClass);
+
+ }
+
+ return results;
+ }
+
+ if (criterion.getCriterion() != null) {
+ results = get(criterion.getCriterion(), cache, rootClass);
+ }
+
+ if (results == null) {
+ results = new ArrayList<Object>();
+ }
+
+ if (criterion.getOperator().equals(SearchCriterion.Operator.EQUALS)) {
+
+ Object v = cache.get(criterion.getValue());
+ if (v != null) {
+ addValue(v, results, rootClass);
+ }
+ } else if (criterion.getOperator().equals(SearchCriterion.Operator.IN)) {
+
+ List<?> parms = (List<?>) criterion.getValue();
+ for (Iterator<?> it = parms.iterator(); it.hasNext();) {
+ Object arg = it.next();
+ Object v = cache.get(arg);
+ if (v != null) {
+ addValue(v, results, rootClass);
+ }
+ }
+
+ }
+
+ return results;
+
+ }
+
+ private void addValue(Object value, List<Object> results, Class<?> rootClass)
+ throws TranslatorException {
+ // can only add objects of the same root class in the cache
+ if (value != null) {
+ if (value.getClass().equals(rootClass)) {
+
+ if (value.getClass().isArray()) {
+ List<Object> listRows = Arrays.asList((Object[]) value);
+ results.addAll(listRows);
+ return;
+ }
+
+ if (value instanceof Collection) {
+ results.addAll((Collection<?>) value);
+ return;
+ }
+
+ if (value instanceof Map) {
+ Map<?, ?> mapRows = (Map<?, ?>) value;
+ results.addAll(mapRows.values());
+ return;
+ }
+
+ results.add(value);
+ } else {
+ // the object obtained from the cache has to be of the same root
+ // class type, otherwise, the modeling
+ // structure won't correspond correctly
+ String msg = ObjectPlugin.Util.getString(
+ "MapCacheConnection.unexpectedObjectTypeInCache",
+ new Object[] { value.getClass().getName(),
+ rootClass.getName() });
+
+ throw new TranslatorException(msg);
+ }
+ }
+
+ }
+
+}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheExecutionFactory.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -21,20 +21,87 @@
*/
package org.teiid.translator.object.mapcache;
+import java.util.Map;
+
+import javax.resource.cci.ConnectionFactory;
+
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.translator.ExecutionContext;
import org.teiid.translator.Translator;
+import org.teiid.translator.TranslatorException;
+import org.teiid.translator.object.ObjectConnection;
import org.teiid.translator.object.ObjectExecutionFactory;
+import org.teiid.translator.object.ObjectPlugin;
+/**
+ * The MapCacheExecutionFactory provides a translator that supports a cache of type Map.
+ * The cache will be looked up using a @link {@link #setCacheJndiName(String) JNDI name};
+ *
+ * @author vhalbert
+ *
+ */
@Translator(name = "map-cache", description = "The Map Cache Factory")
public class MapCacheExecutionFactory extends ObjectExecutionFactory {
+
+ private volatile Map<?,?> cache = null;
public MapCacheExecutionFactory() {
- this.setSearchStrategyClassName(MapCacheSearchByKey.class.getName());
this.setSourceRequired(false);
}
+
+ @Override
+ public void start() throws TranslatorException {
+ super.start();
+
+ String jndiName = getCacheJndiName();
+ if (jndiName == null || jndiName.trim().length() == 0) {
+ String msg = ObjectPlugin.Util
+ .getString(
+ "MapCacheExecutionFactory.undefinedJndiName", new Object[] { });
+ throw new TranslatorException(msg); //$NON-NLS-1$
+ }
+
+ }
+
@Override
public boolean supportsOnlyLiteralComparison() {
return true;
}
+
+
+ protected synchronized Map<?,?> getCache() throws TranslatorException {
+ if (this.cache != null) return this.cache;
+
+ Object object = findCacheUsingJNDIName();
+ Map<?,?> cache = null;
+
+ if (object instanceof Map) {
+
+ cache = (Map<?,?>)object;
+
+ LogManager.logInfo(LogConstants.CTX_CONNECTOR, "=== Using CacheManager (obtained from JNDI ==="); //$NON-NLS-1$
+ } else {
+ String msg = ObjectPlugin.Util.getString(
+ "MapCacheExecutionFactory.unexpectedCacheType",
+ new Object[] { (object == null ? "nullObject" : object.getClass().getName()), "Map" });
+ throw new TranslatorException(msg); //$NON-NLS-1$
+ }
+
+ this.cache = cache;
+ return this.cache;
+
+ }
+
+
+
+ @Override
+ public ObjectConnection getConnection(ConnectionFactory factory,
+ ExecutionContext executionContext) throws TranslatorException {
+
+ return new MapCacheConnection(this);
+
+ }
}
Deleted: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheSearchByKey.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheSearchByKey.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/mapcache/MapCacheSearchByKey.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -1,155 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.translator.object.mapcache;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.teiid.language.Select;
-import org.teiid.translator.TranslatorException;
-import org.teiid.translator.object.ObjectExecutionFactory;
-import org.teiid.translator.object.ObjectPlugin;
-import org.teiid.translator.object.SearchStrategy;
-import org.teiid.translator.object.SelectProjections;
-import org.teiid.translator.object.search.BasicKeySearchCriteria;
-import org.teiid.translator.object.search.SearchCriterion;
-
-/**
- * The MapCacheSearchByKey provides simple key searches of the cache.
- *
- * @author vhalbert
- *
- */
-public class MapCacheSearchByKey implements SearchStrategy {
-
- private BasicKeySearchCriteria visitor = null;
-
- public List<Object> performSearch(Select command,
- SelectProjections projections, ObjectExecutionFactory factory,
- Object connection) throws TranslatorException {
-
- Map<?, ?> cache = null;
- if (connection instanceof Map) {
- cache = (Map<?, ?>) connection;
- } else {
- String msg = ObjectPlugin.Util.getString(
- "MapCacheSearchByKey.unexpectedCacheType",
- new Object[] { connection.getClass().getName(), "Map" });
- throw new TranslatorException(msg); //$NON-NLS-1$
-
- }
-
- visitor = BasicKeySearchCriteria.getInstance(factory, projections,
- command);
-
- return get(visitor.getCriterion(), cache, factory.getRootClass());
- }
-
- private List<Object> get(SearchCriterion criterion, Map<?, ?> cache,
- Class<?> rootClass) throws TranslatorException {
- List<Object> results = null;
- if (criterion.getOperator() == SearchCriterion.Operator.ALL) {
- results = new ArrayList<Object>();
- for (Iterator<?> it = cache.keySet().iterator(); it.hasNext();) {
- Object v = cache.get(it.next());
- addValue(v, results, rootClass);
-
- }
-
- return results;
- }
-
- if (criterion.getCriterion() != null) {
- results = get(criterion.getCriterion(), cache, rootClass);
- }
-
- if (results == null) {
- results = new ArrayList<Object>();
- }
-
- if (criterion.getOperator().equals(SearchCriterion.Operator.EQUALS)) {
-
- Object v = cache.get(criterion.getValue());
- if (v != null) {
- addValue(v, results, rootClass);
- }
- } else if (criterion.getOperator().equals(SearchCriterion.Operator.IN)) {
-
- List<?> parms = (List<?>) criterion.getValue();
- for (Iterator<?> it = parms.iterator(); it.hasNext();) {
- Object arg = it.next();
- Object v = cache.get(arg);
- if (v != null) {
- addValue(v, results, rootClass);
- }
- }
-
- }
-
- return results;
-
- }
-
- private void addValue(Object value, List<Object> results, Class<?> rootClass)
- throws TranslatorException {
- // can only add objects of the same root class in the cache
- if (value != null) {
- if (value.getClass().equals(rootClass)) {
-
- if (value.getClass().isArray()) {
- List<Object> listRows = Arrays.asList((Object[]) value);
- results.addAll(listRows);
- return;
- }
-
- if (value instanceof Collection) {
- results.addAll((Collection<?>) value);
- return;
- }
-
- if (value instanceof Map) {
- Map<?, ?> mapRows = (Map<?, ?>) value;
- results.addAll(mapRows.values());
- return;
- }
-
- results.add(value);
- } else {
- // the object obtained from the cache has to be of the same root
- // class type, otherwise, the modeling
- // structure won't correspond correctly
- String msg = ObjectPlugin.Util.getString(
- "MapCacheSearchByKey.unexpectedObjectTypeInCache",
- new Object[] { value.getClass().getName(),
- rootClass.getName() });
-
- throw new TranslatorException(msg);
- }
- }
-
- }
-
-}
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/BasicKeySearchCriteria.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/BasicKeySearchCriteria.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/BasicKeySearchCriteria.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.teiid.core.util.Assertion;
import org.teiid.language.AggregateFunction;
import org.teiid.language.ColumnReference;
import org.teiid.language.Command;
@@ -44,7 +43,6 @@
import org.teiid.translator.TranslatorException;
import org.teiid.translator.object.ObjectExecutionFactory;
import org.teiid.translator.object.ObjectPlugin;
-import org.teiid.translator.object.SelectProjections;
/**
* The BasicKeySearchCriteria parses the {@link Command select} and creates
@@ -59,33 +57,21 @@
// search criteria based on the WHERE clause
private SearchCriterion criterion;
- private SelectProjections projections;
private List<String> exceptionMessages = new ArrayList<String>(2);
- private BasicKeySearchCriteria(ObjectExecutionFactory factory,
- SelectProjections projections) {
- this.projections = projections;
+ private BasicKeySearchCriteria(ObjectExecutionFactory factory) {
}
public static BasicKeySearchCriteria getInstance(
- ObjectExecutionFactory factory, SelectProjections projections,
+ ObjectExecutionFactory factory,
Select command) throws TranslatorException {
- BasicKeySearchCriteria visitor = new BasicKeySearchCriteria(factory,
- projections);
+ BasicKeySearchCriteria visitor = new BasicKeySearchCriteria(factory);
visitor.visitNode(command);
visitor.throwExceptionIfFound();
return visitor;
}
- private String getRootNodePrimaryKeyColumnName() {
- return this.projections.getRootNodePrimaryKeyColumnName();
- }
-
- private boolean isRootTableInFrom() {
- return this.projections.isRootTableInFrom();
- }
-
/**
* Call to get the {@link SearchCriterion Criterion}. If the command
* specified no criteria, then a {@link SearchCriterion Criterion} that
@@ -100,7 +86,6 @@
this.criterion = new SearchCriterion();
}
- this.criterion.setRootTableInSelect(isRootTableInFrom());
return this.criterion;
}
@@ -139,8 +124,11 @@
}
if (mdIDElement == null || value == null) {
- Assertion.assertTrue(false,
- "BasicKeySearchCriteria.missingComparisonExpression");
+ String msg = ObjectPlugin.Util
+ .getString(
+ "BasicKeySearchCriteria.missingComparisonExpression", new Object[] { });
+ addException(msg);
+ return;
}
addCompareCriteria(mdIDElement,
@@ -170,10 +158,11 @@
type = literal.getType();
} else {
- Assertion
- .assertTrue(false,
- "BasicKeySearchCriteria.Unsupported_expression "
- + expr);
+ String msg = ObjectPlugin.Util
+ .getString(
+ "BasicKeySearchCriteria.Unsupported_expression", new Object[] {expr });
+ addException(msg);
+ return;
}
}
@@ -227,19 +216,9 @@
assert (searchCriteria.getTableName() != null);
assert (searchCriteria.getField() != null);
- assert (getRootNodePrimaryKeyColumnName() != null);
- // must be a key column in order to perform search
- if (searchCriteria.getTableName().equalsIgnoreCase(
- this.projections.getRootTableName())
- && searchCriteria.getField().equalsIgnoreCase(
- getRootNodePrimaryKeyColumnName())) {
-
- if (this.criterion != null) {
- searchCriteria.addOrCriterion(this.criterion);
- }
-
- this.criterion = searchCriteria;
+ if (this.criterion != null) {
+ searchCriteria.addOrCriterion(this.criterion);
}
this.criterion = searchCriteria;
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/SearchCriterion.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/SearchCriterion.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/search/SearchCriterion.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -41,7 +41,6 @@
private Column column;
private Object value;
private String runtimeType;
- private boolean isRootTableInSelect = false;
public SearchCriterion() {
this.operator = Operator.ALL;
@@ -141,14 +140,6 @@
return this.isAnd;
}
- public boolean isRootTableInSelect() {
- return isRootTableInSelect;
- }
-
- public void setRootTableInSelect(boolean isRootTableInSelect) {
- this.isRootTableInSelect = isRootTableInSelect;
- }
-
private String getNameInSourceFromColumn(Column c) {
String name = c.getNameInSource();
if (name == null || name.equals("")) { //$NON-NLS-1$
Modified: trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/util/ObjectUtil.java
===================================================================
--- trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/util/ObjectUtil.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/java/org/teiid/translator/object/util/ObjectUtil.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -32,11 +32,22 @@
Collection<?> ctors, ClassLoader loader) throws TranslatorException {
try {
- return ReflectionHelper.create(objectClassName, ctors.toArray(), null, loader);
+ return ReflectionHelper.create(objectClassName, ctors, loader);
} catch (Exception e1) {
- throw new TranslatorException(e1.getCause());
+ throw new TranslatorException(e1);
}
}
+
+ public static Object createObject(String objectClassName,
+ Object[] ctorObjs, Class<?>[] argTypes, ClassLoader loader) throws TranslatorException {
+ try {
+
+ return ReflectionHelper.create(objectClassName, ctorObjs, argTypes, loader);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ throw new TranslatorException(e1);
+ }
+ }
}
Modified: trunk/connectors/translator-object/src/main/resources/org/teiid/translator/object/i18n.properties
===================================================================
--- trunk/connectors/translator-object/src/main/resources/org/teiid/translator/object/i18n.properties 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/main/resources/org/teiid/translator/object/i18n.properties 2012-09-21 13:01:05 UTC (rev 4459)
@@ -19,21 +19,30 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA.
#
-xTEIID12001=Received IAggregate, but it is not supported. Check capabilities.
-xTEIID12005=Received IFunction, but it is not supported. Check capabilities.
-xTEIID12006=Received IScalarSubquery, but it is not supported. Check capabilities.
-xTEIID12007=Received ISearchedCaseExpression, but it is not supported. Check capabilities.
+TEIID12001=Received IAggregate, but it is not supported. Check capabilities.
+TEIID12005=Received IFunction, but it is not supported. Check capabilities.
+TEIID12006=Received IScalarSubquery, but it is not supported. Check capabilities.
+TEIID12007=Received ISearchedCaseExpression, but it is not supported. Check capabilities.
+
ObjectExecutionFactory.rootClassNameNotDefined=RootClassName is undefined
ObjectExecutionFactory.rootClassNotFound=RootClass {0} not found to load
+ObjectExecutionFactory.cacheNotFoundinJNDI=Cache was not found using JNDI name {0}
+InfinispanBaseExecutionFactory.cacheNameNotDefined=CacheName is not defined
+InfinispanBaseExecutionFactory.cacheNotFound=No cache was found for {0} in Infinispan
+
+InfinispanExecutionFactory.undefinedHowToGetCache=Either CacheJndiName or ConfigurationFileName must be specified how the cache will be obtained
+InfinispanExecutionFactory.unsupportedContainerType=CacheContainer type {0} is not currently supported when connecting to Infinispan, must be {1}
+
LuceneSearch.invalidOperator=Unsupported operator {0} was encountered, only {1} are supported
LuceneSearch.unsupportedComparingByNull=Comparing using a NULL is not currently supported.
-LuceneSearch.Unsupported_expression=Unsupported expression of {0} when performing [1}
+LuceneSearch.Unsupported_expression=Unsupported expression of {0} when performing {1}
-InfinispanBaseExecutionFactory.cacheNameNotDefined=CacheName is not defined
-InfinispanBaseExecutionFactory.noCache=No cache was found for {0} in Infinispan
-InfinispanExecutionFactory.unsupportedConnectionType=Connection type {0} is not currently supported when connecting to Infinispan, must be {1}
+MapCacheConnection.unexpectedObjectTypeInCache=Unexpected root object type in the cache is {0}, expected {1}
-MapCacheSearchByKey.unexpectedObjectTypeInCache=Unexpected root object type in the cache is {0}, expected {1}
-MapCacheSearchByKey.unexpectedCacheType=Unable to process object in cache of type {0}, expected type {1}
\ No newline at end of file
+MapCacheExecutionFactory.undefinedJndiName=CacheJndiName is undefined
+MapCacheExecutionFactory.unexpectedCacheType=Expected cache type to be of type Map, but its of type {0}
+
+BasicKeySearchCriteria.missingComparisonExpression=Comparison criteria is missing the expression
+BasicKeySearchCriteria.Unsupported_expression=Unsupported expression {0} found in the IN clause
\ No newline at end of file
Added: trunk/connectors/translator-object/src/test/example_vdbs/infinispancache-vdb.xml
===================================================================
--- trunk/connectors/translator-object/src/test/example_vdbs/infinispancache-vdb.xml (rev 0)
+++ trunk/connectors/translator-object/src/test/example_vdbs/infinispancache-vdb.xml 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<vdb name="team" version="1">
+
+ <description>Shows how to call Web Services</description>
+
+ <property name="UseConnectorMetadata" value="cached" />
+<!-- connection-jndi-name="dummy" -->
+ <model name="Team" visible="false">
+ <source name="objsource" translator-name="infinispan1" />
+ <metadata type="DDL"><![CDATA[
+ CREATE FOREIGN TABLE Team (TeamObject Object);
+ ]]> </metadata>
+ </model>
+ <model name="TeamView" type="VIRTUAL">
+ <metadata type="DDL"><![CDATA[
+ CREATE VIEW Teams (
+ TeamName varchar(255) PRIMARY KEY
+ )
+ AS
+ SELECT o.TeamName FROM Team as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name') as o;
+ CREATE VIEW Players (
+ TeamName varchar(255) PRIMARY KEY,
+ PlayerName varchar(255)
+ )
+ AS
+ SELECT o.TeamName, y.Name FROM (Select TeamObject from Team) as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name', players object 'teiid_row.players') as o, OBJECTTABLE('m' PASSING o.players as m COLUMNS Name string 'teiid_row') as y;
+ ]]> </metadata>
+ </model>
+
+ <translator name="infinispan1" type="infinispan-cache">
+ <property name="CacheName" value="teams"/>
+ <property name="RootClassName" value="com.jboss.datagrid.hotrod.Team"/>
+ <property name="CacheJndiName" value="java/CacheManager"/>
+ <property name="SupportsLuceneSearching" value="true"/>
+
+ <!-- optional properties for obtaining the RemoteCacheManager -->
+ <!--
+ <property name="ConfigurationFileName" value="<dir>/infinispan-config.xml"/>
+ -->
+
+ </translator>
+</vdb>
\ No newline at end of file
Added: trunk/connectors/translator-object/src/test/example_vdbs/infinispanremotecache-vdb.xml
===================================================================
--- trunk/connectors/translator-object/src/test/example_vdbs/infinispanremotecache-vdb.xml (rev 0)
+++ trunk/connectors/translator-object/src/test/example_vdbs/infinispanremotecache-vdb.xml 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<vdb name="team" version="1">
+
+ <description>Shows how to call Web Services</description>
+
+ <property name="UseConnectorMetadata" value="cached" />
+<!-- connection-jndi-name="dummy" -->
+ <model name="Team" visible="false">
+ <source name="objsource" translator-name="infinispan1" />
+ <metadata type="DDL"><![CDATA[
+ CREATE FOREIGN TABLE Team (TeamObject Object);
+ ]]> </metadata>
+ </model>
+ <model name="TeamView" type="VIRTUAL">
+ <metadata type="DDL"><![CDATA[
+ CREATE VIEW Teams (
+ TeamName varchar(255) PRIMARY KEY
+ )
+ AS
+ SELECT o.TeamName FROM Team as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name') as o;
+ CREATE VIEW Players (
+ TeamName varchar(255) PRIMARY KEY,
+ PlayerName varchar(255)
+ )
+ AS
+ SELECT o.TeamName, y.Name FROM (Select TeamObject from Team) as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name', players object 'teiid_row.players') as o, OBJECTTABLE('m' PASSING o.players as m COLUMNS Name string 'teiid_row') as y;
+ ]]> </metadata>
+ </model>
+
+ <translator name="infinispan1" type="infinispanremote-cache">
+ <property name="CacheName" value="teams"/>
+ <property name="RootClassName" value="com.jboss.datagrid.hotrod.Team"/>
+ <property name="RemoteServerList" value="localhost:11222"/>
+
+ <!-- optional properties for obtaining the RemoteCacheManager -->
+ <!--
+ <property name="CacheJndiName" value="java/CacheManager"/>
+ <property name="ConfigurationFileName" value="<dir>/infinispan-config.xml"/>
+ -->
+ </translator>
+</vdb>
\ No newline at end of file
Added: trunk/connectors/translator-object/src/test/example_vdbs/mapcache-vdb.xml
===================================================================
--- trunk/connectors/translator-object/src/test/example_vdbs/mapcache-vdb.xml (rev 0)
+++ trunk/connectors/translator-object/src/test/example_vdbs/mapcache-vdb.xml 2012-09-21 13:01:05 UTC (rev 4459)
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<vdb name="team" version="1">
+
+ <description>Shows how to call Web Services</description>
+
+ <property name="UseConnectorMetadata" value="cached" />
+<!-- connection-jndi-name="dummy" -->
+ <model name="Team" visible="false">
+ <source name="objsource" translator-name="mapcache1" />
+ <metadata type="DDL"><![CDATA[
+ CREATE FOREIGN TABLE Team (TeamObject Object);
+ ]]> </metadata>
+ </model>
+ <model name="TeamView" type="VIRTUAL">
+ <metadata type="DDL"><![CDATA[
+ CREATE VIEW Teams (
+ TeamName varchar(255) PRIMARY KEY
+ )
+ AS
+ SELECT o.TeamName FROM Team as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name') as o;
+ CREATE VIEW Players (
+ TeamName varchar(255) PRIMARY KEY,
+ PlayerName varchar(255)
+ )
+ AS
+ SELECT o.TeamName, y.Name FROM (Select TeamObject from Team) as T, OBJECTTABLE('x' PASSING T.TeamObject AS x COLUMNS TeamName varchar(255) 'teiid_row.name', players object 'teiid_row.players') as o, OBJECTTABLE('m' PASSING o.players as m COLUMNS Name string 'teiid_row') as y;
+ ]]> </metadata>
+ </model>
+
+ <translator name="mapcache1" type="map-cache">
+ <property name="RootClassName" value="com.jboss.datagrid.hotrod.Team"/>
+ <property name="CacheJndiName" value="java/CacheManager"/>
+ </translator>
+</vdb>
\ No newline at end of file
Deleted: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/FakeStrategy.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/FakeStrategy.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/FakeStrategy.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.translator.object;
-
-import java.util.List;
-
-import org.teiid.language.Select;
-import org.teiid.translator.TranslatorException;
-
-public class FakeStrategy implements SearchStrategy {
-
- public static List<Object> RESULTS = null;
-
- @Override
- public List<Object> performSearch(Select command, SelectProjections projections,
- ObjectExecutionFactory factory, Object connection) throws TranslatorException{
- return RESULTS;
- }
-
-}
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecution.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecution.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecution.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -22,6 +22,8 @@
package org.teiid.translator.object;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
import java.util.List;
@@ -43,19 +45,20 @@
@Mock
private ExecutionContext context;
+ @Mock
+ private ObjectConnection connection;
@Before public void beforeEach() throws Exception{
MockitoAnnotations.initMocks(this);
+
+ when(connection.performSearch(any(Select.class))).thenReturn(source.getAll());
+
factory = new ObjectExecutionFactory() { };
- factory.setSearchStrategyClassName(FakeStrategy.class.getName());
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
factory.start();
-
- FakeStrategy.RESULTS = source.getAll();
-
}
@@ -113,7 +116,7 @@
private ObjectExecution createExecution(String sql) throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand(sql); //$NON-NLS-1$
- ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, null);
+ ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, connection);
return exec;
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecutionFactory.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecutionFactory.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestObjectExecutionFactory.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -59,8 +59,6 @@
MockitoAnnotations.initMocks(this);
factory = new TestFactory();
- factory.setSearchStrategyClassName(FakeStrategy.class.getName());
-
}
@Test public void testFactory() throws Exception {
Deleted: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestSelectProjections.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestSelectProjections.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/TestSelectProjections.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -1,126 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-package org.teiid.translator.object;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockitoAnnotations;
-import org.teiid.language.Select;
-import org.teiid.translator.object.mapcache.MapCacheSearchByKey;
-import org.teiid.translator.object.util.TradesCacheSource;
-import org.teiid.translator.object.util.VDBUtility;
-
-
-@SuppressWarnings("nls")
-public class TestSelectProjections {
-
- private ObjectExecutionFactory factory;
-
- @Before public void beforeEach() throws Exception{
-
- MockitoAnnotations.initMocks(this);
-
- factory = new ObjectExecutionFactory() {};
-
- factory.setSearchStrategyClassName(MapCacheSearchByKey.class.getName());
- factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
-
- factory.start();
-
- }
-
-
- @Test public void testQueryRootObject() throws Exception {
-
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select * From Trade_Object.Trade"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
-
- validateResults(true, visitor);
-
- }
-
- @Test public void testQueryWithNonSearchableColumn() throws Exception {
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select L.Name as LegName, L.TradeId as ID From Trade_Object.Leg as L"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- validateResults( false, visitor);
- }
-
-
- @Test public void testQuery1LevelDownWithRootNotInSelect() throws Exception {
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select L.Name as LegName From Trade_Object.Leg as L"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- validateResults( false, visitor);
- }
-
- @Test public void testQuery2LevelDownWithRootNotInSelect() throws Exception {
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select N.LineItem " +
- " From Trade_Object.Transaction as N "); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- validateResults(false, visitor);
-
- }
-
- @Test public void testQueryIncludeLegs() throws Exception {
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select T.TradeId, T.Name as TradeName, L.Name as LegName From Trade_Object.Trade as T, Trade_Object.Leg as L Where T.TradeId = L.TradeId"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- validateResults(true, visitor);
- }
-
- @Test public void testQueryGetAllTransactions() throws Exception {
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select T.TradeId, T.Name as TradeName, L.Name as LegName, " +
- " N.LineItem " +
- " From Trade_Object.Trade as T, Trade_Object.Leg as L, Trade_Object.Transaction as N " +
- " Where T.TradeId = L.TradeId and L.LegId = N.LegId"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- validateResults(true, visitor);
-
- }
-
-
- @Test public void testIN() throws Exception {
-
- Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select * From Trade_Object.Trade where Trade_Object.Trade.TradeID IN (1,2,3)"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
-
- validateResults(true, visitor);
-
- }
-
- private SelectProjections createSelectProjections(Select command) {
- SelectProjections visitor = SelectProjections.create(factory);
- visitor.parse(command);
- return visitor;
-
- }
-
- private void validateResults( boolean rootNodeInQuery, SelectProjections visitor) throws Exception {
-
- assertEquals(rootNodeInQuery, visitor.isRootTableInFrom());
- assertNotNull(visitor.getRootNodePrimaryKeyColumnName());
- assertNotNull(visitor.getRootTableName());
-
- }
-
-}
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/RemoteInfinispanTestHelper.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/RemoteInfinispanTestHelper.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/RemoteInfinispanTestHelper.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -24,25 +24,40 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
+
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.server.core.Main;
import org.infinispan.server.hotrod.HotRodServer;
+import org.teiid.translator.object.util.TradesCacheSource;
/**
*/
public class RemoteInfinispanTestHelper {
protected static final int PORT = 11311;
protected static final int TIMEOUT = 0;
- protected static final String CONFIG_FILE = "src/test/resources/infinispan_remote_config.xml";
private static HotRodServer server = null;
private static int count = 0;
+ private static DefaultCacheManager CACHEMANAGER = null;
public static synchronized HotRodServer createServer() throws IOException {
count++;
if (server == null) {
- DefaultCacheManager cacheManager = new DefaultCacheManager(CONFIG_FILE);
+ Configuration c = new ConfigurationBuilder().clustering().cacheMode(CacheMode.REPL_SYNC).eviction().maxEntries(7).build();
+
+ CACHEMANAGER = new DefaultCacheManager(
+ new GlobalConfigurationBuilder().transport().defaultTransport().build(),
+ c);
+ CACHEMANAGER.start();
// This doesn't work on IPv6, because the util assumes "127.0.0.1" ...
// server = HotRodTestingUtil.startHotRodServer(cacheManager, HOST, PORT);
+
+
+ CACHEMANAGER.defineConfiguration(TradesCacheSource.TRADES_CACHE_NAME, c);
+
server = new HotRodServer();
String hostAddress = hostAddress();
String hostPort = Integer.toString(hostPort());
@@ -54,10 +69,19 @@
props.setProperty(Main.PROP_KEY_PROXY_HOST(), hostAddress);
props.setProperty(Main.PROP_KEY_PROXY_PORT(), hostPort);
// System.out.println("Starting HotRot Server at " + hostAddress + ":" + hostPort);
- server.start(props, cacheManager);
+ server.start(props, CACHEMANAGER);
+
+ server.cacheManager().startCaches(TradesCacheSource.TRADES_CACHE_NAME);
+
+ TradesCacheSource.loadCache(server.getCacheManager().getCache(TradesCacheSource.TRADES_CACHE_NAME));
+
}
return server;
}
+
+ public static DefaultCacheManager getCacheManager() {
+ return CACHEMANAGER;
+ }
public static int hostPort() {
return PORT;
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanConfigFileKeySearch.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanConfigFileKeySearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanConfigFileKeySearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -57,7 +57,6 @@
factory.setCacheName(TradesCacheSource.TRADES_CACHE_NAME);
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
- factory.setSearchStrategyClassName(SearchByKey.class.getName());
factory.start();
TradesCacheSource.loadCache(factory.getCacheContainer().getCache(TradesCacheSource.TRADES_CACHE_NAME));
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiILuceneSearch.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiILuceneSearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiILuceneSearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -34,7 +34,9 @@
import org.junit.Test;
import org.teiid.language.Select;
import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.object.BasicSearchTest;
+import org.teiid.translator.object.ObjectConnection;
import org.teiid.translator.object.ObjectExecution;
import org.teiid.translator.object.infinispan.search.LuceneSearch;
import org.teiid.translator.object.util.TradesCacheSource;
@@ -58,11 +60,20 @@
@Before public void beforeEachTest() throws Exception{
- factory = new InfinispanExecutionFactory();
+ factory = new InfinispanExecutionFactory() {
+ @Override
+ protected Object findCacheUsingJNDIName()
+ throws TranslatorException {
+ return container;
+ }
+
+ };
+
+ factory.setCacheJndiName("JNDINAME");
factory.setCacheName(TradesCacheSource.TRADES_CACHE_NAME);
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
- factory.setSearchStrategyClassName(LuceneSearch.class.getName());
+ factory.setSupportsLuceneSearching(true);
factory.start();
@@ -70,9 +81,9 @@
@Override
protected List<Object> performTest(Select command, int rowcnt) throws Exception {
-
- ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, container);
+ ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, null);
+
exec.execute();
List<Object> rows = new ArrayList<Object>();
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiKeySearch.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiKeySearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanJndiKeySearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -22,18 +22,25 @@
package org.teiid.translator.object.infinispan;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import javax.naming.Context;
+
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.mockito.Mock;
import org.teiid.language.Select;
import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.object.BasicSearchTest;
+import org.teiid.translator.object.ObjectConnection;
import org.teiid.translator.object.ObjectExecution;
import org.teiid.translator.object.infinispan.search.SearchByKey;
import org.teiid.translator.object.util.TradesCacheSource;
@@ -53,15 +60,24 @@
TradesCacheSource.loadCache(container.getCache(TradesCacheSource.TRADES_CACHE_NAME));
context = mock(ExecutionContext.class);
+
}
@Before public void beforeEachTest() throws Exception{
- factory = new InfinispanExecutionFactory();
+ factory = new InfinispanExecutionFactory() {
+ @Override
+ protected Object findCacheUsingJNDIName()
+ throws TranslatorException {
+ return container;
+ }
+
+ };
+
+ factory.setCacheJndiName("JNDINAME");
factory.setCacheName(TradesCacheSource.TRADES_CACHE_NAME);
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
- factory.setSearchStrategyClassName(SearchByKey.class.getName());
factory.start();
@@ -70,7 +86,7 @@
@Override
protected List<Object> performTest(Select command, int rowcnt) throws Exception {
- ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, container);
+ ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, null);
exec.execute();
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanRemoteJndiKeySearch.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanRemoteJndiKeySearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/infinispan/TestInfinispanRemoteJndiKeySearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -22,6 +22,7 @@
package org.teiid.translator.object.infinispan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,7 +54,7 @@
private static ExecutionContext context;
- private InfinispanExecutionFactory factory = null;
+ private InfinispanRemoteExecutionFactory factory = null;
@Mock
private static Context jndi;
@@ -62,11 +63,10 @@
public static void beforeEachClass() throws Exception {
RemoteInfinispanTestHelper.createServer();
// Create the cache manager ...
-
+
// Set up the mock JNDI ...
jndi = mock(Context.class);
when(jndi.lookup(anyString())).thenReturn(null);
- when(jndi.lookup(JNDI_NAME)).thenReturn(container);
context = mock(ExecutionContext.class);
@@ -75,8 +75,9 @@
@Before public void beforeEachTest() throws Exception{
- factory = new InfinispanExecutionFactory();
+ factory = new InfinispanRemoteExecutionFactory();
+ factory.setRemoteServerList(RemoteInfinispanTestHelper.hostAddress() + ":" + RemoteInfinispanTestHelper.hostPort());
factory.setCacheName(TradesCacheSource.TRADES_CACHE_NAME);
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
factory.start();
@@ -91,11 +92,13 @@
@Override
protected List<Object> performTest(Select command, int rowcnt) throws Exception {
- container = new RemoteCacheManager("infinispan_remote_config.xml");
-
- TradesCacheSource.loadCache(container.getCache(TradesCacheSource.TRADES_CACHE_NAME));
+ when(jndi.lookup(JNDI_NAME)).thenReturn(container);
+
+ Object t = RemoteInfinispanTestHelper.getCacheManager().getCache(TradesCacheSource.TRADES_CACHE_NAME).get("1");
- ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, container);
+ assertNotNull(t);
+
+ ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, null);
exec.execute();
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/mapcache/TestMapCacheKeySearch.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/mapcache/TestMapCacheKeySearch.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/mapcache/TestMapCacheKeySearch.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -22,15 +22,22 @@
package org.teiid.translator.object.mapcache;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import javax.naming.Context;
+
import org.junit.Before;
import org.junit.BeforeClass;
+import org.mockito.Mock;
import org.teiid.language.Select;
import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.object.BasicSearchTest;
import org.teiid.translator.object.ObjectExecution;
import org.teiid.translator.object.util.TradesCacheSource;
@@ -38,27 +45,44 @@
@SuppressWarnings("nls")
-public class TestMapCacheKeySearch extends BasicSearchTest {
-
+public class TestMapCacheKeySearch extends BasicSearchTest {
+
+ protected static final String JNDI_NAME = "java/MyCacheManager";
+
private static TradesCacheSource source = TradesCacheSource.loadCache();
private static ExecutionContext context;
private MapCacheExecutionFactory factory = null;
+
+ @Mock
+ private static Context jndi;
-
protected static boolean print = false;
@BeforeClass
public static void beforeEachClass() throws Exception {
context = mock(ExecutionContext.class);
+
+ // Set up the mock JNDI ...
+ jndi = mock(Context.class);
+ when(jndi.lookup(anyString())).thenReturn(null);
+ when(jndi.lookup(JNDI_NAME)).thenReturn(source);
}
@Before public void beforeEach() throws Exception{
- factory = new MapCacheExecutionFactory();
+ factory = new MapCacheExecutionFactory() {
+ @Override
+ protected Map<?, ?> getCache() throws TranslatorException {
+ return source;
+ }
+
+ };
+
+ factory.setCacheJndiName(JNDI_NAME);
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
factory.start();
@@ -67,7 +91,7 @@
@Override
protected List<Object> performTest(Select command, int rowcnt) throws Exception {
- ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, source);
+ ObjectExecution exec = (ObjectExecution) factory.createExecution(command, context, VDBUtility.RUNTIME_METADATA, null);
exec.execute();
Modified: trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/search/TestBasicKeySearchCriteria.java
===================================================================
--- trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/search/TestBasicKeySearchCriteria.java 2012-09-21 02:54:01 UTC (rev 4458)
+++ trunk/connectors/translator-object/src/test/java/org/teiid/translator/object/search/TestBasicKeySearchCriteria.java 2012-09-21 13:01:05 UTC (rev 4459)
@@ -29,7 +29,6 @@
import org.junit.Test;
import org.teiid.language.Select;
import org.teiid.translator.object.ObjectExecutionFactory;
-import org.teiid.translator.object.SelectProjections;
import org.teiid.translator.object.util.TradesCacheSource;
import org.teiid.translator.object.util.VDBUtility;
@@ -47,25 +46,17 @@
factory.setRootClassName(TradesCacheSource.TRADE_CLASS_NAME);
}
-
- private SelectProjections createSelectProjections(Select command) {
- SelectProjections visitor = SelectProjections.create(factory);
- visitor.parse(command);
- return visitor;
+ private BasicKeySearchCriteria createVisitor(Select command) throws Exception {
+ return BasicKeySearchCriteria.getInstance(factory, command);
}
-
- private BasicKeySearchCriteria createVisitor(Select command, SelectProjections projections) throws Exception {
- return BasicKeySearchCriteria.getInstance(factory, projections, command);
- }
@Test public void testIN() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select * From Trade_Object.Trade where Trade_Object.Trade.TradeID IN ('1','2','3')"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
}
@@ -73,29 +64,25 @@
@Test public void test1Equals() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select * From Trade_Object.Trade where Trade_Object.Trade.TradeID = '1'"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
}
- @Test public void test2Equals() throws Exception {
+ @Test public void test5Equals() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select * From Trade_Object.Trade where Trade_Object.Trade.Name = 'MyName' and (Trade_Object.Trade.TradeId = '2' or Trade_Object.Trade.Settled = 'true') or (Trade_Object.Trade.Settled = 'false' and Trade_Object.Trade.TradeId = 3) "); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
- validateSearchCriteria(criteria.getCriterion(), 2, false, true);
+ BasicKeySearchCriteria criteria = createVisitor(command);
+ validateSearchCriteria(criteria.getCriterion(), 5, false, true);
}
@Test public void testQueryIncludeLegsNoCriteria() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select T.TradeId, T.Name as TradeName, L.Name as LegName From Trade_Object.Trade as T, Trade_Object.Leg as L Where T.TradeId = L.TradeId"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
-
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
assertEquals(criteria.getCriterion().getOperator(), SearchCriterion.Operator.ALL);
@@ -104,9 +91,7 @@
@Test public void testQueryIncludeLegsWithCriteria() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select T.TradeId, T.Name as TradeName, L.Name as LegName From Trade_Object.Trade as T, Trade_Object.Leg as L Where T.TradeId = L.TradeId and L.Name='MyLeg'"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
-
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
}
@@ -115,9 +100,8 @@
" N.LineItem " +
" From Trade_Object.Trade as T, Trade_Object.Leg as L, Trade_Object.Transaction as N " +
" Where T.TradeId = L.TradeId and L.LegId = N.LegId "); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
assertEquals(criteria.getCriterion().getOperator(), SearchCriterion.Operator.ALL);
@@ -129,9 +113,8 @@
" N.LineItem " +
" From Trade_Object.Trade as T, Trade_Object.Leg as L, Trade_Object.Transaction as N " +
" Where T.TradeId = L.TradeId and L.LegId = N.LegId and T.TradeId in ('1','2','3') "); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, true);
}
@@ -139,9 +122,7 @@
@Test public void testQueryLegsWithCriteria() throws Exception {
Select command = (Select)VDBUtility.TRANSLATION_UTILITY.parseCommand("select L.Name as LegName From Trade_Object.Leg as L Where L.Name='MyLeg'"); //$NON-NLS-1$
- SelectProjections visitor = createSelectProjections(command);
-
- BasicKeySearchCriteria criteria = createVisitor(command, visitor);
+ BasicKeySearchCriteria criteria = createVisitor(command);
validateSearchCriteria(criteria.getCriterion(), 1, false, false);
}
@@ -153,7 +134,7 @@
assertNotNull(criteria);
- assertEquals(cnt, criteria.getCriteriaCount());
+ assertEquals( "Criteria Count", cnt, criteria.getCriteriaCount());
if (criteria.getOperator() != SearchCriterion.Operator.ALL) {
@@ -165,12 +146,11 @@
assertNotNull(criteria.getRuntimeType());
assertNotNull(criteria.getValue());
- assertEquals(isAnd, criteria.isAndCondition());
+ assertEquals("IsAnd", isAnd, criteria.isAndCondition());
+ } else {
+ assertEquals("Criteria Cnt must be 1 when using ALL", 1, criteria.getCriteriaCount());
}
-
- assertEquals(isRootInSelect, criteria.isRootTableInSelect());
-
}
}
12 years, 5 months
teiid SVN: r4458 - in branches/7.7.x/engine/src: test/java/org/teiid/query/processor and 1 other directory.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2012-09-20 22:54:01 -0400 (Thu, 20 Sep 2012)
New Revision: 4458
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java
Log:
TEIID-2220: Invalid aggregate pushing
Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2012-09-20 19:43:05 UTC (rev 4457)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2012-09-21 02:54:01 UTC (rev 4458)
@@ -595,11 +595,12 @@
Set<Expression> stagedGroupingSymbols = new LinkedHashSet<Expression>();
Collection<AggregateSymbol> aggregates = aggregateMap.get(planNode);
- if (!canPush(groupNode, stagedGroupingSymbols, planNode)) {
- continue;
+ planNode = canPush(groupNode, stagedGroupingSymbols, planNode);
+ if (planNode == null) {
+ continue;
}
- filterJoinColumns(stagedGroupingSymbols, planNode.getGroups(), groupingExpressions);
+ filterExpressions(stagedGroupingSymbols, planNode.getGroups(), groupingExpressions, false);
collectSymbolsFromOtherAggregates(allAggregates, aggregates, planNode, stagedGroupingSymbols);
@@ -726,45 +727,70 @@
/**
* Ensures that we are only pushing through inner equi joins or cross joins. Also collects the necessary staged grouping symbols
+ * @return null if we cannot push otherwise the target join node
*/
- private boolean canPush(PlanNode groupNode,
+ private PlanNode canPush(PlanNode groupNode,
Set<Expression> stagedGroupingSymbols,
PlanNode planNode) {
PlanNode parentJoin = planNode.getParent();
Set<GroupSymbol> groups = FrameUtil.findJoinSourceNode(planNode).getGroups();
+ PlanNode result = planNode;
while (parentJoin != groupNode) {
if (parentJoin.getType() != NodeConstants.Types.JOIN
|| parentJoin.hasCollectionProperty(NodeConstants.Info.NON_EQUI_JOIN_CRITERIA)
|| ((JoinType)parentJoin.getProperty(NodeConstants.Info.JOIN_TYPE)).isOuter()) {
- return false;
+ return null;
}
-
+ //we move the target up if the filtered expressions introduce outside groups
if (planNode == parentJoin.getFirstChild()) {
- if (parentJoin.hasCollectionProperty(NodeConstants.Info.LEFT_EXPRESSIONS)) {
- filterJoinColumns(stagedGroupingSymbols, groups, (List<SingleElementSymbol>)parentJoin.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS));
+ if (parentJoin.hasCollectionProperty(NodeConstants.Info.LEFT_EXPRESSIONS)
+ && filterExpressions(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS), true)) {
+ result = parentJoin;
+ groups = result.getGroups();
}
} else {
- if (parentJoin.hasCollectionProperty(NodeConstants.Info.RIGHT_EXPRESSIONS)) {
- filterJoinColumns(stagedGroupingSymbols, groups, (List<SingleElementSymbol>)parentJoin.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS));
+ if (parentJoin.hasCollectionProperty(NodeConstants.Info.RIGHT_EXPRESSIONS)
+ && filterExpressions(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS), true)) {
+ result = parentJoin;
+ groups = result.getGroups();
}
}
planNode = parentJoin;
parentJoin = parentJoin.getParent();
}
- return true;
+ if (result.getParent() == groupNode) {
+ //can't be pushed as we are already at the direct child
+ return null;
+ }
+ return result;
}
- private void filterJoinColumns(Set<Expression> stagedGroupingSymbols,
+ /**
+ * @return true if the filtered expressions contain outside groups
+ */
+ private boolean filterExpressions(Set<Expression> stagedGroupingSymbols,
Set<GroupSymbol> groups,
- List<? extends Expression> symbols) {
+ Collection<? extends Expression> symbols, boolean wholeExpression) {
+ boolean result = false;
for (Expression ex : symbols) {
- if (groups.containsAll(GroupsUsedByElementsVisitor.getGroups(ex))) {
+ Set<GroupSymbol> groups2 = GroupsUsedByElementsVisitor.getGroups(ex);
+ if (!Collections.disjoint(groups, groups2)) {
+ if (!result) {
+ boolean containsAll = groups.containsAll(groups2);
+ if (!wholeExpression && !containsAll) {
+ //collect only matching subexpressions - but the best that we'll currently do is elementsymbols
+ filterExpressions(stagedGroupingSymbols, groups, ElementCollectorVisitor.getElements(ex, true), true);
+ continue;
+ }
+ result = !containsAll;
+ }
stagedGroupingSymbols.add(SymbolMap.getExpression(ex));
}
}
+ return result;
}
private <T extends Expression> Map<PlanNode, List<T>> createNodeMapping(PlanNode groupNode,
Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java 2012-09-20 19:43:05 UTC (rev 4457)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java 2012-09-21 02:54:01 UTC (rev 4458)
@@ -376,6 +376,39 @@
helpProcess(plan, dataManager, expected);
}
+ @Test public void testMultiJoinCriteria() throws Exception {
+ String sql = "SELECT count(t2.e4) as s FROM pm1.g1 as t1, pm1.g2 as t2, pm1.g3 as t3 WHERE t1.e1 = t2.e1 and t2.e2 = t3.e2 and t1.e3 || t2.e3 = t3.e3"; //$NON-NLS-1$
+
+ List[] expected = new List[] {
+ Arrays.asList(0)
+ };
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ helpProcess(plan, dataManager, expected);
+ }
+
+ @Test public void testMultiJoinGroupBy() throws Exception {
+ String sql = "SELECT count(t2.e4) as s, t1.e3 || t2.e3 FROM pm1.g1 as t1, pm1.g2 as t2, pm1.g3 as t3 WHERE t1.e1 = t2.e1 and t2.e2 = t3.e2 GROUP BY t1.e3 || t2.e3"; //$NON-NLS-1$
+
+ List[] expected = new List[] {
+ Arrays.asList(9, "falsefalse"),
+ Arrays.asList(2, "falsetrue"),
+ Arrays.asList(4, "truefalse"),
+ Arrays.asList(1, "truetrue"),
+ };
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ helpProcess(plan, dataManager, expected);
+ }
+
@Test public void testArrayAggOrderByPersistence() throws Exception {
// Create query
String sql = "SELECT array_agg(e2 order by e1) from pm1.g1 group by e3"; //$NON-NLS-1$
12 years, 5 months
teiid SVN: r4457 - in trunk/engine/src: test/java/org/teiid/query/processor and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2012-09-20 15:43:05 -0400 (Thu, 20 Sep 2012)
New Revision: 4457
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
trunk/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java
Log:
TEIID-2220 fix for bad aggregate pushing with more complicated grouping and equi-join expressions.
Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2012-09-18 19:18:44 UTC (rev 4456)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2012-09-20 19:43:05 UTC (rev 4457)
@@ -645,11 +645,12 @@
Set<Expression> stagedGroupingSymbols = new LinkedHashSet<Expression>();
Collection<AggregateSymbol> aggregates = aggregateMap.get(planNode);
- if (!canPush(groupNode, stagedGroupingSymbols, planNode)) {
- continue;
+ planNode = canPush(groupNode, stagedGroupingSymbols, planNode);
+ if (planNode == null) {
+ continue;
}
- filterJoinColumns(stagedGroupingSymbols, planNode.getGroups(), groupingExpressions);
+ filterExpressions(stagedGroupingSymbols, planNode.getGroups(), groupingExpressions, false);
collectSymbolsFromOtherAggregates(allAggregates, aggregates, planNode, stagedGroupingSymbols);
@@ -776,45 +777,70 @@
/**
* Ensures that we are only pushing through inner equi joins or cross joins. Also collects the necessary staged grouping symbols
+ * @return null if we cannot push otherwise the target join node
*/
- private boolean canPush(PlanNode groupNode,
+ private PlanNode canPush(PlanNode groupNode,
Set<Expression> stagedGroupingSymbols,
PlanNode planNode) {
PlanNode parentJoin = planNode.getParent();
Set<GroupSymbol> groups = FrameUtil.findJoinSourceNode(planNode).getGroups();
+ PlanNode result = planNode;
while (parentJoin != groupNode) {
if (parentJoin.getType() != NodeConstants.Types.JOIN
|| parentJoin.hasCollectionProperty(NodeConstants.Info.NON_EQUI_JOIN_CRITERIA)
|| ((JoinType)parentJoin.getProperty(NodeConstants.Info.JOIN_TYPE)).isOuter()) {
- return false;
+ return null;
}
-
+ //we move the target up if the filtered expressions introduce outside groups
if (planNode == parentJoin.getFirstChild()) {
- if (parentJoin.hasCollectionProperty(NodeConstants.Info.LEFT_EXPRESSIONS)) {
- filterJoinColumns(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS));
+ if (parentJoin.hasCollectionProperty(NodeConstants.Info.LEFT_EXPRESSIONS)
+ && filterExpressions(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.LEFT_EXPRESSIONS), true)) {
+ result = parentJoin;
+ groups = result.getGroups();
}
} else {
- if (parentJoin.hasCollectionProperty(NodeConstants.Info.RIGHT_EXPRESSIONS)) {
- filterJoinColumns(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS));
+ if (parentJoin.hasCollectionProperty(NodeConstants.Info.RIGHT_EXPRESSIONS)
+ && filterExpressions(stagedGroupingSymbols, groups, (List<Expression>)parentJoin.getProperty(NodeConstants.Info.RIGHT_EXPRESSIONS), true)) {
+ result = parentJoin;
+ groups = result.getGroups();
}
}
planNode = parentJoin;
parentJoin = parentJoin.getParent();
}
- return true;
+ if (result.getParent() == groupNode) {
+ //can't be pushed as we are already at the direct child
+ return null;
+ }
+ return result;
}
- private void filterJoinColumns(Set<Expression> stagedGroupingSymbols,
+ /**
+ * @return true if the filtered expressions contain outside groups
+ */
+ private boolean filterExpressions(Set<Expression> stagedGroupingSymbols,
Set<GroupSymbol> groups,
- List<? extends Expression> symbols) {
+ Collection<? extends Expression> symbols, boolean wholeExpression) {
+ boolean result = false;
for (Expression ex : symbols) {
- if (groups.containsAll(GroupsUsedByElementsVisitor.getGroups(ex))) {
+ Set<GroupSymbol> groups2 = GroupsUsedByElementsVisitor.getGroups(ex);
+ if (!Collections.disjoint(groups, groups2)) {
+ if (!result) {
+ boolean containsAll = groups.containsAll(groups2);
+ if (!wholeExpression && !containsAll) {
+ //collect only matching subexpressions - but the best that we'll currently do is elementsymbols
+ filterExpressions(stagedGroupingSymbols, groups, ElementCollectorVisitor.getElements(ex, true), true);
+ continue;
+ }
+ result = !containsAll;
+ }
stagedGroupingSymbols.add(SymbolMap.getExpression(ex));
}
}
+ return result;
}
private <T extends Expression> Map<PlanNode, List<T>> createNodeMapping(PlanNode groupNode,
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java 2012-09-18 19:18:44 UTC (rev 4456)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestAggregateProcessing.java 2012-09-20 19:43:05 UTC (rev 4457)
@@ -410,6 +410,39 @@
helpProcess(plan, dataManager, expected);
}
+ @Test public void testMultiJoinCriteria() throws Exception {
+ String sql = "SELECT count(t2.e4) as s FROM pm1.g1 as t1, pm1.g2 as t2, pm1.g3 as t3 WHERE t1.e1 = t2.e1 and t2.e2 = t3.e2 and t1.e3 || t2.e3 = t3.e3"; //$NON-NLS-1$
+
+ List[] expected = new List[] {
+ Arrays.asList(0)
+ };
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ helpProcess(plan, dataManager, expected);
+ }
+
+ @Test public void testMultiJoinGroupBy() throws Exception {
+ String sql = "SELECT count(t2.e4) as s, t1.e3 || t2.e3 FROM pm1.g1 as t1, pm1.g2 as t2, pm1.g3 as t3 WHERE t1.e1 = t2.e1 and t2.e2 = t3.e2 GROUP BY t1.e3 || t2.e3"; //$NON-NLS-1$
+
+ List[] expected = new List[] {
+ Arrays.asList(9, "falsefalse"),
+ Arrays.asList(2, "falsetrue"),
+ Arrays.asList(4, "truefalse"),
+ Arrays.asList(1, "truetrue"),
+ };
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(sql, RealMetadataFactory.example1Cached());
+
+ helpProcess(plan, dataManager, expected);
+ }
+
@Test public void testArrayAggOrderByPersistence() throws Exception {
// Create query
String sql = "SELECT array_agg(e2 order by e1) from pm1.g1 group by e3"; //$NON-NLS-1$
12 years, 5 months
teiid SVN: r4456 - in trunk: runtime/src/main/java/org/teiid/runtime and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2012-09-18 15:18:44 -0400 (Tue, 18 Sep 2012)
New Revision: 4456
Added:
trunk/runtime/src/main/resources/infinispan-replicated-config.xml
trunk/runtime/src/main/resources/tcp-shared.xml
trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml
trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
Removed:
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml
Modified:
trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Log:
TEIID-2215 making non-clustered by default, using a shared transport, and adding cleanup
Modified: trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2012-09-18 11:57:37 UTC (rev 4455)
+++ trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2012-09-18 19:18:44 UTC (rev 4456)
@@ -27,7 +27,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -484,22 +483,14 @@
public int getTransactionIsolation() throws SQLException {
return this.transactionIsolation;
- }
-
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return Collections.emptyMap();
+ }
+
/**
- * Retreives the type map associated with this Connection object. The type map
- * contains entries for undefined types. This method always returns an empty
- * map since it is not possible to add entries to this type map
- * @return map containing undefined types(empty)
- * @throws SQLException, should never occur
- */
- public Map getTypeMap() throws SQLException {
- //Check to see the connection is open
- checkConnection();
- return new HashMap();
- }
-
- /**
* <p>This method will return the first warning reported by calls on this connection,
* or null if none exist.</p>
* @return A SQLWarning object if there are any warnings.
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-18 11:57:37 UTC (rev 4455)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-18 19:18:44 UTC (rev 4456)
@@ -23,7 +23,10 @@
package org.teiid.runtime;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import javax.resource.spi.work.WorkManager;
@@ -31,6 +34,7 @@
import org.infinispan.manager.DefaultCacheManager;
import org.jgroups.Channel;
+import org.jgroups.ChannelListener;
import org.jgroups.JChannel;
import org.teiid.cache.CacheFactory;
import org.teiid.cache.infinispan.InfinispanCacheFactory;
@@ -45,6 +49,37 @@
public class EmbeddedConfiguration extends DQPConfiguration {
+ private final class SimpleChannelFactory implements ChannelFactory, ChannelListener {
+ private final Map<Channel, String> channels = new WeakHashMap<Channel, String>();
+
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ JChannel channel = new JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
+ channels.put(channel, id);
+ channel.addChannelListener(this);
+ return channel;
+ }
+
+ @Override
+ public void channelClosed(Channel channel) {
+ channels.remove(channel);
+ }
+
+ @Override
+ public void channelConnected(Channel channel) {
+ }
+
+ @Override
+ public void channelDisconnected(Channel channel) {
+ }
+
+ void stop() {
+ for (Channel c : new ArrayList<Channel>(channels.keySet())) {
+ c.close();
+ }
+ }
+ }
+
private SecurityHelper securityHelper;
private List<String> securityDomains;
private TransactionManager transactionManager;
@@ -55,8 +90,14 @@
private CacheFactory cacheFactory;
private int maxResultSetCacheStaleness = 60;
private String infinispanConfigFile = "infinispan-config.xml"; //$NON-NLS-1$
- private String jgroupsConfigFile = "tcp.xml"; //$NON-NLS-1$
+ private String jgroupsConfigFile;
+ private DefaultCacheManager manager;
+ private SimpleChannelFactory channelFactory;
+
+ public EmbeddedConfiguration() {
+ }
+
public SecurityHelper getSecurityHelper() {
return securityHelper;
}
@@ -84,13 +125,9 @@
}
public ObjectReplicator getObjectReplicator() {
- if (this.objectReplicator == null) {
- this.objectReplicator = new JGroupsObjectReplicator(new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
- }
- }, Executors.newCachedThreadPool());
+ if (this.objectReplicator == null && jgroupsConfigFile != null) {
+ channelFactory = new SimpleChannelFactory();
+ this.objectReplicator = new JGroupsObjectReplicator(channelFactory, Executors.newCachedThreadPool());
}
return objectReplicator;
}
@@ -150,7 +187,7 @@
public CacheFactory getCacheFactory() {
if (this.cacheFactory == null) {
try {
- DefaultCacheManager manager = new DefaultCacheManager(this.infinispanConfigFile, true);
+ manager = new DefaultCacheManager(this.infinispanConfigFile, true);
manager.startCaches(manager.getCacheNames().toArray(new String[manager.getCacheNames().size()]));
this.cacheFactory = new InfinispanCacheFactory(manager, this.getClass().getClassLoader());
} catch (IOException e) {
@@ -175,4 +212,13 @@
public void setJgroupsConfigFile(String jgroupsConfigFile) {
this.jgroupsConfigFile = jgroupsConfigFile;
}
+
+ protected void stop() {
+ if (manager != null) {
+ manager.stop();
+ }
+ if (channelFactory != null) {
+ channelFactory.stop();
+ }
+ }
}
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-18 11:57:37 UTC (rev 4455)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-18 19:18:44 UTC (rev 4456)
@@ -264,6 +264,7 @@
//to allow teiid to start a request transaction under an existing thread bound transaction
protected boolean detectTransactions = true;
private Boolean running;
+ private EmbeddedConfiguration config;
public EmbeddedServer() {
@@ -274,10 +275,11 @@
this.connectionFactoryProviders.put(name, connectionFactoryProvider);
}
- public synchronized void start(EmbeddedConfiguration config) {
+ public synchronized void start(@SuppressWarnings("hiding") EmbeddedConfiguration config) {
if (running != null) {
throw new IllegalStateException();
}
+ this.config = config;
this.eventDistributorFactoryService.start();
this.dqp.setEventDistributor(this.eventDistributorFactoryService.getReplicatedEventDistributor());
this.replicator = config.getObjectReplicator();
@@ -505,6 +507,9 @@
* Stops the server. Once stopped it cannot be restarted.
*/
public synchronized void stop() {
+ if (config != null) {
+ config.stop();
+ }
dqp.stop();
eventDistributorFactoryService.stop();
bufferService = null;
Added: trunk/runtime/src/main/resources/infinispan-replicated-config.xml
===================================================================
--- trunk/runtime/src/main/resources/infinispan-replicated-config.xml (rev 0)
+++ trunk/runtime/src/main/resources/infinispan-replicated-config.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd" xmlns="urn:infinispan:config:5.1">
+
+ <global>
+ <transport clusterName="teiid-cluster" machineId="m1" rackId="r1" nodeName="Node-A">
+ <properties>
+ <property name="configurationFile" value="tcp-shared.xml"/>
+ </properties>
+ </transport>
+ </global>
+
+ <namedCache name="resultset">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+ <namedCache name="resultset-repl">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="repl">
+ <sync/>
+ </clustering>
+ </namedCache>
+
+ <namedCache name="preparedplan">
+ <eviction maxEntries="512" strategy="LIRS"/>
+ <expiration lifespan="28800"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+</infinispan>
\ No newline at end of file
Property changes on: trunk/runtime/src/main/resources/infinispan-replicated-config.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/runtime/src/main/resources/tcp-shared.xml
===================================================================
--- trunk/runtime/src/main/resources/tcp-shared.xml (rev 0)
+++ trunk/runtime/src/main/resources/tcp-shared.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -0,0 +1,70 @@
+<!--
+ TCP based stack, with flow control and message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
+ Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g.
+ -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
+ author: Bela Ban
+-->
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <TCP bind_port="7800"
+ loopback="false"
+ recv_buf_size="${tcp.recv_buf_size:20M}"
+ send_buf_size="${tcp.send_buf_size:640K}"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="true"
+ sock_conn_timeout="300"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="discard"
+ singleton_name="tcp-teiid"/>
+
+ <TCPPING timeout="3000"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 min_interval="10000"
+ max_interval="30000"/>
+ <FD_SOCK/>
+ <FD timeout="3000" max_tries="3" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK use_mcast_xmit="false"
+ exponential_backoff="500"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER/>
+</config>
\ No newline at end of file
Property changes on: trunk/runtime/src/main/resources/tcp-shared.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-18 11:57:37 UTC (rev 4455)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-18 19:18:44 UTC (rev 4456)
@@ -22,8 +22,7 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -31,10 +30,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.concurrent.Executors;
-import org.jgroups.Channel;
-import org.jgroups.JChannel;
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.core.types.DataTypeManager;
@@ -42,11 +38,9 @@
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.FakeServer.DeployVDBParameter;
import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.metadata.FunctionParameter;
-import org.teiid.replication.jgroups.ChannelFactory;
-import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.runtime.EmbeddedConfiguration;
@SuppressWarnings("nls")
@@ -63,8 +57,7 @@
if (DEBUG) {
UnitTestUtil.enableTraceLogging("org.teiid");
}
-
- FakeServer server1 = createServer();
+ FakeServer server1 = createServer("infinispan-replicated-config.xml", "tcp-shared.xml");
Connection c1 = server1.createConnection("jdbc:teiid:matviews");
Statement stmt = c1.createStatement();
@@ -74,7 +67,7 @@
double d1 = rs.getDouble(1);
double d2 = rs.getDouble(2);
- FakeServer server2 = createServer();
+ FakeServer server2 = createServer("infinispan-replicated-config-1.xml", "tcp-shared-1.xml");
Connection c2 = server2.createConnection("jdbc:teiid:matviews");
Statement stmt2 = c2.createStatement();
ResultSet rs2 = stmt2.executeQuery("select * from matviews where name = 'RandomView'");
@@ -128,12 +121,12 @@
server2.stop();
}
- private FakeServer createServer() throws Exception {
+ private FakeServer createServer(String ispn, String jgroups) throws Exception {
FakeServer server = new FakeServer(false);
EmbeddedConfiguration config = new EmbeddedConfiguration();
- config.setInfinispanConfigFile(UnitTestUtil.getTestDataPath()+"/infinispan-replicated-config.xml");
-
+ config.setInfinispanConfigFile(ispn);
+ config.setJgroupsConfigFile(jgroups);
server.start(config, true);
HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), true, Determinism.NONDETERMINISTIC)));
Copied: trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml (from rev 4454, trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml)
===================================================================
--- trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml (rev 0)
+++ trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd" xmlns="urn:infinispan:config:5.1">
+
+ <global>
+ <transport clusterName="teiid-cluster" machineId="m1" rackId="r1" nodeName="Node-A">
+ <properties>
+ <property name="configurationFile" value="tcp-shared-1.xml"/>
+ </properties>
+ </transport>
+ </global>
+
+ <namedCache name="resultset">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+ <namedCache name="resultset-repl">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="repl">
+ <sync/>
+ </clustering>
+ </namedCache>
+
+ <namedCache name="preparedplan">
+ <eviction maxEntries="512" strategy="LIRS"/>
+ <expiration lifespan="28800"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+</infinispan>
\ No newline at end of file
Property changes on: trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Deleted: trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml
===================================================================
--- trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml 2012-09-18 11:57:37 UTC (rev 4455)
+++ trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd" xmlns="urn:infinispan:config:5.1">
-
- <global>
- <transport clusterName="teiid-cluster" machineId="m1" rackId="r1" nodeName="Node-A">
- <properties>
- <property name="configurationFile" value="tcp.xml"/>
- </properties>
- </transport>
- </global>
-
- <namedCache name="resultset">
- <transaction transactionMode="TRANSACTIONAL"/>
- <eviction maxEntries="1024" strategy="LIRS" />
- <expiration lifespan="7200000"/>
- <clustering mode="local"/>
- </namedCache>
-
- <namedCache name="resultset-repl">
- <transaction transactionMode="TRANSACTIONAL"/>
- <eviction maxEntries="1024" strategy="LIRS" />
- <expiration lifespan="7200000"/>
- <clustering mode="repl">
- <sync/>
- </clustering>
- </namedCache>
-
- <namedCache name="preparedplan">
- <eviction maxEntries="512" strategy="LIRS"/>
- <expiration lifespan="28800"/>
- <clustering mode="local"/>
- </namedCache>
-
-</infinispan>
\ No newline at end of file
Added: trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
===================================================================
--- trunk/test-integration/common/src/test/resources/tcp-shared-1.xml (rev 0)
+++ trunk/test-integration/common/src/test/resources/tcp-shared-1.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -0,0 +1,70 @@
+<!--
+ TCP based stack, with flow control and message bundling. This is usually used when IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
+ Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g.
+ -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
+ author: Bela Ban
+-->
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <TCP bind_port="7800"
+ loopback="false"
+ recv_buf_size="${tcp.recv_buf_size:20M}"
+ send_buf_size="${tcp.send_buf_size:640K}"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="true"
+ sock_conn_timeout="300"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="discard"
+ singleton_name="tcp-teiid-1"/>
+
+ <TCPPING timeout="3000"
+ initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 min_interval="10000"
+ max_interval="30000"/>
+ <FD_SOCK/>
+ <FD timeout="3000" max_tries="3" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK use_mcast_xmit="false"
+ exponential_backoff="500"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER/>
+</config>
\ No newline at end of file
Property changes on: trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
12 years, 5 months
teiid SVN: r4455 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2012-09-18 07:57:37 -0400 (Tue, 18 Sep 2012)
New Revision: 4455
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-2218 making sure that close can be called more than once
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2012-09-17 18:31:05 UTC (rev 4454)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2012-09-18 11:57:37 UTC (rev 4455)
@@ -114,6 +114,9 @@
@Override
public void close() throws IOException {
+ if (closed) {
+ return;
+ }
flush();
closed = true;
if (bytesWritten) {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2012-09-17 18:31:05 UTC (rev 4454)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2012-09-18 11:57:37 UTC (rev 4455)
@@ -104,6 +104,15 @@
fsos.flush();
assertEquals(0, fsos.getCount());
}
+
+ @Test public void testClose() throws Exception {
+ FileStorageManager sm = getStorageManager(null, null);
+ FileStore store = sm.createFileStore("0");
+ FileStoreOutputStream fsos = store.createOutputStream(2);
+ fsos.write(new byte[100000]);
+ fsos.close();
+ fsos.close();
+ }
static Random r = new Random();
12 years, 5 months
teiid SVN: r4454 - in trunk: jboss-integration/src/main/java/org/teiid/replication/jboss and 8 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2012-09-17 14:31:05 -0400 (Mon, 17 Sep 2012)
New Revision: 4454
Added:
trunk/runtime/src/main/java/org/teiid/replication/
trunk/runtime/src/main/java/org/teiid/replication/jgroups/
trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
Removed:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
trunk/test-integration/common/pom.xml
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Log:
TEIID-2215: refactored and moved code that is part of jboss-integration into runtime module to support the jgroups based object replication for clustered embedded Teiid.
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -56,9 +56,7 @@
TEIID50017, // vdb.xml parse exception
TEIID50018, // failed VDB dependency processing
TEIID50019, // redeploying VDB
- TEIID50020, // replication error failed to pull
TEIID50021, // vdb defined translator not found
- TEIID50022, // replication error timeout during the pull
TEIID50023, // replication failed
TEIID50024, // failed metadata load
TEIID50025, // VDB deployed
@@ -72,7 +70,6 @@
TEIID50039, // socket_disabled
TEIID50040, // odbc_disabled
TEIID50041, // embedded disabled
- TEIID50042, // error state
TEIID50043,
TEIID50044, // vdb save failed
TEIID50047,
@@ -82,7 +79,6 @@
TEIID50055,
TEIID50056,
TEIID50057,
- TEIID50067,
TEIID50069,
TEIID50070,
TEIID50071,
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -21,23 +21,32 @@
*/
package org.teiid.jboss;
+import java.util.concurrent.Executor;
+
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.jgroups.Channel;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
class JGroupsObjectReplicatorService implements Service<JGroupsObjectReplicator> {
public final InjectedValue<ChannelFactory> channelFactoryInjector = new InjectedValue<ChannelFactory>();
+ final InjectedValue<Executor> executorInjector = new InjectedValue<Executor>();
private JGroupsObjectReplicator replicator;
@Override
public void start(StartContext context) throws StartException {
- this.replicator = new JGroupsObjectReplicator(channelFactoryInjector.getValue());
+ this.replicator = new JGroupsObjectReplicator(new org.teiid.replication.jgroups.ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return channelFactoryInjector.getValue().createChannel(id);
+ }
+ }, executorInjector.getValue());
}
@Override
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -83,7 +83,7 @@
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.function.SystemFunctionManager;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.services.InternalEventDistributorFactory;
class TeiidAdd extends AbstractAddStepHandler implements DescriptionProvider {
@@ -247,6 +247,7 @@
JGroupsObjectReplicatorService replicatorService = new JGroupsObjectReplicatorService();
ServiceBuilder<JGroupsObjectReplicator> serviceBuilder = target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups", "stack", stack), ChannelFactory.class, replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
+ serviceBuilder.addDependency(TeiidServiceNames.executorServiceName(asyncThreadPoolName), Executor.class, replicatorService.executorInjector);
newControllers.add(serviceBuilder.install());
LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50003));
} else {
Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.jgroups.Address;
-import org.teiid.core.util.ReflectionHelper;
-
-/**
- * Allows JGroups {@link Address} objects to be serializable
- */
-public final class AddressWrapper implements Externalizable {
-
- Address address;
-
- public AddressWrapper() {
-
- }
-
- public AddressWrapper(Address address) {
- this.address = address;
- }
-
- @Override
- public int hashCode() {
- return address.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof AddressWrapper)) {
- return false;
- }
- return address.equals(((AddressWrapper)obj).address);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- String className = in.readUTF();
- try {
- this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
- this.address.readFrom(in);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(address.getClass().getName());
- try {
- address.writeTo(out);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
-}
\ No newline at end of file
Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,117 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class JGroupsInputStream extends InputStream {
-
- private long timeout = 15000;
- private volatile byte[] buf;
- private volatile int index=0;
- private ReentrantLock lock = new ReentrantLock();
- private Condition write = lock.newCondition();
- private Condition doneReading = lock.newCondition();
-
- public JGroupsInputStream(long timeout) {
- this.timeout = timeout;
- }
-
- @Override
- public int read() throws IOException {
- if (index < 0) {
- return -1;
- }
- if (buf == null) {
- lock.lock();
- try {
- long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
- while (buf == null) {
- waitTime = write.awaitNanos(waitTime);
- if (waitTime <= 0) {
- throw new IOException(new TimeoutException());
- }
- }
- if (index < 0) {
- return -1;
- }
- } catch(InterruptedException e) {
- throw new IOException(e);
- } finally {
- lock.unlock();
- }
- }
- if (index == buf.length) {
- lock.lock();
- try {
- buf = null;
- index = 0;
- doneReading.signal();
- } finally {
- lock.unlock();
- }
- return read();
- }
- return buf[index++] & 0xff;
- }
-
- @Override
- public void close() {
- lock.lock();
- try {
- buf = null;
- index = -1;
- doneReading.signal();
- } finally {
- lock.unlock();
- }
- }
-
- public void receive(byte[] bytes) throws InterruptedException {
- lock.lock();
- try {
- if (index == -1) {
- return;
- }
- while (buf != null) {
- doneReading.await();
- }
- if (index == -1) {
- return;
- }
- buf = bytes;
- if (bytes == null) {
- index = -1;
- }
- write.signal();
- } finally {
- lock.unlock();
- }
- }
-
-}
\ No newline at end of file
Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,615 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
-import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.MethodLookup;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-import org.jgroups.util.Promise;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-import org.teiid.Replicated;
-import org.teiid.Replicated.ReplicationMode;
-import org.teiid.core.TeiidRuntimeException;
-import org.teiid.core.util.ObjectInputStreamWithClassloader;
-import org.teiid.jboss.IntegrationPlugin;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.query.ObjectReplicator;
-import org.teiid.query.ReplicatedObject;
-
-@SuppressWarnings("unchecked")
-public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-
- private static final int IO_TIMEOUT = 15000;
-
- private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
- private final S object;
- private boolean initialized;
- private final HashMap<Method, Short> methodMap;
- private final ArrayList<Method> methodList;
- Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
-
- private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
- MembershipListener l2, Object serverObj, S object,
- HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
- super(channel, l, l2, serverObj);
- this.object = object;
- this.methodMap = methodMap;
- this.methodList = methodList;
- setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
- }
-
- @Override
- public Object handle(Message req) {
- Object body=null;
-
- if(req == null || req.getLength() == 0) {
- if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
- return null;
- }
-
- try {
- body=req_marshaller != null?
- req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
- : req.getObject();
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
- return e;
- }
-
- if(!(body instanceof MethodCall)) {
- if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
-
- // create an exception to represent this and return it
- return new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
- }
-
- final MethodCall method_call=(MethodCall)body;
-
- try {
- if(log.isTraceEnabled())
- log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
-
- if (method_call.getId() >= methodList.size() - 5 && req.getSrc().equals(local_addr)) {
- return null;
- }
-
- if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = new AddressWrapper(req.getSrc());
- Serializable stateId = (Serializable)method_call.getArgs()[0];
- List<?> key = Arrays.asList(stateId, address);
- JGroupsInputStream is = inputStreams.get(key);
- if (method_call.getId() == methodList.size() - 3) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- is = new JGroupsInputStream(IO_TIMEOUT);
- this.inputStreams.put(key, is);
- executor.execute(new StreamingRunner(object, stateId, is, null));
- } else if (method_call.getId() == methodList.size() - 2) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive((byte[])method_call.getArgs()[1]);
- }
- } else if (method_call.getId() == methodList.size() - 1) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- this.inputStreams.remove(key);
- }
- return null;
- } else if (method_call.getId() == methodList.size() - 5) {
- //hasState
- ReplicatedObject ro = (ReplicatedObject)object;
- Serializable stateId = (Serializable)method_call.getArgs()[0];
-
- if (stateId == null) {
- synchronized (this) {
- if (initialized) {
- return Boolean.TRUE;
- }
- return null;
- }
- }
-
- if (ro.hasState(stateId)) {
- return Boolean.TRUE;
- }
- return null;
- } else if (method_call.getId() == methodList.size() - 4) {
- //sendState
- ReplicatedObject ro = (ReplicatedObject)object;
- String stateId = (String)method_call.getArgs()[0];
- AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
-
- JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
- try {
- if (stateId == null) {
- ro.getState(oStream);
- } else {
- ro.getState(stateId, oStream);
- }
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
- return null;
- }
-
- Method m=method_lookup.findMethod(method_call.getId());
- if(m == null)
- throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
- method_call.setMethod(m);
-
- return method_call.invoke(server_obj);
- }
- catch(Throwable x) {
- return x;
- }
- }
- }
-
- private static final long serialVersionUID = -6851804958313095166L;
- private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
- private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
- private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
- private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
- private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
-
- private final static class StreamingRunner implements Runnable {
- private final Object object;
- private final Serializable stateId;
- private final JGroupsInputStream is;
- private Promise<Boolean> promise;
-
- private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is, Promise<Boolean> promise) {
- this.object = object;
- this.stateId = stateId;
- this.is = is;
- this.promise = promise;
- }
-
- @Override
- public void run() {
- try {
- if (stateId == null) {
- ((ReplicatedObject<?>)object).setState(is);
- } else {
- ((ReplicatedObject)object).setState(stateId, is);
- }
- if (promise != null) {
- promise.setResult(Boolean.TRUE);
- }
- LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
- } catch (Exception e) {
- if (promise != null) {
- promise.setResult(Boolean.FALSE);
- }
- LogManager.logError(LogConstants.CTX_RUNTIME, e, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50042,stateId));
- } finally {
- is.close();
- }
- }
- }
-
- private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
- InvocationHandler, Serializable {
-
- private static final int PULL_RETRIES = 3;
- private static final long serialVersionUID = -2943462899945966103L;
- private final S object;
- private transient ReplicatorRpcDispatcher<S> disp;
- private final HashMap<Method, Short> methodMap;
- protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
- private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
-
- private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
- this.object = object;
- this.methodMap = methodMap;
- }
-
- List<Address> getRemoteMembersCopy() {
- synchronized (remoteMembers) {
- return new ArrayList<Address>(remoteMembers);
- }
- }
-
- public void setDisp(ReplicatorRpcDispatcher<S> disp) {
- this.disp = disp;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- Short methodNum = methodMap.get(method);
- if (methodNum == null || remoteMembers.isEmpty()) {
- if (methodNum != null) {
- Replicated annotation = method.getAnnotation(Replicated.class);
- if (annotation != null && annotation.remoteOnly()) {
- return null;
- }
- }
- try {
- return method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- }
- try {
- Replicated annotation = method.getAnnotation(Replicated.class);
- if (annotation.replicateState() != ReplicationMode.NONE) {
- return handleReplicateState(method, args, annotation);
- }
- MethodCall call=new MethodCall(methodNum, args);
- List<Address> dests = null;
- if (annotation.remoteOnly()) {
- dests = getRemoteMembersCopy();
- if (dests.isEmpty()) {
- return null;
- }
- }
- RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
- if (annotation.asynch()) {
- return null;
- }
- List<Object> results = responses.getResults();
- if (method.getReturnType() == boolean.class) {
- for (Object o : results) {
- if (!Boolean.TRUE.equals(o)) {
- return false;
- }
- }
- return true;
- } else if (method.getReturnType() == Collection.class) {
- ArrayList<Object> result = new ArrayList<Object>();
- for (Object o : results) {
- result.addAll((Collection)o);
- }
- return results;
- }
- return null;
- } catch(Exception e) {
- throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
- }
- }
-
- protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
- if (remoteMembers.isEmpty()) {
- return null;
- }
- RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
- Collection<Rsp<Boolean>> values = resp.values();
- Rsp<Boolean> rsp = null;
- for (Rsp<Boolean> response : values) {
- if (Boolean.TRUE.equals(response.getValue())) {
- rsp = response;
- break;
- }
- }
- if (rsp == null) {
- return null;
- }
- return rsp.getSender();
- }
-
- private Object handleReplicateState(Method method, Object[] args,
- Replicated annotation) throws IllegalAccessException,
- Throwable, IOException, IllegalStateException, Exception {
- Object result = null;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- ReplicatedObject ro = (ReplicatedObject)object;
- Serializable stateId = (Serializable)args[0];
- if (annotation.replicateState() == ReplicationMode.PUSH) {
- if (!remoteMembers.isEmpty()) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3), true);
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
- }
- return result;
- }
- if (result != null) {
- return result;
- }
- long timeout = annotation.timeout();
- return pullState(method, args, stateId, timeout);
- }
-
- /**
- * Pull the remote state. The method and args are optional
- * to determine if the state has been made available.
- */
- Object pullState(Method method, Object[] args, Serializable stateId,
- long timeout) throws Throwable {
- Object result = null;
- for (int i = 0; i < PULL_RETRIES; i++) {
- Promise<Boolean> p = null;
- boolean wait = true;
- synchronized (loadingStates) {
- p = loadingStates.get(stateId);
- if (p == null) {
- wait = false;
- if (method != null) {
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- if (result != null) {
- return result;
- }
- }
- p = new Promise<Boolean>();
- loadingStates.put(stateId, p);
- }
- }
- if (wait) {
- p.getResult(timeout);
- continue;
- }
- try {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
- Address addr = whereIsState(stateId, timeout);
- if (addr == null) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$
- break;
- }
- JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
- StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
- List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
- disp.inputStreams.put(key, is);
- executor.execute(runner);
-
- this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
-
- Boolean fetched = p.getResult(timeout);
-
- if (fetched != null) {
- if (fetched) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
- if (method !=null) {
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- if (result != null) {
- return result;
- }
- }
- break;
- }
- LogManager.logWarning(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50020, object, stateId));
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50022, object, stateId));
- }
- } finally {
- synchronized (loadingStates) {
- loadingStates.remove(stateId);
- }
- }
- }
- return null; //could not fetch the remote state
- }
-
- @Override
- public void viewAccepted(View newView) {
- if (newView.getMembers() != null) {
- synchronized (remoteMembers) {
- remoteMembers.removeAll(newView.getMembers());
- if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
- HashSet<Serializable> dropped = new HashSet<Serializable>();
- for (Address address : remoteMembers) {
- dropped.add(new AddressWrapper(address));
- }
- ((ReplicatedObject<?>)object).droppedMembers(dropped);
- }
- remoteMembers.clear();
- remoteMembers.addAll(newView.getMembers());
- remoteMembers.remove(this.disp.getChannel().getAddress());
- }
- }
- }
- }
-
- private interface Streaming {
- void sendState(Serializable id, AddressWrapper dest);
- void createState(Serializable id);
- void buildState(Serializable id, byte[] bytes);
- void finishState(Serializable id);
- }
-
- //TODO: this should be configurable, or use a common executor
- private transient Executor executor = Executors.newCachedThreadPool();
- private transient ChannelFactory channelFactory;
-
- public JGroupsObjectReplicator(ChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
- }
-
- public void stop(Object object) {
- if (object == null || !Proxy.isProxyClass(object.getClass())) {
- return;
- }
- ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>) Proxy.getInvocationHandler(object);
- Channel c = handler.disp.getChannel();
- handler.disp.stop();
- c.close();
- }
-
- @Override
- public <T, S> T replicate(String mux_id,
- Class<T> iface, final S object, long startTimeout) throws Exception {
- Channel channel = channelFactory.createChannel(mux_id);
-
- // To keep the order of methods same at all the nodes.
- TreeMap<String, Method> methods = new TreeMap<String, Method>();
- for (Method method : iface.getMethods()) {
- if (method.getAnnotation(Replicated.class) == null) {
- continue;
- }
- methods.put(method.toGenericString(), method);
- }
-
- final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
- final ArrayList<Method> methodList = new ArrayList<Method>();
-
- for (String method : methods.keySet()) {
- methodList.add(methods.get(method));
- methodMap.put(methods.get(method), (short)(methodList.size() - 1));
- }
-
- Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[] {Serializable.class});
- methodList.add(hasState);
- methodMap.put(hasState, (short)(methodList.size() - 1));
-
- Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new Class<?>[] {Serializable.class, AddressWrapper.class});
- methodList.add(sendState);
- methodMap.put(sendState, (short)(methodList.size() - 1));
-
- //add in streaming methods
- Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {Serializable.class});
- methodList.add(createState);
- methodMap.put(createState, (short)(methodList.size() - 1));
- Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {Serializable.class, byte[].class});
- methodList.add(buildState);
- methodMap.put(buildState, (short)(methodList.size() - 1));
- Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {Serializable.class});
- methodList.add(finishState);
- methodMap.put(finishState, (short)(methodList.size() - 1));
-
- ReplicatedInvocationHandler<S> proxy = new ReplicatedInvocationHandler<S>(object, methodMap);
- /*
- * TODO: could have an object implement streaming
- * Override the normal handle method to support streaming
- */
- ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel, proxy, proxy, object, object, methodMap, methodList);
-
- proxy.setDisp(disp);
- disp.setMethodLookup(new MethodLookup() {
- public Method findMethod(short id) {
- return methodList.get(id);
- }
- });
-
- T replicatedProxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, proxy);
- boolean success = false;
- try {
- channel.connect(mux_id);
- if (object instanceof ReplicatedObject) {
- ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
- proxy.pullState(null, null, null, startTimeout);
- }
- success = true;
- return replicatedProxy;
- } catch (Throwable e) {
- if (e instanceof Exception) {
- throw (Exception)e;
- }
- throw new TeiidRuntimeException(IntegrationPlugin.Event.TEIID50067, e);
- } finally {
- if (!success) {
- channel.close();
- } else {
- synchronized (disp) {
- //mark as initialized so that state can be pulled if needed
- disp.initialized = true;
- }
- }
- }
- }
-
- // This class is used so that the objects are loaded with the current classes class loader
- // rather than foreign class loader
- static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
- private ClassLoader classloader;
-
- public ContextAwareMarshaller(ClassLoader classloader) {
- this.classloader = classloader;
- }
-
- @Override
- public Buffer objectToBuffer(Object obj) throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(baos);
- out.writeObject(obj);
- out.close();
- return new Buffer(baos.toByteArray());
- }
-
- @Override
- public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
- ObjectInputStream in = new ObjectInputStreamWithClassloader(new ByteArrayInputStream(buf, offset, length), this.classloader);
- Object anObj = in.readObject();
- in.close();
- return anObj;
- }
- }
-
-}
Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,104 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.jgroups.Address;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.teiid.core.types.Streamable;
-
-public class JGroupsOutputStream extends OutputStream {
-
- static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
-
- protected final RpcDispatcher disp;
- protected final List<Address> dests;
- protected final Serializable stateId;
- protected final short methodOffset;
-
- private volatile boolean closed=false;
- private final byte[] buffer=new byte[CHUNK_SIZE];
- private int index=0;
-
- public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
- this.disp=disp;
- this.dests=dests;
- this.stateId=stateId;
- this.methodOffset = methodOffset;
- if (sendCreate) {
- try {
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- public void close() throws IOException {
- if(closed) {
- return;
- }
- flush();
- try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
- } catch(Exception e) {
- }
- closed=true;
- }
-
- public void flush() throws IOException {
- checkClosed();
- try {
- if(index == 0) {
- return;
- }
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
- index=0;
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
-
- private void checkClosed() throws IOException {
- if(closed) {
- throw new IOException("output stream is closed"); //$NON-NLS-1$
- }
- }
-
- public void write(int b) throws IOException {
- checkClosed();
- if(index >= buffer.length) {
- flush();
- }
- buffer[index++]=(byte)b;
- }
-
-}
\ No newline at end of file
Modified: trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2012-09-17 18:31:05 UTC (rev 4454)
@@ -55,7 +55,7 @@
TEIID50029=VDB {0}.{1} model "{2}" metadata is currently being loaded. Start Time: {3}
TEIID50030=VDB {0}.{1} model "{2}" metadata loaded. End Time: {3}
TEIID50036=VDB {0}.{1} model "{2}" metadata failed to load. Reason:{3}
-TEIID50042=error setting state {0}
+
TEIID50043=Invalid metadata file found at {0}; delete this file and restart server.
TEIID50069=Failed to load module {0}
TEIID50089=Failed to find any services of type {0} from module {1}
@@ -188,8 +188,6 @@
TEIID50005=Clearing cache {0} for vdb {1}.{2}
TEIID50021=VDB {0}.{1} deployed in inactive state due to unavailability of data sources {2}
TEIID50016=Invalid VDB file deployment failed {0}
-TEIID50020= {0} Failed to Pull {1}
-TEIID50022={0} timeout pulling {1}
TEIID50078=Translator not found {0}
no_vdb_found=VDB {0}.{1} not found or VDB is not in ACTIVE status
no_model_found= VDB {0}.{1} does not have model with name {2}.
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.jgroups.Address;
+import org.teiid.core.util.ReflectionHelper;
+
+/**
+ * Allows JGroups {@link Address} objects to be serializable
+ */
+public final class AddressWrapper implements Externalizable {
+
+ Address address;
+
+ public AddressWrapper() {
+
+ }
+
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName());
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+ /branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3507-3666
Added: trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.replication.jgroups;
+
+import org.jgroups.Channel;
+
+public interface ChannelFactory {
+ /**
+ * Creates a JGroups channel
+ * @return a JGroups channel
+ * @throws Exception if there was a failure setting up the protocol stack
+ */
+ Channel createChannel(String id) throws Exception;
+}
Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class JGroupsInputStream extends InputStream {
+
+ private long timeout = 15000;
+ private volatile byte[] buf;
+ private volatile int index=0;
+ private ReentrantLock lock = new ReentrantLock();
+ private Condition write = lock.newCondition();
+ private Condition doneReading = lock.newCondition();
+
+ public JGroupsInputStream(long timeout) {
+ this.timeout = timeout;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (index < 0) {
+ return -1;
+ }
+ if (buf == null) {
+ lock.lock();
+ try {
+ long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
+ while (buf == null) {
+ waitTime = write.awaitNanos(waitTime);
+ if (waitTime <= 0) {
+ throw new IOException(new TimeoutException());
+ }
+ }
+ if (index < 0) {
+ return -1;
+ }
+ } catch(InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ if (index == buf.length) {
+ lock.lock();
+ try {
+ buf = null;
+ index = 0;
+ doneReading.signal();
+ } finally {
+ lock.unlock();
+ }
+ return read();
+ }
+ return buf[index++] & 0xff;
+ }
+
+ @Override
+ public void close() {
+ lock.lock();
+ try {
+ buf = null;
+ index = -1;
+ doneReading.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void receive(byte[] bytes) throws InterruptedException {
+ lock.lock();
+ try {
+ if (index == -1) {
+ return;
+ }
+ while (buf != null) {
+ doneReading.await();
+ }
+ if (index == -1) {
+ return;
+ }
+ buf = bytes;
+ if (bytes == null) {
+ index = -1;
+ }
+ write.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+ /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3507-3666
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,614 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.MethodLookup;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+import org.jgroups.util.Promise;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectInputStreamWithClassloader;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
+import org.teiid.runtime.RuntimePlugin;
+
+@SuppressWarnings("unchecked")
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+
+ private static final int IO_TIMEOUT = 15000;
+
+ private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
+ private final S object;
+ private boolean initialized;
+ private final HashMap<Method, Short> methodMap;
+ private final ArrayList<Method> methodList;
+ Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
+
+ private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
+ MembershipListener l2, Object serverObj, S object,
+ HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
+ super(channel, l, l2, serverObj);
+ this.object = object;
+ this.methodMap = methodMap;
+ this.methodList = methodList;
+ setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
+ }
+
+ @Override
+ public Object handle(Message req) {
+ Object body=null;
+
+ if(req == null || req.getLength() == 0) {
+ if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
+ return null;
+ }
+
+ try {
+ body=req_marshaller != null?
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+ : req.getObject();
+ }
+ catch(Throwable e) {
+ if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
+ return e;
+ }
+
+ if(!(body instanceof MethodCall)) {
+ if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
+
+ // create an exception to represent this and return it
+ return new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
+ }
+
+ final MethodCall method_call=(MethodCall)body;
+
+ try {
+ if(log.isTraceEnabled())
+ log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if (method_call.getId() >= methodList.size() - 5 && req.getSrc().equals(local_addr)) {
+ return null;
+ }
+
+ if (method_call.getId() >= methodList.size() - 3) {
+ Serializable address = new AddressWrapper(req.getSrc());
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+ List<?> key = Arrays.asList(stateId, address);
+ JGroupsInputStream is = inputStreams.get(key);
+ if (method_call.getId() == methodList.size() - 3) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ is = new JGroupsInputStream(IO_TIMEOUT);
+ this.inputStreams.put(key, is);
+ executor.execute(new StreamingRunner(object, stateId, is, null));
+ } else if (method_call.getId() == methodList.size() - 2) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive((byte[])method_call.getArgs()[1]);
+ }
+ } else if (method_call.getId() == methodList.size() - 1) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ this.inputStreams.remove(key);
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 5) {
+ //hasState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+
+ if (stateId == null) {
+ synchronized (this) {
+ if (initialized) {
+ return Boolean.TRUE;
+ }
+ return null;
+ }
+ }
+
+ if (ro.hasState(stateId)) {
+ return Boolean.TRUE;
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 4) {
+ //sendState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ String stateId = (String)method_call.getArgs()[0];
+ AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
+
+ JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
+ try {
+ if (stateId == null) {
+ ro.getState(oStream);
+ } else {
+ ro.getState(stateId, oStream);
+ }
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+ return null;
+ }
+
+ Method m=method_lookup.findMethod(method_call.getId());
+ if(m == null)
+ throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
+ method_call.setMethod(m);
+
+ return method_call.invoke(server_obj);
+ }
+ catch(Throwable x) {
+ return x;
+ }
+ }
+ }
+
+ private static final long serialVersionUID = -6851804958313095166L;
+ private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
+ private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
+ private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
+ private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
+ private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
+
+ private final static class StreamingRunner implements Runnable {
+ private final Object object;
+ private final Serializable stateId;
+ private final JGroupsInputStream is;
+ private Promise<Boolean> promise;
+
+ private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is, Promise<Boolean> promise) {
+ this.object = object;
+ this.stateId = stateId;
+ this.is = is;
+ this.promise = promise;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (stateId == null) {
+ ((ReplicatedObject<?>)object).setState(is);
+ } else {
+ ((ReplicatedObject)object).setState(stateId, is);
+ }
+ if (promise != null) {
+ promise.setResult(Boolean.TRUE);
+ }
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
+ } catch (Exception e) {
+ if (promise != null) {
+ promise.setResult(Boolean.FALSE);
+ }
+ LogManager.logError(LogConstants.CTX_RUNTIME, e, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40101, stateId));
+ } finally {
+ is.close();
+ }
+ }
+ }
+
+ private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
+ InvocationHandler, Serializable {
+
+ private static final int PULL_RETRIES = 3;
+ private static final long serialVersionUID = -2943462899945966103L;
+ private final S object;
+ private transient ReplicatorRpcDispatcher<S> disp;
+ private final HashMap<Method, Short> methodMap;
+ protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
+ private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
+
+ private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
+ this.object = object;
+ this.methodMap = methodMap;
+ }
+
+ List<Address> getRemoteMembersCopy() {
+ synchronized (remoteMembers) {
+ return new ArrayList<Address>(remoteMembers);
+ }
+ }
+
+ public void setDisp(ReplicatorRpcDispatcher<S> disp) {
+ this.disp = disp;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ Short methodNum = methodMap.get(method);
+ if (methodNum == null || remoteMembers.isEmpty()) {
+ if (methodNum != null) {
+ Replicated annotation = method.getAnnotation(Replicated.class);
+ if (annotation != null && annotation.remoteOnly()) {
+ return null;
+ }
+ }
+ try {
+ return method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+ try {
+ Replicated annotation = method.getAnnotation(Replicated.class);
+ if (annotation.replicateState() != ReplicationMode.NONE) {
+ return handleReplicateState(method, args, annotation);
+ }
+ MethodCall call=new MethodCall(methodNum, args);
+ List<Address> dests = null;
+ if (annotation.remoteOnly()) {
+ dests = getRemoteMembersCopy();
+ if (dests.isEmpty()) {
+ return null;
+ }
+ }
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
+ if (annotation.asynch()) {
+ return null;
+ }
+ List<Object> results = responses.getResults();
+ if (method.getReturnType() == boolean.class) {
+ for (Object o : results) {
+ if (!Boolean.TRUE.equals(o)) {
+ return false;
+ }
+ }
+ return true;
+ } else if (method.getReturnType() == Collection.class) {
+ ArrayList<Object> result = new ArrayList<Object>();
+ for (Object o : results) {
+ result.addAll((Collection)o);
+ }
+ return results;
+ }
+ return null;
+ } catch(Exception e) {
+ throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
+ protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+ if (remoteMembers.isEmpty()) {
+ return null;
+ }
+ RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null) {
+ return null;
+ }
+ return rsp.getSender();
+ }
+
+ private Object handleReplicateState(Method method, Object[] args,
+ Replicated annotation) throws IllegalAccessException,
+ Throwable, IOException, IllegalStateException, Exception {
+ Object result = null;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)args[0];
+ if (annotation.replicateState() == ReplicationMode.PUSH) {
+ if (!remoteMembers.isEmpty()) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3), true);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+ }
+ return result;
+ }
+ if (result != null) {
+ return result;
+ }
+ long timeout = annotation.timeout();
+ return pullState(method, args, stateId, timeout);
+ }
+
+ /**
+ * Pull the remote state. The method and args are optional
+ * to determine if the state has been made available.
+ */
+ Object pullState(Method method, Object[] args, Serializable stateId,
+ long timeout) throws Throwable {
+ Object result = null;
+ for (int i = 0; i < PULL_RETRIES; i++) {
+ Promise<Boolean> p = null;
+ boolean wait = true;
+ synchronized (loadingStates) {
+ p = loadingStates.get(stateId);
+ if (p == null) {
+ wait = false;
+ if (method != null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ p = new Promise<Boolean>();
+ loadingStates.put(stateId, p);
+ }
+ }
+ if (wait) {
+ p.getResult(timeout);
+ continue;
+ }
+ try {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
+ Address addr = whereIsState(stateId, timeout);
+ if (addr == null) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$
+ break;
+ }
+ JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
+ StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
+ disp.inputStreams.put(key, is);
+ executor.execute(runner);
+
+ this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+
+ Boolean fetched = p.getResult(timeout);
+
+ if (fetched != null) {
+ if (fetched) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
+ if (method !=null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ break;
+ }
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40102, object, stateId));
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40103, object, stateId));
+ }
+ } finally {
+ synchronized (loadingStates) {
+ loadingStates.remove(stateId);
+ }
+ }
+ }
+ return null; //could not fetch the remote state
+ }
+
+ @Override
+ public void viewAccepted(View newView) {
+ if (newView.getMembers() != null) {
+ synchronized (remoteMembers) {
+ remoteMembers.removeAll(newView.getMembers());
+ if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
+ HashSet<Serializable> dropped = new HashSet<Serializable>();
+ for (Address address : remoteMembers) {
+ dropped.add(new AddressWrapper(address));
+ }
+ ((ReplicatedObject<?>)object).droppedMembers(dropped);
+ }
+ remoteMembers.clear();
+ remoteMembers.addAll(newView.getMembers());
+ remoteMembers.remove(this.disp.getChannel().getAddress());
+ }
+ }
+ }
+ }
+
+ private interface Streaming {
+ void sendState(Serializable id, AddressWrapper dest);
+ void createState(Serializable id);
+ void buildState(Serializable id, byte[] bytes);
+ void finishState(Serializable id);
+ }
+
+ //TODO: this should be configurable, or use a common executor
+ private transient Executor executor;
+ private transient ChannelFactory channelFactory;
+
+ public JGroupsObjectReplicator(ChannelFactory channelFactory, Executor executor) {
+ this.channelFactory = channelFactory;
+ this.executor = executor;
+ }
+
+ public void stop(Object object) {
+ if (object == null || !Proxy.isProxyClass(object.getClass())) {
+ return;
+ }
+ ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>) Proxy.getInvocationHandler(object);
+ Channel c = handler.disp.getChannel();
+ handler.disp.stop();
+ c.close();
+ }
+
+ @Override
+ public <T, S> T replicate(String mux_id,
+ Class<T> iface, final S object, long startTimeout) throws Exception {
+ Channel channel = channelFactory.createChannel(mux_id);
+
+ // To keep the order of methods same at all the nodes.
+ TreeMap<String, Method> methods = new TreeMap<String, Method>();
+ for (Method method : iface.getMethods()) {
+ if (method.getAnnotation(Replicated.class) == null) {
+ continue;
+ }
+ methods.put(method.toGenericString(), method);
+ }
+
+ final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
+ final ArrayList<Method> methodList = new ArrayList<Method>();
+
+ for (String method : methods.keySet()) {
+ methodList.add(methods.get(method));
+ methodMap.put(methods.get(method), (short)(methodList.size() - 1));
+ }
+
+ Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[] {Serializable.class});
+ methodList.add(hasState);
+ methodMap.put(hasState, (short)(methodList.size() - 1));
+
+ Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new Class<?>[] {Serializable.class, AddressWrapper.class});
+ methodList.add(sendState);
+ methodMap.put(sendState, (short)(methodList.size() - 1));
+
+ //add in streaming methods
+ Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {Serializable.class});
+ methodList.add(createState);
+ methodMap.put(createState, (short)(methodList.size() - 1));
+ Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {Serializable.class, byte[].class});
+ methodList.add(buildState);
+ methodMap.put(buildState, (short)(methodList.size() - 1));
+ Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {Serializable.class});
+ methodList.add(finishState);
+ methodMap.put(finishState, (short)(methodList.size() - 1));
+
+ ReplicatedInvocationHandler<S> proxy = new ReplicatedInvocationHandler<S>(object, methodMap);
+ /*
+ * TODO: could have an object implement streaming
+ * Override the normal handle method to support streaming
+ */
+ ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel, proxy, proxy, object, object, methodMap, methodList);
+
+ proxy.setDisp(disp);
+ disp.setMethodLookup(new MethodLookup() {
+ public Method findMethod(short id) {
+ return methodList.get(id);
+ }
+ });
+
+ T replicatedProxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, proxy);
+ boolean success = false;
+ try {
+ channel.connect(mux_id);
+ if (object instanceof ReplicatedObject) {
+ ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+ proxy.pullState(null, null, null, startTimeout);
+ }
+ success = true;
+ return replicatedProxy;
+ } catch (Throwable e) {
+ if (e instanceof Exception) {
+ throw (Exception)e;
+ }
+ throw new TeiidRuntimeException(RuntimePlugin.Event.TEIID40104, e);
+ } finally {
+ if (!success) {
+ channel.close();
+ } else {
+ synchronized (disp) {
+ //mark as initialized so that state can be pulled if needed
+ disp.initialized = true;
+ }
+ }
+ }
+ }
+
+ // This class is used so that the objects are loaded with the current classes class loader
+ // rather than foreign class loader
+ static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
+ private ClassLoader classloader;
+
+ public ContextAwareMarshaller(ClassLoader classloader) {
+ this.classloader = classloader;
+ }
+
+ @Override
+ public Buffer objectToBuffer(Object obj) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ out.writeObject(obj);
+ out.close();
+ return new Buffer(baos.toByteArray());
+ }
+
+ @Override
+ public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
+ ObjectInputStream in = new ObjectInputStreamWithClassloader(new ByteArrayInputStream(buf, offset, length), this.classloader);
+ Object anObj = in.readObject();
+ in.close();
+ return anObj;
+ }
+ }
+
+}
Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+ /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3507-3666
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jgroups.Address;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.teiid.core.types.Streamable;
+
+public class JGroupsOutputStream extends OutputStream {
+
+ static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
+
+ protected final RpcDispatcher disp;
+ protected final List<Address> dests;
+ protected final Serializable stateId;
+ protected final short methodOffset;
+
+ private volatile boolean closed=false;
+ private final byte[] buffer=new byte[CHUNK_SIZE];
+ private int index=0;
+
+ public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
+ this.disp=disp;
+ this.dests=dests;
+ this.stateId=stateId;
+ this.methodOffset = methodOffset;
+ if (sendCreate) {
+ try {
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if(closed) {
+ return;
+ }
+ flush();
+ try {
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+ } catch(Exception e) {
+ }
+ closed=true;
+ }
+
+ public void flush() throws IOException {
+ checkClosed();
+ try {
+ if(index == 0) {
+ return;
+ }
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+ index=0;
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if(closed) {
+ throw new IOException("output stream is closed"); //$NON-NLS-1$
+ }
+ }
+
+ public void write(int b) throws IOException {
+ checkClosed();
+ if(index >= buffer.length) {
+ flush();
+ }
+ buffer[index++]=(byte)b;
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+ /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3507-3666
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -24,11 +24,14 @@
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Executors;
import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
import org.teiid.cache.CacheFactory;
import org.teiid.cache.infinispan.InfinispanCacheFactory;
import org.teiid.core.TeiidRuntimeException;
@@ -36,6 +39,8 @@
import org.teiid.dqp.internal.process.TeiidExecutor;
import org.teiid.dqp.internal.process.ThreadReuseExecutor;
import org.teiid.query.ObjectReplicator;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.security.SecurityHelper;
public class EmbeddedConfiguration extends DQPConfiguration {
@@ -50,6 +55,7 @@
private CacheFactory cacheFactory;
private int maxResultSetCacheStaleness = 60;
private String infinispanConfigFile = "infinispan-config.xml"; //$NON-NLS-1$
+ private String jgroupsConfigFile = "tcp.xml"; //$NON-NLS-1$
public SecurityHelper getSecurityHelper() {
return securityHelper;
@@ -76,12 +82,23 @@
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
+
public ObjectReplicator getObjectReplicator() {
+ if (this.objectReplicator == null) {
+ this.objectReplicator = new JGroupsObjectReplicator(new ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return new JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
+ }
+ }, Executors.newCachedThreadPool());
+ }
return objectReplicator;
}
+
public void setObjectReplicator(ObjectReplicator objectReplicator) {
this.objectReplicator = objectReplicator;
}
+
/**
* Sets the {@link WorkManager} to be used instead of a {@link ThreadReuseExecutor}.
* This means that Teiid will not own the processing threads and will not necessarily be
@@ -151,5 +168,11 @@
}
public void setMaxResultSetCacheStaleness(int maxResultSetCacheStaleness) {
this.maxResultSetCacheStaleness = maxResultSetCacheStaleness;
+ }
+ public String getJgroupsConfigFile() {
+ return jgroupsConfigFile;
+ }
+ public void setJgroupsConfigFile(String jgroupsConfigFile) {
+ this.jgroupsConfigFile = jgroupsConfigFile;
}
}
Modified: trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -112,5 +112,9 @@
TEIID40098,
TEIID40099,
TEIID40100,
+ TEIID40101,
+ TEIID40102,
+ TEIID40103,
+ TEIID40104
}
}
Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 18:31:05 UTC (rev 4454)
@@ -100,4 +100,8 @@
TEIID40096=Waited {0} for VDB {1}.{2} to be deployed, but it never was. Please check to see if the deployment is missing or is in error.
TEIID40097=Waited {0} for VDB {1}.{2} to be ACTIVE, but it never was. Please check it's sources - {3}.
TEIID40098=Reached end of results; use hasNext() call to check if there are more results before calling next()
-TEIID40099=Cache system has been shutdown
\ No newline at end of file
+TEIID40099=Cache system has been shutdown
+
+TEIID40101=error setting state {0}
+TEIID40102= {0} Failed to Pull {1}
+TEIID40103={0} timeout pulling {1}
\ No newline at end of file
Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/test-integration/common/pom.xml 2012-09-17 18:31:05 UTC (rev 4454)
@@ -39,18 +39,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <!--
- <dependency>
- <groupId>javax.enterprise</groupId>
- <artifactId>cdi-api</artifactId>
- <scope>test</scope>
- </dependency>
- -->
- <dependency>
- <groupId>org.jboss.as</groupId>
- <artifactId>jboss-as-clustering-jgroups</artifactId>
- <scope>provided</scope>
- </dependency>
</dependencies>
<profiles>
Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-17 18:31:05 UTC (rev 4454)
@@ -22,7 +22,8 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -30,9 +31,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.concurrent.Executors;
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.server.ServerEnvironment;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.junit.BeforeClass;
@@ -42,10 +42,11 @@
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.FakeServer.DeployVDBParameter;
import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.runtime.EmbeddedConfiguration;
@SuppressWarnings("nls")
@@ -129,21 +130,8 @@
private FakeServer createServer() throws Exception {
FakeServer server = new FakeServer(false);
-
- JGroupsObjectReplicator jor = new JGroupsObjectReplicator(new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
- }
- @Override
- public ServerEnvironment getServerEnvironment() {
- return null;
- }
- });
-
EmbeddedConfiguration config = new EmbeddedConfiguration();
- config.setObjectReplicator(jor);
config.setInfinispanConfigFile(UnitTestUtil.getTestDataPath()+"/infinispan-replicated-config.xml");
server.start(config, true);
12 years, 5 months
teiid SVN: r4453 - trunk/connectors/translator-object.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2012-09-17 14:26:52 -0400 (Mon, 17 Sep 2012)
New Revision: 4453
Modified:
trunk/connectors/translator-object/
Log:
adding ignores
Property changes on: trunk/connectors/translator-object
___________________________________________________________________
Added: svn:ignore
+ target
12 years, 5 months
teiid SVN: r4452 - in trunk: engine/src/main/resources/org/teiid/query and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2012-09-17 12:30:02 -0400 (Mon, 17 Sep 2012)
New Revision: 4452
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
trunk/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
Log:
TEIID-2172 general logging cleanups
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-09-17 16:29:05 UTC (rev 4451)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2012-09-17 16:30:02 UTC (rev 4452)
@@ -326,8 +326,6 @@
}
private void handleThrowable(Throwable e) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred"); //$NON-NLS-1$ //$NON-NLS-2$
-
if (!isCanceled()) {
dqpCore.logMMCommand(this, Event.ERROR, null);
//Case 5558: Differentiate between system level errors and
@@ -345,10 +343,17 @@
} else {
elem = cause.getMessage();
}
- LogManager.logWarning(LogConstants.CTX_DQP, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30020, e.getMessage(), requestID, e.getClass().getName(), elem));
+ String msg = QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30020, e.getMessage(), requestID, e.getClass().getName(), elem);
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logWarning(LogConstants.CTX_DQP, e, msg);
+ } else {
+ LogManager.logWarning(LogConstants.CTX_DQP, msg + QueryPlugin.Util.getString("stack_info")); //$NON-NLS-1$
+ }
} else {
LogManager.logError(LogConstants.CTX_DQP, e, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30019, requestID));
- }
+ }
+ } else {
+ LogManager.logDetail(LogConstants.CTX_DQP, e, "Request Thread", requestID, "- error occurred after cancel"); //$NON-NLS-1$ //$NON-NLS-2$
}
this.processingException = e;
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2012-09-17 16:29:05 UTC (rev 4451)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2012-09-17 16:30:02 UTC (rev 4452)
@@ -846,7 +846,8 @@
TEIID30028=Failed to properly rollback autowrap transaction properly
TEIID30019=Unexpected exception for request {0}
-TEIID30020=Processing exception ''{0}'' for request {1}. Exception type {2} thrown from {3}. Enable more detailed logging to see the entire stacktrace.
+TEIID30020=Processing exception ''{0}'' for request {1}. Exception type {2} thrown from {3}.
+stack_info=\ Enable more detailed logging to see the entire stacktrace.
# #query (018.005)
ERR.018.005.0095 = User <{0}> is not entitled to action <{1}> for 1 or more of the groups/elements/procedures.
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-17 16:29:05 UTC (rev 4451)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-17 16:30:02 UTC (rev 4452)
@@ -333,8 +333,8 @@
this.sessionService.setDqp(this.dqp);
this.services.setSecurityHelper(this.sessionService.getSecurityHelper());
this.logon = new LogonImpl(sessionService, null);
- services.registerClientService(ILogon.class, logon, null);
- services.registerClientService(DQP.class, dqp, null);
+ services.registerClientService(ILogon.class, logon, LogConstants.CTX_SECURITY);
+ services.registerClientService(DQP.class, dqp, LogConstants.CTX_DQP);
initDriver();
running = true;
}
Modified: trunk/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2012-09-17 16:29:05 UTC (rev 4451)
+++ trunk/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2012-09-17 16:30:02 UTC (rev 4452)
@@ -40,8 +40,10 @@
import org.teiid.core.crypto.CryptoException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
import org.teiid.net.socket.Message;
import org.teiid.net.socket.ServiceInvocationStruct;
+import org.teiid.query.QueryPlugin;
import org.teiid.runtime.RuntimePlugin;
import org.teiid.transport.ClientServiceRegistryImpl.ClientService;
@@ -159,7 +161,11 @@
cause = cause.getCause();
}
StackTraceElement elem = cause.getStackTrace()[0];
- LogManager.logDetail(context, e, "Processing exception for session", this.socketClientInstance.getWorkContext().getSessionId()); //$NON-NLS-1$
- LogManager.logWarning(context, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40011, e.getMessage(), this.socketClientInstance.getWorkContext().getSessionId(), e.getClass().getName(), elem));
+ String msg = RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40011, e.getMessage(), this.socketClientInstance.getWorkContext().getSessionId(), e.getClass().getName(), elem);
+ if (LogManager.isMessageToBeRecorded(context, MessageLevel.DETAIL)) {
+ LogManager.logWarning(context, e, msg);
+ } else {
+ LogManager.logWarning(context, msg + QueryPlugin.Util.getString("stack_info")); ////$NON-NLS-1$
+ }
}
}
\ No newline at end of file
Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 16:29:05 UTC (rev 4451)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 16:30:02 UTC (rev 4452)
@@ -32,7 +32,7 @@
TEIID40007 = Keepalive failed for session {0}
TEIID40009 = Admin [{0}] is terminating this session: {1}.
TEIID40017=Unexpected exception for session {0}
-TEIID40011=Processing exception ''{0}'' for session {1}. Exception type {2} thrown from {3}. Enable more detailed logging to see the entire stacktrace.
+TEIID40011=Processing exception ''{0}'' for session {1}. Exception type {2} thrown from {3}.
TEIID40070=Only {0} connections are allowed on this port. Component not found: {1}
SocketTransport.1=Bound to address {0} listening on port {1}
12 years, 5 months