teiid SVN: r3491 - in branches/7.1.x: build/kits/jboss-container/deploy/teiid and 1 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:22:06 -0400 (Tue, 13 Sep 2011)
New Revision: 3491
Modified:
branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
branches/7.1.x/pom.xml
Log:
TEIID-1493: When a node rejoins the cluster after the initial cache has been populated, the state was set not to transfer at join time. It needs to be set to transfer; also, JBoss Cache only transfers the state for "active" regions.
Modified: branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml
===================================================================
--- branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/build/kits/jboss-container/deploy/teiid/teiid-cache-manager-jboss-beans-rename-me.xml 2011-09-13 23:22:06 UTC (rev 3491)
@@ -85,7 +85,7 @@
<!-- Hibernate 2LC can replicate custom types, so we use marshalling -->
<property name="useRegionBasedMarshalling">true</property>
<!-- Must match the value of "useRegionBasedMarshalling" -->
- <property name="inactiveOnStartup">true</property>
+ <property name="inactiveOnStartup">false</property>
<!-- Disable asynchronous RPC marshalling/sending -->
<property name="serializationExecutorPoolSize">0</property>
Modified: branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
--- branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-09-13 23:22:06 UTC (rev 3491)
@@ -59,6 +59,7 @@
if (!this.cacheStore.getCacheStatus().allowInvocations()) {
this.cacheStore.start();
+ this.cacheStore.getRegion(this.cacheStore.getRoot().getFqn(), true).activate();
}
Node cacheRoot = this.cacheStore.getRoot().addChild(Fqn.fromString("Teiid")); //$NON-NLS-1$
@@ -67,6 +68,7 @@
Region cacheRegion = this.cacheStore.getRegion(node.getFqn(), true);
cacheRegion.setEvictionRegionConfig(buildEvictionConfig(node.getFqn(), config));
+ cacheRegion.activate();
JBossCache jc = null;
if (config != null && config.getPolicy().equals(Policy.EXPIRATION)) {
Modified: branches/7.1.x/pom.xml
===================================================================
--- branches/7.1.x/pom.xml 2011-09-13 23:21:52 UTC (rev 3490)
+++ branches/7.1.x/pom.xml 2011-09-13 23:22:06 UTC (rev 3491)
@@ -365,7 +365,7 @@
<dependency>
<groupId>org.jboss.cache</groupId>
<artifactId>jbosscache-core</artifactId>
- <version>3.1.0.GA</version>
+ <version>3.2.5.GA</version>
<exclusions>
<exclusion>
<groupId>javax.transaction</groupId>
14 years, 7 months
teiid SVN: r3490 - branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:21:52 -0400 (Tue, 13 Sep 2011)
New Revision: 3490
Modified:
branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
Log:
TEIID-1493 Rolling back the previous change.
Modified: branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:21:43 UTC (rev 3489)
+++ branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:21:52 UTC (rev 3490)
@@ -62,6 +62,7 @@
import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.AdminProcessingException;
+import org.teiid.adminapi.Admin.Cache;
import org.teiid.adminapi.impl.CacheStatisticsMetadata;
import org.teiid.adminapi.impl.DQPManagement;
import org.teiid.adminapi.impl.RequestMetadata;
@@ -239,6 +240,10 @@
} catch (SessionServiceException e) {
//ignore
}
+
+ // dump the caches.
+ dqpCore.clearCache(Cache.PREPARED_PLAN_CACHE.toString(), name, version);
+ dqpCore.clearCache(Cache.QUERY_SERVICE_RESULT_SET_CACHE.toString(), name, version);
}
});
}
14 years, 7 months
teiid SVN: r3489 - branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:21:43 -0400 (Tue, 13 Sep 2011)
New Revision: 3489
Modified:
branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
Log:
TEIID-1493: If the same VDB version was recently added and removed, the code was not flushing the cache contents; however, in the case of startup, there will be no entries in the recently removed map, and the cache entries will be dumped for all the VDBs, thus invalidating the distributed cache entries. TEIID-1256 is being reverted here, as it was based on a wrong earlier assumption about the required behavior. Removing the clearing of the cache entries entirely. Actually, I am a little confused about the intent of the map that keeps track of the recently removed VDBs, as it does the opposite of what is documented in terms of the active connections.
Modified: branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:21:32 UTC (rev 3488)
+++ branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:21:43 UTC (rev 3489)
@@ -62,7 +62,6 @@
import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.AdminProcessingException;
-import org.teiid.adminapi.Admin.Cache;
import org.teiid.adminapi.impl.CacheStatisticsMetadata;
import org.teiid.adminapi.impl.DQPManagement;
import org.teiid.adminapi.impl.RequestMetadata;
@@ -240,10 +239,6 @@
} catch (SessionServiceException e) {
//ignore
}
-
- // dump the caches.
- dqpCore.clearCache(Cache.PREPARED_PLAN_CACHE.toString(), name, version);
- dqpCore.clearCache(Cache.QUERY_SERVICE_RESULT_SET_CACHE.toString(), name, version);
}
});
}
14 years, 7 months
teiid SVN: r3488 - branches/7.1.x/runtime/src/main/java/org/teiid/odbc.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:21:32 -0400 (Tue, 13 Sep 2011)
New Revision: 3488
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
Log:
TEIID-1652, TEIID-1653 : Handling the error conditions and doing rollback in case of an error
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:21:21 UTC (rev 3487)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:21:32 UTC (rev 3488)
@@ -56,11 +56,6 @@
void flush();
- void cursorExecute(String prepareName, String sql);
- void cursorFetch(String prepareName, int rows);
- void cursorMove(String prepareName, int rows);
- void cursorClose(String prepareName);
-
// unimplemented frontend messages
// CopyData (F & B)
// CopyDone (F & B)
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:21:21 UTC (rev 3487)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:21:32 UTC (rev 3488)
@@ -194,8 +194,7 @@
}
}
- @Override
- public void cursorExecute(String cursorName, String sql) {
+ private boolean cursorExecute(String cursorName, String sql) {
if (this.connection != null) {
if (sql != null) {
String modfiedSQL = sql.replaceAll("\\$\\d+", "?");//$NON-NLS-1$ //$NON-NLS-2$
@@ -211,6 +210,7 @@
boolean hasResults = stmt.execute();
this.cursorMap.put(cursorName, new Cursor(cursorName, sql, stmt, null, hasResults?stmt.getResultSet():null));
this.client.sendCommandComplete("DECLARE CURSOR", 0); //$NON-NLS-1$
+ return true;
} catch (SQLException e) {
this.client.errorOccurred(e);
} catch (IOException e) {
@@ -221,47 +221,55 @@
else {
this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
}
-
+ return false;
}
- @Override
- public void cursorFetch(String cursorName, int rows) {
+
+ private boolean cursorFetch(String cursorName, int rows) {
Cursor cursor = this.cursorMap.get(cursorName);
if (cursor != null) {
cursor.fetchSize = rows;
this.client.sendCursorResults(cursor.rs, rows);
+ return true;
}
- else {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
- return;
- }
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
+ return false;
}
- @Override
- public void cursorMove(String prepareName, int rows) {
+ private boolean cursorMove(String prepareName, int rows) {
+ if (rows == 0) {
+ try {
+ this.client.sendCommandComplete("MOVE", 0); //$NON-NLS-1$
+ return true;
+ } catch (IOException e) {
+ this.client.errorOccurred(e);
+ return false;
+ }
+ }
+
Cursor cursor = this.cursorMap.get(prepareName);
if (cursor != null) {
this.client.sendMoveCursor(cursor.rs, rows);
+ return true;
}
- else {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
- return;
- }
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
+ return false;
}
- @Override
- public void cursorClose(String prepareName) {
+ private boolean cursorClose(String prepareName) {
Cursor cursor = this.cursorMap.remove(prepareName);
if (cursor != null) {
try {
cursor.rs.close();
cursor.stmt.close();
this.client.sendCommandComplete("CLOSE CURSOR", 0); //$NON-NLS-1$
+ return true;
} catch (SQLException e) {
this.client.errorOccurred(e);
} catch (IOException e) {
this.client.errorOccurred(e);
}
}
+ return false;
}
@Override
@@ -490,18 +498,19 @@
ScriptReader reader = new ScriptReader(new StringReader(query));
String sql = reader.readStatement();
while (sql != null) {
+ boolean success = true;
Matcher m = null;
if ((m = cursorSelectPattern.matcher(sql)).matches()){
- cursorExecute(m.group(1), fixSQL(m.group(4)));
+ success = cursorExecute(m.group(1), fixSQL(m.group(4)));
}
else if ((m = fetchPattern.matcher(sql)).matches()){
- cursorFetch(m.group(2), Integer.parseInt(m.group(1)));
+ success = cursorFetch(m.group(2), Integer.parseInt(m.group(1)));
}
else if ((m = movePattern.matcher(sql)).matches()){
- cursorMove(m.group(2), Integer.parseInt(m.group(1)));
+ success = cursorMove(m.group(2), Integer.parseInt(m.group(1)));
}
else if ((m = closePattern.matcher(sql)).matches()){
- cursorClose(m.group(1));
+ success = cursorClose(m.group(1));
}
else if ((m = savepointPattern.matcher(sql)).matches()) {
this.client.sendCommandComplete("SAVEPOINT", 0); //$NON-NLS-1$
@@ -513,12 +522,19 @@
closePreparedStatement(m.group(1));
this.client.sendCommandComplete("DEALLOCATE", 0); //$NON-NLS-1$
}
+ else {
+ success = executeAndSend(fixSQL(sql));
+ }
- else {
- if (!executeAndSend(fixSQL(sql))) {
- break;
+ if (!success) {
+ try {
+ if (!this.connection.getAutoCommit()) {
+ this.connection.rollback(false);
+ }
+ } catch (SQLException e) {
}
- }
+ break;
+ }
sql = reader.readStatement();
}
sync();
@@ -647,7 +663,8 @@
}
Prepared query = this.preparedMap.remove(preparedName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", preparedName)); //$NON-NLS-1$
+ // since we pro actively closing the prepare, if deallocate comes in do not throw an error.
+ this.client.statementClosed();
}
else {
// Close all the bound messages off of this prepared
14 years, 7 months
teiid SVN: r3487 - branches/7.1.x/runtime/src/main/java/org/teiid/odbc.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:21:21 -0400 (Tue, 13 Sep 2011)
New Revision: 3487
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
Log:
TEIID-1653: correcting the regex to account for sql with new line characters
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:21:11 UTC (rev 3486)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:21:21 UTC (rev 3487)
@@ -137,7 +137,7 @@
private static Pattern preparedAutoIncrement = Pattern.compile("select 1 \\s*from pg_catalog.pg_attrdef \\s*where adrelid = \\$1 AND adnum = \\$2 " + //$NON-NLS-1$
"\\s*and pg_catalog.pg_get_expr\\(adbin, adrelid\\) \\s*like '%nextval\\(%'", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
- private static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ public static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE|Pattern.DOTALL); //$NON-NLS-1$
private static Pattern fetchPattern = Pattern.compile("FETCH (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
private static Pattern movePattern = Pattern.compile("MOVE (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
private static Pattern closePattern = Pattern.compile("CLOSE \"(\\w+)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
14 years, 7 months
teiid SVN: r3486 - in branches/7.1.x: engine/src/main/resources/org/teiid/query and 2 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:21:11 -0400 (Tue, 13 Sep 2011)
New Revision: 3486
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/7.1.x/runtime/src/main/java/org/teiid/deployers/VDBRepository.java
Log:
TEIID-1493 changing the listener logic to prevent unnecessary cache clearing
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-13 23:20:46 UTC (rev 3485)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-09-13 23:21:11 UTC (rev 3486)
@@ -557,7 +557,7 @@
}
private void clearPlanCache(String vdbName, int version){
- LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.Clearing_prepared_plan_cache")); //$NON-NLS-1$
+ LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("DQPCore.Clearing_prepared_plan_cache_for_vdb", vdbName, version)); //$NON-NLS-1$
this.prepPlanCache.clearForVDB(vdbName, version);
}
Modified: branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:20:46 UTC (rev 3485)
+++ branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:21:11 UTC (rev 3486)
@@ -811,6 +811,8 @@
DQPCore.Unable_to_load_metadata_for_VDB_name__{0},_version__{1}=Unable to load metadata for VDB name= {0}, version= {1}
DQPCore.Unknown_query_metadata_exception_while_registering_query__{0}.=Unknown query metadata exception while registering query: {0}.
DQPCore.Clearing_prepared_plan_cache=Clearing prepared plan cache
+DQPCore.Clearing_prepared_plan_cache_for_vdb=Clearing prepared plan cache for vdb {0}.{1}
+DQPCore.clearing_resultset_cache=Clearing the resultset cache for vdb {0}.{1}
DQPCore.The_request_has_been_closed.=The request {0} has been closed.
DQPCore.The_atomic_request_has_been_cancelled=The atomic request {0} has been canceled.
DQPCore.failed_to_cancel=Failed to Cancel request, as request already finished processing
Modified: branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
--- branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:20:46 UTC (rev 3485)
+++ branches/7.1.x/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2011-09-13 23:21:11 UTC (rev 3486)
@@ -32,9 +32,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -78,6 +80,7 @@
import org.teiid.core.ComponentNotFoundException;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.LRUCache;
import org.teiid.deployers.VDBLifeCycleListener;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.process.DQPConfiguration;
@@ -101,6 +104,7 @@
import org.teiid.transport.ODBCSocketListener;
import org.teiid.transport.SocketConfiguration;
import org.teiid.transport.SocketListener;
+import org.teiid.vdb.runtime.VDBKey;
@ManagementObject(name="RuntimeEngineDeployer", isRuntime=true, componentType=@ManagementComponent(type="teiid",subtype="dqp"), properties=ManagementProperties.EXPLICIT)
@@ -212,14 +216,19 @@
// add vdb life cycle listeners
this.vdbRepository.addListener(new VDBLifeCycleListener() {
-
+
+ private Set<VDBKey> recentlyRemoved = Collections.newSetFromMap(new LRUCache<VDBKey, Boolean>(10000));
+
@Override
public void removed(String name, int version) {
-
+ recentlyRemoved.add(new VDBKey(name, version));
}
@Override
public void added(String name, int version) {
+ if (!recentlyRemoved.remove(new VDBKey(name, version))) {
+ return;
+ }
// terminate all the previous sessions
try {
Collection<SessionMetadata> sessions = sessionService.getActiveSessions();
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/deployers/VDBRepository.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/deployers/VDBRepository.java 2011-09-13 23:20:46 UTC (rev 3485)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/deployers/VDBRepository.java 2011-09-13 23:21:11 UTC (rev 3486)
@@ -31,6 +31,7 @@
import java.util.NavigableMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.jboss.deployers.spi.DeploymentException;
import org.teiid.adminapi.AdminException;
@@ -60,7 +61,7 @@
private MetadataStore systemStore;
private MetadataStore odbcStore;
private boolean odbcEnabled = false;
- private List<VDBLifeCycleListener> listeners = new ArrayList<VDBLifeCycleListener>();
+ private List<VDBLifeCycleListener> listeners = new CopyOnWriteArrayList<VDBLifeCycleListener>();
private SystemFunctionManager systemFunctionManager;
public void addVDB(VDBMetaData vdb, MetadataStoreGroup stores, LinkedHashMap<String, Resource> visibilityMap, UDFMetaData udf, ConnectorManagerRepository cmr) throws DeploymentException {
@@ -157,7 +158,7 @@
this.odbcEnabled = true;
}
- public synchronized boolean removeVDB(String vdbName, int vdbVersion) {
+ public boolean removeVDB(String vdbName, int vdbVersion) {
VDBKey key = new VDBKey(vdbName, vdbVersion);
CompositeVDB removed = this.vdbRepo.remove(key);
if (removed != null) {
@@ -214,11 +215,11 @@
}
}
- public synchronized void addListener(VDBLifeCycleListener listener) {
+ public void addListener(VDBLifeCycleListener listener) {
this.listeners.add(listener);
}
- public synchronized void removeListener(VDBLifeCycleListener listener) {
+ public void removeListener(VDBLifeCycleListener listener) {
this.listeners.remove(listener);
}
14 years, 7 months
teiid SVN: r3485 - in branches/7.1.x/engine/src: main/java/org/teiid/query/sql/symbol and 1 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:20:46 -0400 (Tue, 13 Sep 2011)
New Revision: 3485
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
branches/7.1.x/engine/src/main/java/org/teiid/query/sql/symbol/AggregateSymbol.java
branches/7.1.x/engine/src/test/java/org/teiid/query/optimizer/TestAggregatePushdown.java
Log:
TEIID-1656 adding support for pushdown of aggs over unions with grouping expressions
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2011-09-13 23:20:28 UTC (rev 3484)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RulePushAggregates.java 2011-09-13 23:20:46 UTC (rev 3485)
@@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -45,6 +46,7 @@
import org.teiid.query.function.FunctionLibrary;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.metadata.TempMetadataAdapter;
+import org.teiid.query.metadata.TempMetadataID;
import org.teiid.query.metadata.TempMetadataStore;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
@@ -58,6 +60,7 @@
import org.teiid.query.resolver.util.ResolverUtil;
import org.teiid.query.resolver.util.ResolverVisitor;
import org.teiid.query.rewriter.QueryRewriter;
+import org.teiid.query.sql.LanguageObject;
import org.teiid.query.sql.LanguageObject.Util;
import org.teiid.query.sql.lang.CompareCriteria;
import org.teiid.query.sql.lang.Criteria;
@@ -116,7 +119,7 @@
PlanNode setOp = child.getFirstChild();
try {
- pushGroupNodeOverUnion(plan, metadata, capFinder, groupNode, child, groupingExpressions, setOp);
+ pushGroupNodeOverUnion(plan, metadata, capFinder, groupNode, child, groupingExpressions, setOp, context, analysisRecord);
} catch (QueryResolverException e) {
throw new TeiidComponentException(e);
}
@@ -154,21 +157,66 @@
* source
* child 1
* ...
- *
+ *
* Or if the child does not support pushdown we add dummy aggregate projection
* count(*) = 1, count(x) = case x is null then 0 else 1 end, avg(x) = x, etc.
*/
private void pushGroupNodeOverUnion(PlanNode plan,
QueryMetadataInterface metadata, CapabilitiesFinder capFinder,
- PlanNode groupNode, PlanNode child,
- List<SingleElementSymbol> groupingExpressions, PlanNode setOp)
+ PlanNode groupNode, PlanNode unionSourceParent,
+ List<SingleElementSymbol> groupingExpressions, PlanNode setOp, CommandContext context, AnalysisRecord record)
throws TeiidComponentException, QueryMetadataException,
QueryPlannerException, QueryResolverException {
- if (setOp == null || setOp.getType() != NodeConstants.Types.SET_OP || setOp.getProperty(NodeConstants.Info.SET_OPERATION) != Operation.UNION) {
- return; //must not be a union
+ if (setOp == null) {
+ return;
}
+ PlanNode intermediateView = null;
+ if (setOp.getType() != NodeConstants.Types.SET_OP) {
+ if (setOp.getType() != NodeConstants.Types.PROJECT) {
+ return;
+ }
+ intermediateView = unionSourceParent;
+ unionSourceParent = setOp.getFirstChild();
+ if (unionSourceParent == null || unionSourceParent.getType() != NodeConstants.Types.SOURCE || unionSourceParent.getFirstChild() == null
+ || unionSourceParent.getFirstChild().getType() != NodeConstants.Types.SET_OP || unionSourceParent.getFirstChild().getProperty(NodeConstants.Info.SET_OPERATION) != Operation.UNION) {
+ return; //not an eligible union
+ }
+ setOp = unionSourceParent.getFirstChild();
+ if (groupingExpressions == null) {
+ return; //shouldn't happen - the view should have been removed
+ }
+ }
+ if (setOp.getProperty(NodeConstants.Info.SET_OPERATION) != Operation.UNION) {
+ return;
+ }
LinkedHashSet<AggregateSymbol> aggregates = collectAggregates(groupNode);
+ //check to see if any aggregate is dependent upon cardinality
+ boolean cardinalityDependent = RuleRemoveOptionalJoins.areAggregatesCardinalityDependent(aggregates);
+
+ LinkedList<PlanNode> unionChildren = new LinkedList<PlanNode>();
+ findUnionChildren(unionChildren, cardinalityDependent, setOp);
+
+ SymbolMap parentMap = (SymbolMap)unionSourceParent.getProperty(NodeConstants.Info.SYMBOL_MAP);
+ List<ElementSymbol> virtualElements = parentMap.getKeys();
+ GroupSymbol virtualGroup = unionSourceParent.getGroups().iterator().next();
+
+ List<SingleElementSymbol> actualGroupingExpressions = groupingExpressions;
+ if (intermediateView != null) {
+ actualGroupingExpressions = new ArrayList<SingleElementSymbol>(groupingExpressions.size());
+ SymbolMap viewMap = (SymbolMap)intermediateView.getProperty(NodeConstants.Info.SYMBOL_MAP);
+ for (SingleElementSymbol ses : groupingExpressions) {
+ Expression ex = viewMap.getMappedExpression((ElementSymbol)ses);
+ SingleElementSymbol newCol = null;
+ if (ex instanceof SingleElementSymbol) {
+ newCol = (SingleElementSymbol)ex;
+ } else {
+ newCol = new ExpressionSymbol("grouping", ex); //$NON-NLS-1$
+ }
+ actualGroupingExpressions.add(newCol);
+ }
+ }
+
/*
* if there are no aggregates, this is just duplicate removal
* mark the union as not all, which should be removed later but
@@ -181,26 +229,19 @@
return;
}
- //check to see if any aggregate is dependent upon cardinality
- boolean cardinalityDependent = RuleRemoveOptionalJoins.areAggregatesCardinalityDependent(aggregates);
-
- LinkedList<PlanNode> unionChildren = new LinkedList<PlanNode>();
- findUnionChildren(unionChildren, cardinalityDependent, setOp);
-
if (unionChildren.size() < 2) {
return;
}
- SymbolMap parentMap = (SymbolMap)child.getProperty(NodeConstants.Info.SYMBOL_MAP);
- List<ElementSymbol> virtualElements = parentMap.getKeys();
List<SingleElementSymbol> copy = new ArrayList<SingleElementSymbol>(aggregates);
aggregates.clear();
Map<AggregateSymbol, Expression> aggMap = buildAggregateMap(copy, metadata, aggregates);
boolean shouldPushdown = false;
List<Boolean> pushdownList = new ArrayList<Boolean>(unionChildren.size());
+
for (PlanNode planNode : unionChildren) {
- boolean pushdown = canPushGroupByToUnionChild(metadata, capFinder, groupingExpressions, aggregates, planNode);
+ boolean pushdown = canPushGroupByToUnionChild(metadata, capFinder, actualGroupingExpressions, aggregates, planNode, record);
pushdownList.add(pushdown);
shouldPushdown |= pushdown;
}
@@ -209,17 +250,70 @@
return;
}
+ if (intermediateView != null) {
+ parentMap = pushGroupByView(plan, metadata, capFinder, unionSourceParent,
+ setOp, intermediateView, cardinalityDependent,
+ unionChildren, virtualElements, virtualGroup);
+ virtualElements = parentMap.getKeys();
+ virtualGroup = unionSourceParent.getGroups().iterator().next();
+ }
+
Iterator<Boolean> pushdownIterator = pushdownList.iterator();
for (PlanNode planNode : unionChildren) {
- addView(plan, planNode, pushdownIterator.next(), groupingExpressions, aggregates, virtualElements, metadata, capFinder);
+ addView(plan, planNode, pushdownIterator.next(), new GroupSymbol("X"), groupingExpressions, aggregates, virtualElements, metadata, capFinder, null); //$NON-NLS-1$
}
//update the parent plan with the staged aggregates and the new projected symbols
- List<SingleElementSymbol> projectedViewSymbols = (List<SingleElementSymbol>)NodeEditor.findNodePreOrder(child, NodeConstants.Types.PROJECT).getProperty(NodeConstants.Info.PROJECT_COLS);
- List<ElementSymbol> updatedVirturalElement = new ArrayList<ElementSymbol>(virtualElements);
+ List<SingleElementSymbol> projectedViewSymbols = (List<SingleElementSymbol>)NodeEditor.findNodePreOrder(unionSourceParent, NodeConstants.Types.PROJECT).getProperty(NodeConstants.Info.PROJECT_COLS);
//hack to introduce aggregate symbols to the parent view TODO: this should change the metadata properly.
- GroupSymbol virtualGroup = child.getGroups().iterator().next();
+ SymbolMap newParentMap = modifyUnionSourceParent(unionSourceParent, virtualGroup, projectedViewSymbols, virtualElements);
+ Map<AggregateSymbol, ElementSymbol> projectedMap = new HashMap<AggregateSymbol, ElementSymbol>();
+ Iterator<AggregateSymbol> aggIter = aggregates.iterator();
+ for (ElementSymbol projectedViewSymbol : newParentMap.getKeys().subList(projectedViewSymbols.size() - aggregates.size(), projectedViewSymbols.size())) {
+ projectedMap.put(aggIter.next(), projectedViewSymbol);
+ }
+ for (Expression expr : aggMap.values()) {
+ ExpressionMappingVisitor.mapExpressions(expr, projectedMap);
+ }
+ mapExpressions(groupNode.getParent(), aggMap, metadata);
+ }
+
+ private SymbolMap pushGroupByView(PlanNode plan,
+ QueryMetadataInterface metadata, CapabilitiesFinder capFinder,
+ PlanNode unionSourceParent, PlanNode setOp,
+ PlanNode intermediateView, boolean cardinalityDependent,
+ LinkedList<PlanNode> unionChildren,
+ List<ElementSymbol> virtualElements, GroupSymbol virtualGroup)
+ throws TeiidComponentException, QueryPlannerException,
+ QueryResolverException {
+ //perform view pushing
+ /*
+ * TODO: this introduces yet another potentially unneeded view, but cannot be removed by the normal merge virtual logic
+ * due to an intervening access node
+ */
+ PlanNode intermediateProject = intermediateView.getFirstChild();
+ List<SingleElementSymbol> projectedViewSymbols = (List<SingleElementSymbol>)intermediateProject.getProperty(NodeConstants.Info.PROJECT_COLS);
+ for (PlanNode planNode : unionChildren) {
+ addView(plan, planNode, false, (GroupSymbol) virtualGroup.clone(), null, Collections.EMPTY_SET, virtualElements, metadata, capFinder, LanguageObject.Util.deepClone(projectedViewSymbols, SingleElementSymbol.class));
+ }
+ unionChildren.clear();
+ findUnionChildren(unionChildren, cardinalityDependent, setOp);
+ virtualGroup = intermediateView.getGroups().iterator().next();
+ unionSourceParent.getGroups().clear();
+ unionSourceParent.addGroup(virtualGroup);
+ projectedViewSymbols = (List<SingleElementSymbol>)NodeEditor.findNodePreOrder(unionSourceParent, NodeConstants.Types.PROJECT).getProperty(NodeConstants.Info.PROJECT_COLS);
+ SymbolMap parentMap = modifyUnionSourceParent(unionSourceParent, virtualGroup, projectedViewSymbols, Collections.EMPTY_LIST);
+ //remove the old view
+ NodeEditor.removeChildNode(intermediateView, intermediateProject);
+ NodeEditor.removeChildNode(intermediateView.getParent(), intermediateView);
+ return parentMap;
+ }
+
+ private SymbolMap modifyUnionSourceParent(PlanNode unionSourceParent,
+ GroupSymbol virtualGroup,
+ List<SingleElementSymbol> projectedViewSymbols, List<ElementSymbol> baseVirtualElements) {
+ List<ElementSymbol> updatedVirturalElement = new ArrayList<ElementSymbol>(baseVirtualElements);
for (int i = updatedVirturalElement.size(); i < projectedViewSymbols.size(); i++) {
SingleElementSymbol symbol = projectedViewSymbols.get(i);
String name = symbol.getShortName();
@@ -227,25 +321,18 @@
ElementSymbol virtualElement = new ElementSymbol(virtualElementName);
virtualElement.setGroupSymbol(virtualGroup);
virtualElement.setType(symbol.getType());
+ virtualElement.setMetadataID(new TempMetadataID(virtualElementName, symbol.getType()));
updatedVirturalElement.add(virtualElement);
}
SymbolMap newParentMap = SymbolMap.createSymbolMap(updatedVirturalElement, projectedViewSymbols);
- child.setProperty(NodeConstants.Info.SYMBOL_MAP, newParentMap);
- Map<AggregateSymbol, ElementSymbol> projectedMap = new HashMap<AggregateSymbol, ElementSymbol>();
- Iterator<AggregateSymbol> aggIter = aggregates.iterator();
- for (ElementSymbol projectedViewSymbol : newParentMap.getKeys().subList(projectedViewSymbols.size() - aggregates.size(), projectedViewSymbols.size())) {
- projectedMap.put(aggIter.next(), projectedViewSymbol);
- }
- for (Expression expr : aggMap.values()) {
- ExpressionMappingVisitor.mapExpressions(expr, projectedMap);
- }
- mapExpressions(groupNode.getParent(), aggMap, metadata);
+ unionSourceParent.setProperty(NodeConstants.Info.SYMBOL_MAP, newParentMap);
+ return newParentMap;
}
private boolean canPushGroupByToUnionChild(QueryMetadataInterface metadata,
CapabilitiesFinder capFinder,
List<SingleElementSymbol> groupingExpressions,
- LinkedHashSet<AggregateSymbol> aggregates, PlanNode planNode)
+ LinkedHashSet<AggregateSymbol> aggregates, PlanNode planNode, AnalysisRecord record)
throws QueryMetadataException, TeiidComponentException {
if (planNode.getType() != NodeConstants.Types.ACCESS) {
return false;
@@ -260,8 +347,16 @@
return false;
}
}
- if ((groupingExpressions == null || groupingExpressions.isEmpty()) && !CapabilitiesUtil.supports(Capability.QUERY_AGGREGATES_COUNT_STAR, modelId, metadata, capFinder)) {
- return false;
+ if ((groupingExpressions == null || groupingExpressions.isEmpty())) {
+ if (!CapabilitiesUtil.supports(Capability.QUERY_AGGREGATES_COUNT_STAR, modelId, metadata, capFinder)) {
+ return false;
+ }
+ } else {
+ for (SingleElementSymbol ses : groupingExpressions) {
+ if(! CriteriaCapabilityValidatorVisitor.canPushLanguageObject(ses, modelId, metadata, capFinder, record)) {
+ return false;
+ }
+ }
}
//TODO: check to see if we are distinct
return true;
@@ -270,7 +365,7 @@
/**
* Recursively searches the union tree for all applicable source nodes
*/
- private PlanNode findUnionChildren(List<PlanNode> unionChildren, boolean carinalityDependent, PlanNode setOp) {
+ static PlanNode findUnionChildren(List<PlanNode> unionChildren, boolean carinalityDependent, PlanNode setOp) {
if (setOp.getType() != NodeConstants.Types.SET_OP || setOp.getProperty(NodeConstants.Info.SET_OPERATION) != Operation.UNION) {
return setOp;
}
@@ -292,11 +387,14 @@
return null;
}
- public void addView(PlanNode root, PlanNode unionSource, boolean pushdown, List<SingleElementSymbol> groupingExpressions,
+ public void addView(PlanNode root, PlanNode unionSource, boolean pushdown, GroupSymbol group, List<SingleElementSymbol> groupingExpressions,
Set<AggregateSymbol> aggregates, List<ElementSymbol> virtualElements,
- QueryMetadataInterface metadata, CapabilitiesFinder capFinder)
+ QueryMetadataInterface metadata, CapabilitiesFinder capFinder, List<SingleElementSymbol> actualProject)
throws TeiidComponentException, QueryPlannerException, QueryResolverException {
- PlanNode originalNode = unionSource;
+ PlanNode accessNode = null;
+ if (pushdown) {
+ accessNode = NodeEditor.findNodePreOrder(unionSource, NodeConstants.Types.ACCESS);
+ }
//branches other than the first need to have their projected column names updated
PlanNode sortNode = NodeEditor.findNodePreOrder(unionSource, NodeConstants.Types.SORT, NodeConstants.Types.SOURCE);
List<SingleElementSymbol> sortOrder = null;
@@ -320,34 +418,23 @@
updateSymbolName(projectCols, i, virtualElem, projectedSymbol);
}
}
- PlanNode intermediateView = NodeFactory.getNewNode(NodeConstants.Types.SOURCE);
- unionSource.addAsParent(intermediateView);
+ PlanNode intermediateView = createView(group, virtualElements, unionSource, metadata);
+ SymbolMap symbolMap = (SymbolMap)intermediateView.getProperty(Info.SYMBOL_MAP);
unionSource = intermediateView;
- TempMetadataStore store = new TempMetadataStore();
- TempMetadataAdapter tma = new TempMetadataAdapter(metadata, store);
- GroupSymbol group = new GroupSymbol("X"); //$NON-NLS-1$
- try {
- group.setMetadataID(ResolverUtil.addTempGroup(tma, group, virtualElements, false));
- } catch (QueryResolverException e) {
- throw new TeiidComponentException(e);
- }
- intermediateView.addGroup(group);
- List<ElementSymbol> projectedSymbols = ResolverUtil.resolveElementsInGroup(group, metadata);
- SymbolMap symbolMap = SymbolMap.createSymbolMap(projectedSymbols,
- (List<Expression>)NodeEditor.findNodePreOrder(unionSource, NodeConstants.Types.PROJECT).getProperty(NodeConstants.Info.PROJECT_COLS));
- intermediateView.setProperty(NodeConstants.Info.SYMBOL_MAP, symbolMap);
Set<SingleElementSymbol> newGroupingExpressions = Collections.emptySet();
if (groupingExpressions != null) {
newGroupingExpressions = new HashSet<SingleElementSymbol>();
for (SingleElementSymbol singleElementSymbol : groupingExpressions) {
- newGroupingExpressions.add((SingleElementSymbol)symbolMap.getKeys().get(virtualElements.indexOf(singleElementSymbol)).clone());
+ newGroupingExpressions.add((SingleElementSymbol) symbolMap.getKeys().get(virtualElements.indexOf(singleElementSymbol)).clone());
}
}
- List<SingleElementSymbol> projectedViewSymbols = Util.deepClone(projectedSymbols, SingleElementSymbol.class);
+ List<SingleElementSymbol> projectedViewSymbols = Util.deepClone(symbolMap.getKeys(), SingleElementSymbol.class);
- SymbolMap viewMapping = SymbolMap.createSymbolMap(NodeEditor.findParent(unionSource, NodeConstants.Types.SOURCE).getGroups().iterator().next(), projectedSymbols, metadata);
+ PlanNode parent = NodeEditor.findParent(unionSource, NodeConstants.Types.SOURCE);
+ SymbolMap parentMap = (SymbolMap) parent.getProperty(NodeConstants.Info.SYMBOL_MAP);
+ SymbolMap viewMapping = SymbolMap.createSymbolMap(parentMap.getKeys(), projectedViewSymbols);
for (AggregateSymbol agg : aggregates) {
agg = (AggregateSymbol)agg.clone();
ExpressionMappingVisitor.mapExpressions(agg, viewMapping.asMap());
@@ -380,18 +467,50 @@
unionSource = projectPlanNode;
//create proper names for the aggregate symbols
- Select select = new Select(projectedViewSymbols);
+ Select select = null;
+ if (actualProject == null) {
+ select = new Select(projectedViewSymbols);
+ } else {
+ select = new Select(actualProject);
+ }
QueryRewriter.makeSelectUnique(select, false);
projectedViewSymbols = select.getProjectedSymbols();
projectPlanNode.setProperty(NodeConstants.Info.PROJECT_COLS, projectedViewSymbols);
projectPlanNode.addGroup(group);
if (pushdown) {
- while (RuleRaiseAccess.raiseAccessNode(root, originalNode, metadata, capFinder, true, null) != null) {
+ while (RuleRaiseAccess.raiseAccessNode(root, accessNode, metadata, capFinder, true, null) != null) {
//continue to raise
}
}
}
+
+ static PlanNode createView(GroupSymbol group, List<? extends SingleElementSymbol> virtualElements, PlanNode child, QueryMetadataInterface metadata) throws TeiidComponentException {
+ SymbolMap symbolMap = createSymbolMap(group, virtualElements, child, metadata);
+ PlanNode branchSource = NodeFactory.getNewNode(NodeConstants.Types.SOURCE);
+ branchSource.addGroup(group);
+ PlanNode projectNode = NodeEditor.findNodePreOrder(child, NodeConstants.Types.PROJECT);
+ branchSource.setProperty(Info.SYMBOL_MAP, SymbolMap.createSymbolMap(symbolMap.getKeys(), (List<? extends SingleElementSymbol>)projectNode.getProperty(Info.PROJECT_COLS)));
+ child.addAsParent(branchSource);
+ return branchSource;
+ }
+ private static SymbolMap createSymbolMap(GroupSymbol group,
+ List<? extends SingleElementSymbol> virtualElements,
+ PlanNode child, QueryMetadataInterface metadata)
+ throws TeiidComponentException, QueryMetadataException {
+ TempMetadataStore store = new TempMetadataStore();
+ TempMetadataAdapter tma = new TempMetadataAdapter(metadata, store);
+ try {
+ group.setMetadataID(ResolverUtil.addTempGroup(tma, group, virtualElements, false));
+ } catch (QueryResolverException e) {
+ throw new TeiidComponentException(e);
+ }
+ List<ElementSymbol> projectedSymbols = ResolverUtil.resolveElementsInGroup(group, metadata);
+ SymbolMap symbolMap = SymbolMap.createSymbolMap(projectedSymbols,
+ (List<Expression>)NodeEditor.findNodePreOrder(child, NodeConstants.Types.PROJECT).getProperty(NodeConstants.Info.PROJECT_COLS));
+ return symbolMap;
+ }
+
private void updateSymbolName(List<SingleElementSymbol> projectCols, int i,
ElementSymbol virtualElem, SingleElementSymbol projectedSymbol) {
if (projectedSymbol instanceof AliasSymbol) {
@@ -450,20 +569,19 @@
}
Map<PlanNode, List<SingleElementSymbol>> groupingMap = createNodeMapping(groupNode, groupingExpressions, false);
- Set<PlanNode> possibleTargetNodes = new HashSet<PlanNode>(aggregateMap.keySet());
+ Set<PlanNode> possibleTargetNodes = new LinkedHashSet<PlanNode>(aggregateMap.keySet());
possibleTargetNodes.addAll(groupingMap.keySet());
for (PlanNode planNode : possibleTargetNodes) {
Set<SingleElementSymbol> stagedGroupingSymbols = new LinkedHashSet<SingleElementSymbol>();
List<AggregateSymbol> aggregates = aggregateMap.get(planNode);
- List<SingleElementSymbol> groupBy = groupingMap.get(planNode);
if (!canPush(groupNode, stagedGroupingSymbols, planNode)) {
continue;
}
- if (groupBy != null) {
- stagedGroupingSymbols.addAll(groupBy);
+ if (groupingExpressions != null) {
+ filterJoinColumns(stagedGroupingSymbols, planNode.getGroups(), groupingExpressions);
}
collectSymbolsFromOtherAggregates(allAggregates, aggregates, planNode, stagedGroupingSymbols);
@@ -625,7 +743,7 @@
private <T extends SingleElementSymbol> Map<PlanNode, List<T>> createNodeMapping(PlanNode groupNode,
Collection<T> expressions, boolean aggs) {
- Map<PlanNode, List<T>> result = new HashMap<PlanNode, List<T>>();
+ Map<PlanNode, List<T>> result = new LinkedHashMap<PlanNode, List<T>>();
if (expressions == null) {
return result;
}
@@ -657,7 +775,7 @@
if (originatingNode.getParent() == groupNode) {
//anything logically applied after the join and is
//dependent upon the cardinality prevents us from optimizing.
- if (aggs && RuleRemoveOptionalJoins.isCardinalityDependent((AggregateSymbol)aggregateSymbol)) {
+ if (aggs && ((AggregateSymbol)aggregateSymbol).isCardinalityDependent()) {
return null;
}
continue;
@@ -753,7 +871,8 @@
nestedAggregates.add(countAgg);
nestedAggregates.add(sumAgg);
nestedAggregates.add(sumSqAgg);
- } else {
+ }
+ else {
//AGG(X) -> AGG(AGG(X))
newExpression = new AggregateSymbol("stagedAgg", aggFunction.name(), false, partitionAgg); //$NON-NLS-1$
nestedAggregates.add(partitionAgg);
@@ -787,4 +906,5 @@
public String toString() {
return "PushAggregates"; //$NON-NLS-1$
}
+
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java 2011-09-13 23:20:28 UTC (rev 3484)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/optimizer/relational/rules/RuleRemoveOptionalJoins.java 2011-09-13 23:20:46 UTC (rev 3485)
@@ -299,30 +299,13 @@
static boolean areAggregatesCardinalityDependent(Set<AggregateSymbol> aggs) {
for (AggregateSymbol aggregateSymbol : aggs) {
- if (isCardinalityDependent(aggregateSymbol)) {
+ if (aggregateSymbol.isCardinalityDependent()) {
return true;
}
}
return false;
}
- static boolean isCardinalityDependent(AggregateSymbol aggregateSymbol) {
- if (aggregateSymbol.isDistinct()) {
- return false;
- }
- switch (aggregateSymbol.getAggregateFunction()) {
- case COUNT:
- case AVG:
- case STDDEV_POP:
- case STDDEV_SAMP:
- case VAR_POP:
- case VAR_SAMP:
- case SUM:
- return true;
- }
- return false;
- }
-
public String toString() {
return "RuleRemoveOptionalJoins"; //$NON-NLS-1$
}
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/sql/symbol/AggregateSymbol.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/sql/symbol/AggregateSymbol.java 2011-09-13 23:20:28 UTC (rev 3484)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/sql/symbol/AggregateSymbol.java 2011-09-13 23:20:46 UTC (rev 3485)
@@ -243,5 +243,22 @@
&& EquivalenceUtil.areEqual(this.getExpression(), other.getExpression())
&& EquivalenceUtil.areEqual(this.getOrderBy(), other.getOrderBy());
}
+
+ public boolean isCardinalityDependent() {
+ if (isDistinct()) {
+ return false;
+ }
+ switch (getAggregateFunction()) {
+ case COUNT:
+ case AVG:
+ case STDDEV_POP:
+ case STDDEV_SAMP:
+ case VAR_POP:
+ case VAR_SAMP:
+ case SUM:
+ return true;
+ }
+ return false;
+ }
}
Modified: branches/7.1.x/engine/src/test/java/org/teiid/query/optimizer/TestAggregatePushdown.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/query/optimizer/TestAggregatePushdown.java 2011-09-13 23:20:28 UTC (rev 3484)
+++ branches/7.1.x/engine/src/test/java/org/teiid/query/optimizer/TestAggregatePushdown.java 2011-09-13 23:20:46 UTC (rev 3485)
@@ -22,7 +22,10 @@
package org.teiid.query.optimizer;
-import static org.teiid.query.optimizer.TestOptimizer.*;
+import static org.teiid.query.optimizer.TestOptimizer.SHOULD_SUCCEED;
+import static org.teiid.query.optimizer.TestOptimizer.checkNodeTypes;
+import static org.teiid.query.optimizer.TestOptimizer.getTypicalCapabilities;
+import static org.teiid.query.optimizer.TestOptimizer.helpPlan;
import org.junit.Test;
import org.teiid.query.metadata.QueryMetadataInterface;
@@ -35,6 +38,7 @@
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.unittest.FakeMetadataFacade;
import org.teiid.query.unittest.FakeMetadataFactory;
+import org.teiid.query.unittest.RealMetadataFactory;
import org.teiid.translator.SourceSystemFunctions;
@SuppressWarnings("nls")
@@ -899,6 +903,37 @@
}
/**
+ * pushdown won't happen since searched case is not supported
+ */
+ @Test public void testPushDownOverUnionGroupingExpression() throws Exception {
+ FakeCapabilitiesFinder capFinder = new FakeCapabilitiesFinder();
+ BasicSourceCapabilities caps = getAggregateCapabilities();
+ caps.setCapabilitySupport(Capability.QUERY_SEARCHED_CASE, true);
+ capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
+ capFinder.addCapabilities("pm2", getAggregateCapabilities()); //$NON-NLS-1$
+
+ ProcessorPlan plan = TestOptimizer.helpPlan("select max(e2), case when e1 is null then 0 else 1 end from (select e1, e2 from pm1.g1 union all select e1, e2 from pm2.g2) z group by case when e1 is null then 0 else 1 end", FakeMetadataFactory.example1Cached(), null, capFinder, //$NON-NLS-1$
+ new String[]{"SELECT v_1.c_0, MAX(v_1.c_1) FROM (SELECT CASE WHEN v_0.c_0 IS NULL THEN 0 ELSE 1 END AS c_0, v_0.c_1 FROM (SELECT g_0.e1 AS c_0, g_0.e2 AS c_1 FROM pm1.g1 AS g_0) AS v_0) AS v_1 GROUP BY v_1.c_0", //$NON-NLS-1$
+ "SELECT g_0.e1, g_0.e2 FROM pm2.g2 AS g_0"}, ComparisonMode.EXACT_COMMAND_STRING); //$NON-NLS-1$
+ TestOptimizer.checkNodeTypes(plan, new int[] {
+ 2, // Access
+ 0, // DependentAccess
+ 0, // DependentSelect
+ 0, // DependentProject
+ 0, // DupRemove
+ 1, // Grouping
+ 0, // NestedLoopJoinStrategy
+ 0, // MergeJoinStrategy
+ 0, // Null
+ 0, // PlanExecution
+ 3, // Project
+ 0, // Select
+ 0, // Sort
+ 1 // UnionAll
+ });
+ }
+
+ /**
* Ensures that we do not raise criteria over a group by
* TODO: check if the criteria only depends on grouping columns
*/
14 years, 7 months
teiid SVN: r3484 - in branches/7.1.x: engine/src/main/java/org/teiid/query/tempdata and 2 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:20:28 -0400 (Tue, 13 Sep 2011)
New Revision: 3484
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log:
TEIID-1657: Changes to ensure that materialized view (matview) refresh is handled properly when distributed caching is enabled.
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -80,6 +80,18 @@
*/
public class TransformationMetadata extends BasicQueryMetadata implements Serializable {
+ private static final class LiveTableQueryNode extends QueryNode {
+ Table t;
+ private LiveTableQueryNode(Table t) {
+ super(t.getFullName(), null);
+ this.t = t;
+ }
+
+ public String getQuery() {
+ return t.getSelectTransformation();
+ }
+ }
+
private final class VirtualFileInputStreamFactory extends
InputStreamFactory {
private final VirtualFile f;
@@ -480,8 +492,7 @@
if (!tableRecord.isVirtual()) {
throw new QueryMetadataException(QueryPlugin.Util.getString("TransformationMetadata.QueryPlan_could_not_be_found_for_physical_group__6")+tableRecord.getFullName()); //$NON-NLS-1$
}
- String transQuery = tableRecord.getSelectTransformation();
- QueryNode queryNode = new QueryNode(tableRecord.getFullName(), transQuery);
+ LiveTableQueryNode queryNode = new LiveTableQueryNode(tableRecord);
// get any bindings and add them onto the query node
List bindings = tableRecord.getBindings();
Modified: branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -303,7 +303,7 @@
String matTableName = RelationalPlanner.MAT_PREFIX+matViewName.toUpperCase();
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
- boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(1).getExpression()).getValue());
+ boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
if (invalidate) {
touchTable(context, matTableName, false);
}
@@ -316,7 +316,7 @@
Object matTableId = RelationalPlanner.getGlobalTempTableMetadataId(group, matTableName, context, metadata, AnalysisRecord.createNonRecordingRecord());
GroupSymbol matTable = new GroupSymbol(matTableName);
matTable.setMetadataID(matTableId);
- int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null);
+ int rowCount = loadGlobalTable(context, matTable, matTableName, globalStore, info, null, false);
return CollectionTupleSource.createUpdateCountTupleSource(rowCount);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
Object groupID = validateMatView(metadata, proc);
@@ -396,18 +396,19 @@
key.vdb = new VDBKey(context.getVdbName(), context.getVdbVersion());
MatTableEntry entry = this.tables.get(key);
- boolean firstload = !info.isValid();
- if (entry != null && entry.lastUpdate > info.getUpdateTime() && info.getState() != MatState.LOADING) {
- //remote load
- info.setState(MatState.NEEDS_LOADING, firstload?false:entry.valid, null);
+ boolean notValid = !info.isValid();
+ if (entry != null && entry.lastUpdate > info.getUpdateTime()
+ && info.getState() != MatState.LOADING) {
+ //trigger a remote load due to the cache being more up to date than the local copy
+ info.setState(MatState.NEEDS_LOADING, notValid?false:entry.valid, null);
loadTime = entry.lastUpdate;
- }
+ }
}
boolean load = info.shouldLoad();
if (load) {
if (!info.isValid()) {
//blocking load
- loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
} else {
loadAsynch(context, group, tableName, globalStore, info, loadTime);
}
@@ -440,7 +441,7 @@
Callable<Integer> toCall = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
- return loadGlobalTable(context, group, tableName, globalStore, info, loadTime);
+ return loadGlobalTable(context, group, tableName, globalStore, info, loadTime, true);
}
};
FutureTask<Integer> task = new FutureTask<Integer>(toCall);
@@ -449,7 +450,7 @@
private int loadGlobalTable(CommandContext context,
GroupSymbol group, final String tableName,
- TempTableStore globalStore, MatTableInfo info, Long loadTime)
+ TempTableStore globalStore, MatTableInfo info, Long loadTime, boolean useCache)
throws TeiidComponentException, TeiidProcessingException {
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.getString("TempTableDataManager.loading", tableName)); //$NON-NLS-1$
QueryMetadataInterface metadata = context.getMetadata();
@@ -483,9 +484,11 @@
if (distributedCache != null) {
cid = new CacheID(new ParseInfo(), fullName, context.getVdbName(),
context.getVdbVersion(), context.getConnectionID(), context.getUserName());
- CachedResults cr = this.distributedCache.get(cid);
- if (cr != null) {
- ts = cr.getResults().createIndexedTupleSource();
+ if (useCache) {
+ CachedResults cr = this.distributedCache.get(cid);
+ if (cr != null) {
+ ts = cr.getResults().createIndexedTupleSource();
+ }
}
}
Modified: branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -22,6 +22,9 @@
package org.teiid.jdbc;
import java.io.File;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Properties;
import org.jboss.deployers.spi.DeploymentException;
@@ -31,11 +34,12 @@
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.DefaultCacheFactory;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
import org.teiid.deployers.MetadataStoreGroup;
+import org.teiid.deployers.UDFMetaData;
import org.teiid.deployers.VDBRepository;
import org.teiid.dqp.internal.datamgr.ConnectorManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
@@ -48,6 +52,8 @@
import org.teiid.metadata.index.IndexMetadataFactory;
import org.teiid.metadata.index.VDBMetadataFactory;
import org.teiid.query.function.SystemFunctionManager;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.metadata.TransformationMetadata.Resource;
import org.teiid.query.optimizer.capabilities.BasicSourceCapabilities;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.services.SessionServiceImpl;
@@ -66,6 +72,10 @@
private ConnectorManagerRepository cmr;
public FakeServer() {
+ this(new DQPConfiguration());
+ }
+
+ public FakeServer(DQPConfiguration config) {
this.logon = new LogonImpl(sessionService, null);
this.repo.setSystemStore(VDBMetadataFactory.getSystem());
@@ -86,9 +96,13 @@
}
});
- DQPConfiguration config = new DQPConfiguration();
config.setResultsetCacheConfig(new CacheConfiguration(Policy.LRU, 60, 250, "resultsetcache")); //$NON-NLS-1$
- this.dqp.setCacheFactory(new DefaultCacheFactory());
+ this.dqp.setCacheFactory(new DefaultCacheFactory() {
+ @Override
+ public boolean isReplicated() {
+ return true; //pretend to be replicated for matview tests
+ }
+ });
this.dqp.start(config);
this.sessionService.setDqp(this.dqp);
@@ -96,12 +110,29 @@
registerClientService(DQP.class, dqp, null);
}
+ public void stop() {
+ this.dqp.stop();
+ }
+
public void deployVDB(String vdbName, String vdbPath) throws Exception {
-
+ deployVDB(vdbName, vdbPath, null);
+ }
+
+ public void deployVDB(String vdbName, String vdbPath, Map<String, Collection<FunctionMethod>> udfs) throws Exception {
IndexMetadataFactory imf = VDBMetadataFactory.loadMetadata(new File(vdbPath).toURI().toURL());
MetadataStore metadata = imf.getMetadataStore(repo.getSystemStore().getDatatypes());
-
- VDBMetaData vdbMetaData = new VDBMetaData();
+ LinkedHashMap<String, Resource> entries = imf.getEntriesPlusVisibilities();
+ deployVDB(vdbName, metadata, entries, udfs);
+ }
+
+ public void deployVDB(String vdbName, MetadataStore metadata,
+ LinkedHashMap<String, Resource> entries) {
+ deployVDB(vdbName, metadata, entries, null);
+ }
+
+ public void deployVDB(String vdbName, MetadataStore metadata,
+ LinkedHashMap<String, Resource> entries, Map<String, Collection<FunctionMethod>> udfs) {
+ VDBMetaData vdbMetaData = new VDBMetaData();
vdbMetaData.setName(vdbName);
vdbMetaData.setStatus(VDB.Status.ACTIVE);
@@ -120,10 +151,17 @@
try {
MetadataStoreGroup stores = new MetadataStoreGroup();
stores.addStore(metadata);
- this.repo.addVDB(vdbMetaData, stores, imf.getEntriesPlusVisibilities(), null, cmr);
+ UDFMetaData udfMetaData = null;
+ if (udfs != null) {
+ udfMetaData = new UDFMetaData();
+ for (Map.Entry<String, Collection<FunctionMethod>> entry : udfs.entrySet()) {
+ udfMetaData.addFunctions(entry.getValue());
+ }
+ }
+ this.repo.addVDB(vdbMetaData, stores, entries, udfMetaData, cmr);
} catch (DeploymentException e) {
throw new RuntimeException(e);
- }
+ }
}
public void removeVDB(String vdbName) {
Modified: branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
===================================================================
--- branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-09-13 23:20:09 UTC (rev 3483)
+++ branches/7.1.x/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java 2011-09-13 23:20:28 UTC (rev 3484)
@@ -22,20 +22,30 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.TeiidSQLException;
import org.teiid.metadata.Table;
+import org.teiid.query.function.metadata.FunctionMethod;
+import org.teiid.query.function.metadata.FunctionParameter;
import org.teiid.query.metadata.TransformationMetadata;
@SuppressWarnings("nls")
@@ -44,13 +54,32 @@
private static final String MATVIEWS = "matviews";
private Connection conn;
private FakeServer server;
+
+ private static int count = 0;
+
+ public static int pause() throws InterruptedException {
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ while (count < 2) {
+ TestMatViews.class.wait();
+ }
+ }
+ return 1;
+ }
@Before public void setUp() throws Exception {
server = new FakeServer();
- server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb");
+ HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, FunctionMethod.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, FunctionMethod.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
conn = server.createConnection("jdbc:teiid:matviews");
}
+ @After public void tearDown() throws Exception {
+ server.stop();
+ }
+
@Test public void testSystemMatViewsWithImplicitLoad() throws Exception {
Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from MatViews order by name");
@@ -80,6 +109,34 @@
@Test public void testSystemMatViewsWithExplicitRefresh() throws Exception {
Statement s = conn.createStatement();
+ ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key1 = rs.getDouble(1);
+
+ //ensure that invalidate with distributed caching works
+ assertTrue(key1 != key);
+ }
+
+ @Test public void testSystemManViewsWithExplictRefreshAndInvalidate() throws Exception {
+ Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', false)) p");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
@@ -87,6 +144,47 @@
assertTrue(rs.next());
assertEquals("LOADED", rs.getString("loadstate"));
assertEquals(true, rs.getBoolean("valid"));
+
+ count = 0;
+
+ VDBMetaData vdb = server.getVDB(MATVIEWS);
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ Table tbl = tm.getGroupID("TEST.MATVIEW");
+ tbl.setSelectTransformation("select pause() as x");
+
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ Statement s1 = conn.createStatement();
+ ResultSet rs = s1.executeQuery("select * from (call refreshMatView('TEST.MATVIEW', true)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ synchronized (TestMatViews.class) {
+ while (count < 1) {
+ TestMatViews.class.wait();
+ }
+ }
+ rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+ assertTrue(rs.next());
+ assertEquals("NEEDS_LOADING", rs.getString("loadstate"));
+ assertEquals(false, rs.getBoolean("valid"));
+
+ synchronized (TestMatViews.class) {
+ count++;
+ TestMatViews.class.notify();
+ }
+ t.join();
+
+ rs = s.executeQuery("select * from MatViews where name = 'MatView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
}
@Test(expected=TeiidSQLException.class) public void testSystemMatViewsInvalidView() throws Exception {
@@ -99,14 +197,34 @@
s.execute("call refreshMatView('foo', false)");
}
+ @Test(expected=TeiidSQLException.class) public void testSystemMatViewsWithRowRefreshNotAllowed() throws Exception {
+ Statement s = conn.createStatement();
+ VDBMetaData vdb = server.getVDB(MATVIEWS);
+ TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
+ Table t = tm.getGroupID("TEST.RANDOMVIEW");
+ t.setSelectTransformation("select rand() as x, rand() as y");
+ ResultSet rs = s.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = s.executeQuery("select * from MatViews where name = 'RandomView'");
+ assertTrue(rs.next());
+ assertEquals("LOADED", rs.getString("loadstate"));
+ assertEquals(true, rs.getBoolean("valid"));
+ rs = s.executeQuery("select x from TEST.RANDOMVIEW");
+ assertTrue(rs.next());
+ double key = rs.getDouble(1);
+
+ rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', "+key+")) p");
+ }
+
@Test public void testSystemMatViewsWithRowRefresh() throws Exception {
- //TOOD: remove this. it's a workaround for TEIIDDES-549
+ Statement s = conn.createStatement();
+
VDBMetaData vdb = server.getVDB(MATVIEWS);
TransformationMetadata tm = vdb.getAttachment(TransformationMetadata.class);
Table t = tm.getGroupID("TEST.RANDOMVIEW");
t.setSelectTransformation("/*+ cache(updatable) */ " + t.getSelectTransformation());
- Statement s = conn.createStatement();
//prior to load refresh of a single row returns -1
ResultSet rs = s.executeQuery("select * from (call refreshMatViewRow('TEST.RANDOMVIEW', 0)) p");
assertTrue(rs.next());
14 years, 7 months
teiid SVN: r3483 - in branches/7.1.x/engine/src: main/resources/org/teiid/query and 1 other directories.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:20:09 -0400 (Tue, 13 Sep 2011)
New Revision: 3483
Modified:
branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
Log:
TEIID-1654 ensuring that missing cache entries are not restored
Modified: branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-09-13 23:19:51 UTC (rev 3482)
+++ branches/7.1.x/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-09-13 23:20:09 UTC (rev 3483)
@@ -35,6 +35,7 @@
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.Assertion;
import org.teiid.logging.LogConstants;
@@ -118,15 +119,16 @@
@Override
public synchronized boolean restore(Cache cache, BufferManager bufferManager) {
- try {
- if (this.results == null) {
+ if (this.results == null) {
+ TupleBuffer buffer = null;
+ try {
List<ElementSymbol> schema = new ArrayList<ElementSymbol>(types.length);
for (String type : types) {
ElementSymbol es = new ElementSymbol("x"); //$NON-NLS-1$
es.setType(DataTypeManager.getDataTypeClass(type));
schema.add(es);
}
- TupleBuffer buffer = bufferManager.createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
+ buffer = bufferManager.createTupleBuffer(schema, "cached", TupleSourceType.FINAL); //$NON-NLS-1$
buffer.setBatchSize(this.batchSize);
if (this.hint != null) {
buffer.setPrefersMemory(this.hint.getPrefersMemory());
@@ -134,18 +136,24 @@
for (int row = 1; row <= this.rowCount; row+=this.batchSize) {
TupleBatch batch = (TupleBatch)cache.get(uuid+","+row); //$NON-NLS-1$
- if (batch != null) {
- buffer.addTupleBatch(batch, true);
- }
+ if (batch == null) {
+ LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.getString("not_found_cache")); //$NON-NLS-1$
+ buffer.remove();
+ return false;
+ }
+ buffer.addTupleBatch(batch, true);
}
this.results = buffer;
bufferManager.addTupleBuffer(this.results);
this.results.close();
+ } catch (TeiidException e) {
+ LogManager.logWarning(LogConstants.CTX_DQP, e, QueryPlugin.Util.getString("unexpected_exception_restoring_results")); //$NON-NLS-1$
+ if (buffer != null) {
+ buffer.remove();
+ }
+ return false;
}
- return true;
- } catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_DQP, QueryPlugin.Util.getString("not_found_cache")); //$NON-NLS-1$
}
- return false;
+ return true;
}
}
Modified: branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:19:51 UTC (rev 3482)
+++ branches/7.1.x/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-13 23:20:09 UTC (rev 3483)
@@ -869,7 +869,9 @@
datasource_not_found=Data Source {0} not accessible.
RequestWorkItem.cache_nondeterministic=Caching command "{0}" at a session level, but less deterministic functions were evaluated.
-not_found_cache=Results not found in cache
+not_found_cache=Failed to restore results, since batch entries were missing. The entry will be re-populated.
+unexpected_exception_restoring_results=Failed to restore results. The entry will be re-populated.
+failed_to_cache=Failed to store the result set contents to disk.
failed_to_unwrap_connection=Failed to unwrap the source connection.
connection_factory_not_found=Failed to find the Connection Factory with JNDI name {0}. Please check the name or deploy the Connection Factory with specified name.
Modified: branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-09-13 23:19:51 UTC (rev 3482)
+++ branches/7.1.x/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-09-13 23:20:09 UTC (rev 3483)
@@ -24,6 +24,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -41,6 +42,7 @@
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
import org.teiid.dqp.service.FakeBufferService;
import org.teiid.query.sql.lang.Query;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -101,7 +103,6 @@
results.setResults(tb);
results.setCommand(new Query());
Cache cache = new DefaultCache("dummy"); //$NON-NLS-1$
-
// simulate the jboss-cache remote transport, where the batches are remotely looked up
// in cache
for (int row=1; row<=tb.getRowCount();row+=4) {
@@ -119,7 +120,7 @@
CachedResults cachedResults = (CachedResults)ois.readObject();
ois.close();
- cachedResults.restore(cache, bm);
+ assertTrue(cachedResults.restore(cache, bm));
// since restored, simulate a async cache flush
cache.clear();
@@ -132,5 +133,9 @@
assertArrayEquals(tb.getBatch(1).getAllTuples(), cachedTb.getBatch(1).getAllTuples());
assertArrayEquals(tb.getBatch(9).getAllTuples(), cachedTb.getBatch(9).getAllTuples());
+
+ //ensure that an incomplete load fails
+ cache.remove(results.getId()+","+1); //$NON-NLS-1$
+ cachedResults = UnitTestUtil.helpSerialize(results);
}
}
14 years, 7 months
teiid SVN: r3482 - in branches/7.1.x/runtime/src/main/java/org/teiid: transport and 1 other directory.
by teiid-commits@lists.jboss.org
Author: loleary
Date: 2011-09-13 19:19:51 -0400 (Tue, 13 Sep 2011)
New Revision: 3482
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1652, TEIID-1653
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -21,6 +21,7 @@
*/
package org.teiid.odbc;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.sql.ParameterMetaData;
import java.sql.ResultSet;
@@ -65,7 +66,16 @@
// DataRow (B)
// CommandComplete (B)
+ // if count = -1 send all
void sendResults(String sql, ResultSet rs, boolean describeRows);
+
+ void sendCursorResults(ResultSet rs, int rowCount);
+
+ void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal);
+
+ void sendMoveCursor(ResultSet rs, int rowCount);
+
+ void sendCommandComplete(String sql, int updateCount) throws IOException;
// CommandComplete (B)
void sendUpdateCount(String sql, int updateCount);
@@ -100,9 +110,4 @@
// NoticeResponse (B)
// NotificationResponse (B)
-
- // PortalSuspended (B)
-
-
-
}
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -55,6 +55,11 @@
void unsupportedOperation(String msg);
void flush();
+
+ void cursorExecute(String prepareName, String sql);
+ void cursorFetch(String prepareName, int rows);
+ void cursorMove(String prepareName, int rows);
+ void cursorClose(String prepareName);
// unimplemented frontend messages
// CopyData (F & B)
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -137,11 +137,16 @@
private static Pattern preparedAutoIncrement = Pattern.compile("select 1 \\s*from pg_catalog.pg_attrdef \\s*where adrelid = \\$1 AND adnum = \\$2 " + //$NON-NLS-1$
"\\s*and pg_catalog.pg_get_expr\\(adbin, adrelid\\) \\s*like '%nextval\\(%'", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
- private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\""); //$NON-NLS-1$
- private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d+_*)"); //$NON-NLS-1$
- private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d+_*)"); //$NON-NLS-1$
- private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*"); //$NON-NLS-1$
+ private static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern fetchPattern = Pattern.compile("FETCH (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern movePattern = Pattern.compile("MOVE (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern closePattern = Pattern.compile("CLOSE \"(\\w+)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+
private ODBCClientRemote client;
private Properties props;
private AuthenticationType authType;
@@ -150,6 +155,7 @@
// TODO: this is unbounded map; need to define some boundaries as to how many stmts each session can have
private Map<String, Prepared> preparedMap = Collections.synchronizedMap(new HashMap<String, Prepared>());
private Map<String, Portal> portalMap = Collections.synchronizedMap(new HashMap<String, Portal>());
+ private Map<String, Cursor> cursorMap = Collections.synchronizedMap(new HashMap<String, Cursor>());
public ODBCServerRemoteImpl(ODBCClientRemote client, AuthenticationType authType) {
this.client = client;
@@ -189,6 +195,76 @@
}
@Override
+ public void cursorExecute(String cursorName, String sql) {
+ if (this.connection != null) {
+ if (sql != null) {
+ String modfiedSQL = sql.replaceAll("\\$\\d+", "?");//$NON-NLS-1$ //$NON-NLS-2$
+ try {
+ // close if the name is already used or the unnamed prepare; otherwise
+ // stmt is alive until session ends.
+ Prepared previous = this.preparedMap.remove(cursorName);
+ if (previous != null) {
+ previous.stmt.close();
+ }
+
+ PreparedStatement stmt = this.connection.prepareStatement(modfiedSQL);
+ boolean hasResults = stmt.execute();
+ this.cursorMap.put(cursorName, new Cursor(cursorName, sql, stmt, null, hasResults?stmt.getResultSet():null));
+ this.client.sendCommandComplete("DECLARE CURSOR", 0); //$NON-NLS-1$
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ } catch (IOException e) {
+ this.client.errorOccurred(e);
+ }
+ }
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+ }
+
+ }
+ @Override
+ public void cursorFetch(String cursorName, int rows) {
+ Cursor cursor = this.cursorMap.get(cursorName);
+ if (cursor != null) {
+ cursor.fetchSize = rows;
+ this.client.sendCursorResults(cursor.rs, rows);
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
+ return;
+ }
+ }
+
+ @Override
+ public void cursorMove(String prepareName, int rows) {
+ Cursor cursor = this.cursorMap.get(prepareName);
+ if (cursor != null) {
+ this.client.sendMoveCursor(cursor.rs, rows);
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
+ return;
+ }
+ }
+
+ @Override
+ public void cursorClose(String prepareName) {
+ Cursor cursor = this.cursorMap.remove(prepareName);
+ if (cursor != null) {
+ try {
+ cursor.rs.close();
+ cursor.stmt.close();
+ this.client.sendCommandComplete("CLOSE CURSOR", 0); //$NON-NLS-1$
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ } catch (IOException e) {
+ this.client.errorOccurred(e);
+ }
+ }
+ }
+
+ @Override
public void prepare(String prepareName, String sql, int[] paramType) {
if (this.connection != null) {
@@ -263,43 +339,50 @@
bindName = UNNAMED;
}
- Portal query = this.portalMap.get(bindName);
- if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
- sync();
- }
+ Cursor cursor = this.cursorMap.get(bindName);
+ if (cursor != null) {
+ this.client.sendPortalResults(cursor.sql, cursor.rs, cursor.fetchSize, true);
+ }
else {
- if (query.sql.trim().isEmpty()) {
- this.client.emptyQueryReceived();
- return;
+ Portal query = this.portalMap.get(bindName);
+ if (query == null) {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+ sync();
+ }
+ else {
+ if (query.sql.trim().isEmpty()) {
+ this.client.emptyQueryReceived();
+ return;
+ }
+
+ PreparedStatement stmt = query.stmt;
+ try {
+ // maxRows = 0, means unlimited.
+ if (maxRows != 0) {
+ stmt.setMaxRows(maxRows);
+ }
+
+ boolean result = stmt.execute();
+ if (result) {
+ try {
+ ResultSet rs = stmt.getResultSet();
+ this.client.sendResults(query.sql, rs, true);
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ }
+ } else {
+ this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ }
}
-
- PreparedStatement stmt = query.stmt;
- try {
- // maxRows = 0, means unlimited.
- if (maxRows != 0) {
- stmt.setMaxRows(maxRows);
- }
-
- boolean result = stmt.execute();
- if (result) {
- try {
- ResultSet rs = stmt.getResultSet();
- this.client.sendResults(query.sql, rs, true);
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- }
- } else {
- this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
- }
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- }
}
}
private String fixSQL(String sql) {
String modified = sql;
+ Matcher m = null;
// select current_schema()
// set client_encoding to 'WIN1252'
if (sql != null) {
@@ -307,8 +390,7 @@
String sqlLower = sql.toLowerCase();
if (sqlLower.startsWith("select")) { //$NON-NLS-1$
modified = sql.replace('\n', ' ');
-
- Matcher m = null;
+
if ((m = pkPattern.matcher(modified)).matches()) {
modified = new StringBuffer("SELECT k.Name AS attname, convert(Position, short) AS attnum, TableName AS relname, SchemaName AS nspname, TableName AS relname") //$NON-NLS-1$
.append(" FROM SYS.KeyColumns k") //$NON-NLS-1$
@@ -367,7 +449,7 @@
modified = "select 63"; //$NON-NLS-1$
}
else {
- Matcher m = setPattern.matcher(sql);
+ m = setPattern.matcher(sql);
if (m.matches()) {
if (m.group(2).equalsIgnoreCase("client_encoding")) { //$NON-NLS-1$
this.client.setEncoding(PGCharsetConverter.getCharset(m.group(4)));
@@ -383,16 +465,6 @@
else if ((m = rollbackPattern.matcher(modified)).matches()) {
modified = "ROLLBACK"; //$NON-NLS-1$
}
- else if ((m = savepointPattern.matcher(modified)).matches()) {
- modified = "SELECT 'SAVEPOINT'"; //$NON-NLS-1$
- }
- else if ((m = releasePattern.matcher(modified)).matches()) {
- modified = "SELECT 'RELEASE'"; //$NON-NLS-1$
- }
- else if ((m = deallocatePattern.matcher(modified)).matches()) {
- closePreparedStatement(m.group(1));
- modified = "SELECT 'DEALLOCATE'"; //$NON-NLS-1$
- }
}
if (modified != null && !modified.equalsIgnoreCase(sql)) {
LogManager.logDetail(LogConstants.CTX_ODBC, "Modified Query:"+modified); //$NON-NLS-1$
@@ -417,39 +489,71 @@
try {
ScriptReader reader = new ScriptReader(new StringReader(query));
String sql = reader.readStatement();
- String s = fixSQL(sql);
- while (s != null) {
- Statement stmt = null;
- try {
- stmt = this.connection.createStatement();
- boolean result = stmt.execute(s);
- if (result) {
- this.client.sendResults(sql, stmt.getResultSet(), true);
- } else {
- this.client.sendUpdateCount(sql, stmt.getUpdateCount());
- }
- sql = reader.readStatement();
- s = fixSQL(sql);
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- break;
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- } catch (SQLException e) {
- this.client.errorOccurred(e);
+ while (sql != null) {
+ Matcher m = null;
+ if ((m = cursorSelectPattern.matcher(sql)).matches()){
+ cursorExecute(m.group(1), fixSQL(m.group(4)));
+ }
+ else if ((m = fetchPattern.matcher(sql)).matches()){
+ cursorFetch(m.group(2), Integer.parseInt(m.group(1)));
+ }
+ else if ((m = movePattern.matcher(sql)).matches()){
+ cursorMove(m.group(2), Integer.parseInt(m.group(1)));
+ }
+ else if ((m = closePattern.matcher(sql)).matches()){
+ cursorClose(m.group(1));
+ }
+ else if ((m = savepointPattern.matcher(sql)).matches()) {
+ this.client.sendCommandComplete("SAVEPOINT", 0); //$NON-NLS-1$
+ }
+ else if ((m = releasePattern.matcher(sql)).matches()) {
+ this.client.sendCommandComplete("RELEASE", 0); //$NON-NLS-1$
+ }
+ else if ((m = deallocatePattern.matcher(sql)).matches()) {
+ closePreparedStatement(m.group(1));
+ this.client.sendCommandComplete("DEALLOCATE", 0); //$NON-NLS-1$
+ }
+
+ else {
+ if (!executeAndSend(fixSQL(sql))) {
break;
}
- }
+ }
+ sql = reader.readStatement();
}
+ sync();
} catch(IOException e) {
this.client.errorOccurred(e);
}
- sync();
}
+ private boolean executeAndSend(String sql) {
+ boolean sucess = true;
+ Statement stmt = null;
+ try {
+ stmt = this.connection.createStatement();
+ boolean result = stmt.execute(sql);
+ if (result) {
+ this.client.sendResults(sql, stmt.getResultSet(), true);
+ } else {
+ this.client.sendUpdateCount(sql, stmt.getUpdateCount());
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ sucess = false;
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ sucess = false;
+ }
+ }
+ return sucess;
+ }
+
@Override
public void getParameterDescription(String prepareName) {
if (prepareName == null || prepareName.length() == 0) {
@@ -625,6 +729,16 @@
*/
int[] paramType;
}
+
+ static class Cursor extends Prepared {
+ ResultSet rs;
+ int fetchSize = 1000;
+
+ public Cursor (String name, String sql, PreparedStatement stmt, int[] paramType, ResultSet rs) {
+ super(name, sql, stmt, paramType);
+ this.rs = rs;
+ }
+ }
/**
* Represents a PostgreSQL Portal object.
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -241,6 +241,61 @@
}
@Override
+ public void sendCursorResults(ResultSet rs, int rowCount) {
+ try {
+ try {
+ ResultSetMetaData meta = rs.getMetaData();
+ sendRowDescription(meta, rs.getStatement());
+ int rowsSent = sendDataRows(rs, rowCount);
+ sendCommandComplete("FETCH", rowsSent);
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
+ public void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal) {
+ try {
+ try {
+ int rowsSent = sendDataRows(rs, rowCount);
+ if (rowsSent < rowCount) {
+ sendCommandComplete(sql, 0);
+ }
+ else {
+ sendPortalSuspended();
+ }
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
+ public void sendMoveCursor(ResultSet rs, int rowCount) {
+ try {
+ try {
+ int rowsMoved = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!rs.next()) {
+ break;
+ }
+ rowsMoved++;
+ }
+ sendCommandComplete("MOVE", rowsMoved);
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
public void sendResults(String sql, ResultSet rs, boolean describeRows) {
try {
try {
@@ -248,9 +303,7 @@
ResultSetMetaData meta = rs.getMetaData();
sendRowDescription(meta, rs.getStatement());
}
- while (rs.next()) {
- sendDataRow(rs);
- }
+ sendDataRows(rs, -1);
sendCommandComplete(sql, 0);
} catch (SQLException e) {
sendErrorResponse(e);
@@ -311,8 +364,9 @@
startMessage('I');
sendMessage();
}
-
- private void sendCommandComplete(String sql, int updateCount) throws IOException {
+
+ @Override
+ public void sendCommandComplete(String sql, int updateCount) throws IOException {
startMessage('C');
sql = sql.trim().toUpperCase();
// TODO remove remarks at the beginning
@@ -331,22 +385,64 @@
tag = "COMMIT";
} else if (sql.startsWith("ROLLBACK")) {
tag = "ROLLBACK";
- } else {
- trace("Check command tag: " + sql);
- tag = "UPDATE " + updateCount;
+ } else if (sql.startsWith("DECLARE CURSOR")) {
+ tag = "DECLARE CURSOR";
+ } else if (sql.startsWith("CLOSE CURSOR")) {
+ tag = "CLOSE CURSOR";
+ } else if (sql.startsWith("FETCH")) {
+ tag = "FETCH "+updateCount;
+ } else if (sql.startsWith("MOVE")) {
+ tag = "MOVE "+updateCount;
}
+ else {
+ tag = sql;
+ }
writeString(tag);
sendMessage();
}
-
- private void sendDataRow(ResultSet rs) throws SQLException, IOException {
+
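+ // default size in bytes of the buffer used to batch data rows: 307200 (300 KB); overridable via the ODBCPacketSize system property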
+ static int ODBC_SOCKET_BUFF_SIZE = Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
+
+ private int sendDataRows(ResultSet rs, int rowsToSend) throws SQLException, IOException {
+ int avgRowsize = -1;
+ int rowCount = 0;
+ ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
int columns = rs.getMetaData().getColumnCount();
- String[] values = new String[columns];
- for (int i = 0; i < columns; i++) {
- values[i] = rs.getString(i + 1);
+ int rowsSent = 0;
+
+ while(rs.next()) {
+ String[] values = new String[columns];
+ for (int i = 0; i < columns; i++) {
+ values[i] = rs.getString(i + 1);
+ }
+
+ rowCount++;
+
+ buildDataRow(values, buffer);
+ avgRowsize = buffer.readableBytes()/rowCount;
+
+ if (buffer.writableBytes() < (avgRowsize*2)) {
+ Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+ rowCount = 0;
+ buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
+ }
+
+ rowsSent++;
+ if (rowsSent == rowsToSend) {
+ break;
+ }
}
+
+ if (rowCount > 0) {
+ Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+ }
+ return rowsSent;
+ }
+
+ private void buildDataRow(String[] values, ChannelBuffer buffer) throws IOException {
startMessage('D');
- writeShort(columns);
+ writeShort(values.length);
for (String s : values) {
if (s == null) {
writeInt(-1);
@@ -357,7 +453,16 @@
write(d2);
}
}
- sendMessage();
+
+ byte[] buff = outBuffer.toByteArray();
+ int len = buff.length;
+ this.outBuffer = null;
+ this.dataOut = null;
+
+ // now build the wire contents.
+ buffer.writeByte((byte)this.messageType);
+ buffer.writeInt(len+4);
+ buffer.writeBytes(buff);
}
private void sendErrorResponse(Throwable t) throws IOException {
@@ -474,6 +579,11 @@
startMessage('2');
sendMessage();
}
+
+ private void sendPortalSuspended() {
+ startMessage('s');
+ sendMessage();
+ }
private void sendAuthenticationCleartextPassword() throws IOException {
startMessage('R');
14 years, 7 months