Author: shawkins
Date: 2010-07-29 11:18:33 -0400 (Thu, 29 Jul 2010)
New Revision: 2391
Added:
trunk/api/src/main/java/org/teiid/language/IteratorValueSource.java
trunk/engine/src/main/java/org/teiid/query/optimizer/BatchedUpdatePlanner.java
Removed:
trunk/engine/src/main/java/org/teiid/query/optimizer/batch/
trunk/engine/src/test/java/org/teiid/cache/
Modified:
trunk/api/src/main/java/org/teiid/language/visitor/AbstractLanguageVisitor.java
trunk/api/src/main/java/org/teiid/language/visitor/CollectorVisitor.java
trunk/api/src/main/java/org/teiid/language/visitor/LanguageObjectVisitor.java
trunk/api/src/main/java/org/teiid/language/visitor/SQLStringVisitor.java
trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCUpdateExecution.java
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/TranslatedCommand.java
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/modeshape/ModeShapeExecutionFactory.java
trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/TestJDBCUpdateExecution.java
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
trunk/documentation/reference/src/main/docbook/en-US/content/translators.xml
trunk/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/CapabilitiesConverter.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/function/metadata/FunctionMethod.java
trunk/engine/src/main/java/org/teiid/query/function/source/SystemSource.java
trunk/engine/src/main/java/org/teiid/query/metadata/TempCapabilitiesFinder.java
trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java
trunk/engine/src/main/java/org/teiid/query/optimizer/capabilities/SourceCapabilities.java
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
trunk/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
trunk/engine/src/main/java/org/teiid/query/processor/CollectionTupleSource.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
trunk/engine/src/main/java/org/teiid/query/sql/lang/Insert.java
trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java
trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
trunk/engine/src/test/java/org/teiid/query/processor/TestCollectionTupleSource.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestGroupingNode.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
Log:
TEIID-892 introduced a way to better control large inserts. Cleaned up the TupleSource
interface. TEIID-1167 changed temp tables to support the iterator-based inserts, since
they do not support bulk delete/update logic.
Added: trunk/api/src/main/java/org/teiid/language/IteratorValueSource.java
===================================================================
--- trunk/api/src/main/java/org/teiid/language/IteratorValueSource.java
(rev 0)
+++ trunk/api/src/main/java/org/teiid/language/IteratorValueSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.language;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.teiid.language.visitor.LanguageObjectVisitor;
+
+public class IteratorValueSource<T extends List<?>> extends
BaseLanguageObject implements InsertValueSource {
+
+ private Iterator<T> iter;
+ private int columnCount;
+
+ public IteratorValueSource(Iterator<T> iter, int columnCount) {
+ this.iter = iter;
+ this.columnCount = columnCount;
+ }
+
+ /**
+ * A memory safe iterator of the insert values. Only 1 iterator is associated
+ * with the value source. Once it is consumed there are no more values.
+ * @return
+ */
+ public Iterator<T> getIterator() {
+ return iter;
+ }
+
+ public int getColumnCount() {
+ return columnCount;
+ }
+
+ @Override
+ public void acceptVisitor(LanguageObjectVisitor visitor) {
+ visitor.visit(this);
+ }
+
+}
Property changes on: trunk/api/src/main/java/org/teiid/language/IteratorValueSource.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/api/src/main/java/org/teiid/language/visitor/AbstractLanguageVisitor.java
===================================================================
---
trunk/api/src/main/java/org/teiid/language/visitor/AbstractLanguageVisitor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/api/src/main/java/org/teiid/language/visitor/AbstractLanguageVisitor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -41,6 +41,7 @@
import org.teiid.language.In;
import org.teiid.language.Insert;
import org.teiid.language.IsNull;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Join;
import org.teiid.language.LanguageObject;
import org.teiid.language.Like;
@@ -146,4 +147,5 @@
public void visit(SetQuery obj) {}
public void visit(SetClause obj) {}
public void visit(SearchedWhenClause obj) {}
+ public void visit(IteratorValueSource obj) {}
}
Modified: trunk/api/src/main/java/org/teiid/language/visitor/CollectorVisitor.java
===================================================================
--- trunk/api/src/main/java/org/teiid/language/visitor/CollectorVisitor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/api/src/main/java/org/teiid/language/visitor/CollectorVisitor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -44,6 +44,7 @@
import org.teiid.language.In;
import org.teiid.language.Insert;
import org.teiid.language.IsNull;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Join;
import org.teiid.language.LanguageObject;
import org.teiid.language.Like;
@@ -226,6 +227,11 @@
public void visit(SearchedWhenClause obj) {
checkInstance(obj);
}
+
+ @Override
+ public void visit(IteratorValueSource obj) {
+ checkInstance(obj);
+ }
/**
* This is a utility method to instantiate and run the visitor in conjunction
Modified: trunk/api/src/main/java/org/teiid/language/visitor/LanguageObjectVisitor.java
===================================================================
---
trunk/api/src/main/java/org/teiid/language/visitor/LanguageObjectVisitor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/api/src/main/java/org/teiid/language/visitor/LanguageObjectVisitor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -61,4 +61,5 @@
public void visit(SetQuery obj);
public void visit(SetClause obj);
public void visit(SearchedWhenClause obj);
+ public void visit(IteratorValueSource obj);
}
Modified: trunk/api/src/main/java/org/teiid/language/visitor/SQLStringVisitor.java
===================================================================
--- trunk/api/src/main/java/org/teiid/language/visitor/SQLStringVisitor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/api/src/main/java/org/teiid/language/visitor/SQLStringVisitor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -50,6 +50,7 @@
import org.teiid.language.In;
import org.teiid.language.Insert;
import org.teiid.language.IsNull;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Join;
import org.teiid.language.LanguageObject;
import org.teiid.language.Like;
@@ -494,6 +495,18 @@
append(obj.getValues());
buffer.append(Tokens.RPAREN);
}
+
+ @Override
+ public void visit(IteratorValueSource obj) {
+ buffer.append(VALUES).append(Tokens.SPACE).append(Tokens.LPAREN);
+ for (int i = 0; i < obj.getColumnCount(); i++) {
+ if (i > 0) {
+ buffer.append(", "); //$NON-NLS-1$
+ }
+ buffer.append("?"); //$NON-NLS-1$
+ }
+ buffer.append(Tokens.RPAREN);
+ }
public void visit(IsNull obj) {
append(obj.getExpression());
Modified: trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2010-07-28 21:11:54
UTC (rev 2390)
+++ trunk/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2010-07-29 15:18:33
UTC (rev 2391)
@@ -23,6 +23,7 @@
package org.teiid.translator;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import javax.resource.ResourceException;
@@ -36,6 +37,7 @@
import org.teiid.language.Command;
import org.teiid.language.LanguageFactory;
import org.teiid.language.QueryExpression;
+import org.teiid.language.Select;
import org.teiid.language.SetQuery;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -730,12 +732,22 @@
/**
* Support indicates that the connector can accept INSERTs with
- * values specified by an {@link SetQuery}.
+ * values specified by a {@link SetQuery} or {@link Select}
* @since 6.1
*/
public boolean supportsInsertWithQueryExpression() {
return false;
}
+
+ /**
+ * Support indicates that the connector can accept INSERTs
+ * with values specified by an {@link Iterator}
+ * @since 7.1
+ * @return
+ */
+ public boolean supportsInsertWithIterator() {
+ return false;
+ }
public static <T> T getInstance(Class<T> expectedType, String className,
Collection<?> ctorObjs, Class<? extends T> defaultClass) throws
TranslatorException {
try {
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2010-07-28 21:11:54 UTC (rev
2390)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2010-07-29 15:18:33 UTC (rev
2391)
@@ -10,7 +10,7 @@
<P><A
HREF="http://www.teiid.org/"><IMG
SRC="https://www.jboss.org/dms/teiid/images/teiid_banner.png"
NAME="graphics1" ALT="Teiid" ALIGN=BOTTOM WIDTH=800></A>
<H1>Teiid ${project.version} Release Notes</H1>
-<P>Teiid ${project.version} adds ODBC, SQL, and caching features.
+<P>Teiid ${project.version} adds ODBC, SQL, performance, and caching features.
<H2>Overview</H2>
<UL>
@@ -38,7 +38,11 @@
<li>Added PRIMARY KEY and the associated index support to temp tables.
</ul>
<LI><B>Parallel Source Queries</B> - reestablished parallel
execution of source queries within a query plan along with a prioritized work system to
help prevent resource contention.
+ <LI><B>Improved Insert Support</B> - Cross source inserts using a
query expression can defer to the source to perform the entire insert atomically with
source controlled batching.
+ See maxPreparedInsertBatchSize on any JDBC translator supporting the
InsertWithIterator capability.
<LI><B>SHOW Statement</B> - added client handling for the SHOW
statement to retrieve query plan information and see parameter values.
+ <LI><B>User Identity</B> - Teiid DataSources in JBossAS connecting
to a local Teiid instance can preserve the thread's user identity, rather than
requiring
+ a new authentication. See TeiidDataSource.setPassthroughAuthentication.
</UL>
<h2><a name="Compatibility">Compatibility
Issues</a></h2>
Modified:
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java
===================================================================
---
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -139,6 +139,7 @@
private boolean trimStrings;
private boolean useCommentsInSourceQuery;
private String version;
+ private int maxInsertBatchSize = 2048;
private AtomicBoolean initialConnection = new AtomicBoolean(true);
@@ -440,6 +441,11 @@
}
@Override
+ public boolean supportsInsertWithIterator() {
+ return super.supportsBulkUpdate();
+ }
+
+ @Override
public boolean supportsBatchedUpdates() {
return true;
}
@@ -462,9 +468,25 @@
@Override
public boolean supportsInsertWithQueryExpression() {
return true;
- }
+ }
/**
+ * Get the max number of inserts to perform in one batch.
+ * @return
+ */
+ @TranslatorProperty(display="Max Prepared Insert Batch Size",
description="The max size of a prepared insert batch. Default 2048.",
advanced=true)
+ public int getMaxPreparedInsertBatchSize() {
+ return maxInsertBatchSize;
+ }
+
+ public void setMaxPreparedInsertBatchSize(int maxInsertBatchSize) {
+ if (maxInsertBatchSize < 1) {
+ throw new AssertionError("Max prepared batch insert size must be greater than
0"); //$NON-NLS-1$
+ }
+ this.maxInsertBatchSize = maxInsertBatchSize;
+ }
+
+ /**
* Gets the database calendar. This will be set to the time zone
* specified by the property {@link JDBCPropertyNames#DATABASE_TIME_ZONE}, or
* the local time zone if none is specified.
Modified:
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCUpdateExecution.java
===================================================================
---
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCUpdateExecution.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCUpdateExecution.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -26,14 +26,17 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.teiid.language.BatchedUpdates;
import org.teiid.language.Command;
+import org.teiid.language.Insert;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Literal;
-import org.teiid.translator.TranslatorException;
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.ExecutionContext;
+import org.teiid.translator.TranslatorException;
import org.teiid.translator.UpdateExecution;
@@ -163,8 +166,46 @@
int updateCount = 0;
if (!translatedComm.isPrepared()) {
updateCount = getStatement().executeUpdate(sql);
+ addStatementWarnings();
} else {
PreparedStatement pstatement = getPreparedStatement(sql);
+
+ if (command instanceof Insert) {
+ Insert insert = (Insert)command;
+ if (insert.getValueSource() instanceof IteratorValueSource) {
+ commitType = getAutoCommit(translatedComm);
+ if (commitType) {
+ connection.setAutoCommit(false);
+ }
+
+ IteratorValueSource<List<Object>> ivs =
(IteratorValueSource)insert.getValueSource();
+ List<Object>[] values = new List[ivs.getColumnCount()];
+ for (int i = 0; i < ivs.getColumnCount(); i++) {
+ values[i] = new ArrayList<Object>();
+ Literal literal = new Literal(values[i],
insert.getColumns().get(i).getType());
+ literal.setMultiValued(true);
+ translatedComm.getPreparedValues().add(literal);
+ }
+ Iterator<List<Object>> i = ivs.getIterator();
+ int maxBatchSize =
this.executionFactory.getMaxPreparedInsertBatchSize();
+ while (i.hasNext()) {
+ int batchSize = 0;
+ while (i.hasNext() && batchSize++ < maxBatchSize) {
+ List<Object> next = i.next();
+ for (int j = 0; j < ivs.getColumnCount(); j++) {
+ values[j].add(next.get(j));
+ }
+ }
+ updateCount += executePreparedBatch(translatedComm, pstatement,
batchSize);
+ for (int j = 0; j < ivs.getColumnCount(); j++) {
+ values[j].clear();
+ }
+ }
+ succeeded = true;
+ return new int[updateCount];
+ }
+ }
+
int rowCount = 1;
for (int i = 0; i< translatedComm.getPreparedValues().size(); i++) {
Literal paramValue =
(Literal)translatedComm.getPreparedValues().get(i);
@@ -179,19 +220,9 @@
connection.setAutoCommit(false);
}
}
- bindPreparedStatementValues(pstatement, translatedComm, rowCount);
- if (rowCount > 1) {
- int[] results = pstatement.executeBatch();
-
- for (int i=0; i<results.length; i++) {
- updateCount += results[i];
- }
- succeeded = true;
- } else {
- updateCount = pstatement.executeUpdate();
- }
+ updateCount = executePreparedBatch(translatedComm, pstatement,
rowCount);
+ succeeded = true;
}
- addStatementWarnings();
return new int[] {updateCount};
} catch (SQLException err) {
throw new JDBCExecutionException(err, translatedComm);
@@ -202,6 +233,23 @@
}
}
+ private int executePreparedBatch(TranslatedCommand translatedComm, PreparedStatement
pstatement, int rowCount)
+ throws SQLException {
+ bindPreparedStatementValues(pstatement, translatedComm, rowCount);
+ int updateCount = 0;
+ if (rowCount > 1) {
+ int[] results = pstatement.executeBatch();
+
+ for (int i=0; i<results.length; i++) {
+ updateCount += results[i];
+ }
+ } else {
+ updateCount = pstatement.executeUpdate();
+ }
+ addStatementWarnings();
+ return updateCount;
+ }
+
/**
* @param command
* @return
Modified:
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/TranslatedCommand.java
===================================================================
---
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/TranslatedCommand.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/TranslatedCommand.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -27,6 +27,8 @@
import java.util.List;
import org.teiid.language.Command;
+import org.teiid.language.Insert;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Literal;
import org.teiid.language.visitor.CollectorVisitor;
import org.teiid.translator.TranslatorException;
@@ -89,6 +91,10 @@
return true;
}
}
+ if (command instanceof Insert) {
+ Insert insert = (Insert)command;
+ return insert.getValueSource() instanceof IteratorValueSource<?>;
+ }
return false;
}
Modified:
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/modeshape/ModeShapeExecutionFactory.java
===================================================================
---
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/modeshape/ModeShapeExecutionFactory.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/modeshape/ModeShapeExecutionFactory.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -30,7 +30,6 @@
import java.util.List;
import org.teiid.language.ColumnReference;
-import org.teiid.language.Command;
import org.teiid.language.Expression;
import org.teiid.language.Function;
import org.teiid.language.LanguageObject;
@@ -39,7 +38,6 @@
import org.teiid.translator.ExecutionContext;
import org.teiid.translator.Translator;
import org.teiid.translator.TranslatorException;
-import org.teiid.translator.TranslatorProperty;
import org.teiid.translator.jdbc.ConvertModifier;
import org.teiid.translator.jdbc.FunctionModifier;
import org.teiid.translator.jdbc.JDBCExecutionFactory;
@@ -52,8 +50,10 @@
@Translator(name="modeshape")
public class ModeShapeExecutionFactory extends JDBCExecutionFactory {
- private String version = "2.0"; //$NON-NLS-1$
-
+ public ModeShapeExecutionFactory() {
+ setDatabaseVersion("2.0"); //$NON-NLS-1$
+ }
+
@Override
public void start() throws TranslatorException {
super.start();
@@ -160,16 +160,7 @@
return "FALSE"; //$NON-NLS-1$
}
-
-
@Override
- public List<?> translateCommand(Command command, ExecutionContext context) {
- return super.translateCommand(command, context);
- }
-
-
-
- @Override
public String translateLiteralDate(Date dateValue) {
return "DATE '" + formatDateValue(dateValue) + "'";
//$NON-NLS-1$//$NON-NLS-2$
}
@@ -201,15 +192,6 @@
}
- @TranslatorProperty(description= "ModeShape Repository Version")
- public String getDatabaseVersion() {
- return this.version;
- }
-
- public void setDatabaseVersion(String version) {
- this.version = version;
- }
-
@Override
public boolean useBindVariables() {
return false;
Modified:
trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/TestJDBCUpdateExecution.java
===================================================================
---
trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/TestJDBCUpdateExecution.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/connectors/translator-jdbc/src/test/java/org/teiid/translator/jdbc/TestJDBCUpdateExecution.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -24,13 +24,16 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.language.Command;
import org.teiid.language.ExpressionValueSource;
import org.teiid.language.Insert;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Literal;
import org.teiid.translator.ExecutionContext;
@@ -58,4 +61,23 @@
Mockito.verify(p, Mockito.times(2)).addBatch();
}
+ @Test public void testInsertIteratorUpdate() throws Exception {
+ Insert command = (Insert)TranslationHelper.helpTranslate(TranslationHelper.BQT_VDB,
"insert into BQT1.SmallA (IntKey, IntNum) values (1, 2)"); //$NON-NLS-1$
+ List<List<Integer>> values = new ArrayList<List<Integer>>();
+ values.add(Arrays.asList(1, 2));
+ values.add(Arrays.asList(2, 3));
+ command.setValueSource(new IteratorValueSource(values.iterator(), 2));
+
+ Connection connection = Mockito.mock(Connection.class);
+ PreparedStatement p = Mockito.mock(PreparedStatement.class);
+ Mockito.stub(p.executeBatch()).toReturn(new int [] {1, 1});
+ Mockito.stub(connection.prepareStatement("INSERT INTO SmallA (IntKey, IntNum)
VALUES (?, ?)")).toReturn(p); //$NON-NLS-1$
+
+ JDBCExecutionFactory config = new JDBCExecutionFactory();
+
+ JDBCUpdateExecution updateExecution = new JDBCUpdateExecution(command, connection,
Mockito.mock(ExecutionContext.class), config);
+ updateExecution.execute();
+ Mockito.verify(p, Mockito.times(2)).addBatch();
+ }
+
}
Modified:
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml
===================================================================
---
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/documentation/developer-guide/src/main/docbook/en-US/content/translator-api.xml 2010-07-29
15:18:33 UTC (rev 2391)
@@ -466,8 +466,8 @@
<para>Each <code>Insert</code> will have a single
<code>NamedTable</code> specifying the table being
inserted into. It will also has a list of <code>ColumnReference</code>
specifying the columns
of the <code>NamedTable</code> that are being inserted into. It
also has
- <code>InsertValueSource</code>, which will either be a list of
- <code>Expression(ExpressionValueSource)</code> or
<code>QueryExpression</code></para>
+ <code>InsertValueSource</code>, which will be a list of
+ Expressions (<code>ExpressionValueSource</code>), or a
<code>QueryExpression</code>, or an Iterator
(<code>IteratorValueSource</code>)</para>
</sect3>
<sect3>
@@ -1245,7 +1245,7 @@
</row>
<row>
<entry>
- <para>supportsBulkUpdate</para>
+ <para>BulkUpdate</para>
</entry>
<entry>
<para/>
@@ -1254,6 +1254,17 @@
<para>Translator supports updates with multiple value
sets</para>
</entry>
</row>
+ <row>
+ <entry>
+ <para>InsertWithIterator</para>
+ </entry>
+ <entry>
+ <para/>
+ </entry>
+ <entry>
+ <para>Translator supports inserts with an iterator of values.
The values would typically be from an evaluated QueryExpression.</para>
+ </entry>
+ </row>
</tbody>
</tgroup>
</table>
Modified: trunk/documentation/reference/src/main/docbook/en-US/content/translators.xml
===================================================================
---
trunk/documentation/reference/src/main/docbook/en-US/content/translators.xml 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/documentation/reference/src/main/docbook/en-US/content/translators.xml 2010-07-29
15:18:33 UTC (rev 2391)
@@ -196,6 +196,11 @@
<entry>This will embed a /*comment*/ leading comment with session/request id in
source SQL query for informational purposes</entry>
<entry>false</entry>
</row>
+<row>
+<entry>MaxPreparedInsertBatchSize</entry>
+<entry>The max size of a prepared insert batch.</entry>
+<entry>2048</entry>
+</row>
</tbody>
</tgroup>
</table>
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -73,6 +73,8 @@
protected abstract TupleBatch getBatch(int row) throws TeiidComponentException,
TeiidProcessingException;
+ protected abstract int available();
+
@Override
public void closeSource() {
batch = null;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2010-07-28 21:11:54 UTC
(rev 2390)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2010-07-29 15:18:33 UTC
(rev 2391)
@@ -35,10 +35,8 @@
/**
* Self balancing search tree using skip list like logic
- * This has similar performance similar to a B+/-Tree
- *
- * TODO: reserve additional memory for delete/update operations
- * TODO: double link to support desc key access
+ * This has similar performance similar to a B+/-Tree,
+ * but with fewer updates.
*/
@SuppressWarnings("unchecked")
public class STree {
@@ -67,7 +65,6 @@
int keyLength,
String[] types) {
randomSeed = seedGenerator.nextInt() | 0x00000100; // ensure nonzero
- randomSeed = 1;
this.keyManager = manager;
this.leafManager = leafManager;
this.comparator = comparator;
@@ -327,6 +324,10 @@
return oldSize;
}
+ public int getHeight() {
+ return header.length;
+ }
+
@Override
public String toString() {
StringBuffer result = new StringBuffer();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBrowser.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -28,11 +28,12 @@
import org.teiid.common.buffer.SPage.SearchResult;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
/**
* Implements intelligent browsing over a {@link STree}
*/
-public class TupleBrowser {
+public class TupleBrowser implements TupleSource {
private final STree tree;
@@ -132,12 +133,9 @@
return result;
}
- /**
- * Returns the next tuple or null if there are no more results.
- * @return
- * @throws TeiidComponentException
- */
- public List<?> next() throws TeiidComponentException {
+ @Override
+ public List<?> nextTuple() throws TeiidComponentException,
+ TeiidProcessingException {
for (;;) {
//first check for value iteration
if (valueSet != null) {
@@ -237,4 +235,10 @@
public void removed() {
index-=getOffset();
}
+
+ @Override
+ public void closeSource() {
+
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -268,14 +268,12 @@
* @return
*/
public IndexedTupleSource createIndexedTupleSource(final boolean singleUse) {
+ if (singleUse) {
+ setForwardOnly(true);
+ }
return new AbstractTupleSource() {
@Override
- public List<? extends Expression> getSchema() {
- return (List<? extends Expression>) schema;
- }
-
- @Override
protected List<?> finalRow() throws BlockedException {
if(isFinal) {
return null;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -26,7 +26,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
-import org.teiid.query.sql.symbol.Expression;
/**
@@ -37,12 +36,6 @@
public interface TupleSource {
/**
- * Returns the List of ElementSymbol describing the Tuple Source
- * @return the List of elements describing the Tuple Source
- */
- List<? extends Expression> getSchema();
-
- /**
* Returns the next tuple
* @return the next tuple (a List object), or <code>null</code> if
* there are no more tuples.
@@ -58,10 +51,4 @@
*/
void closeSource();
- /**
- * Returns an estimate of the number of rows that can be read without blocking.
- * @return
- */
- int available();
-
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/CapabilitiesConverter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/CapabilitiesConverter.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/CapabilitiesConverter.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -96,7 +96,7 @@
tgtCaps.setCapabilitySupport(Capability.QUERY_ORDERBY_UNRELATED,
srcCaps.supportsOrderByUnrelated());
tgtCaps.setCapabilitySupport(Capability.QUERY_AGGREGATES_ENHANCED_NUMERIC,
srcCaps.supportsAggregatesEnhancedNumeric());
tgtCaps.setCapabilitySupport(Capability.QUERY_ORDERBY_NULL_ORDERING,
srcCaps.supportsOrderByNullOrdering());
-
+ tgtCaps.setCapabilitySupport(Capability.INSERT_WITH_ITERATOR,
srcCaps.supportsInsertWithIterator());
List functions = srcCaps.getSupportedFunctions();
if(functions != null && functions.size() > 0) {
Iterator iter = functions.iterator();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/LanguageBridgeFactory.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -25,10 +25,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.client.metadata.ParameterInfo;
+import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.language.AggregateFunction;
import org.teiid.language.AndOr;
@@ -44,6 +47,7 @@
import org.teiid.language.In;
import org.teiid.language.InsertValueSource;
import org.teiid.language.IsNull;
+import org.teiid.language.IteratorValueSource;
import org.teiid.language.Join;
import org.teiid.language.Like;
import org.teiid.language.Literal;
@@ -540,6 +544,40 @@
InsertValueSource valueSource = null;
if (insert.getQueryExpression() != null) {
valueSource = translate(insert.getQueryExpression());
+ } else if (insert.getTupleSource() != null) {
+ final TupleSource ts = insert.getTupleSource();
+ valueSource = new IteratorValueSource<List<?>>(new
Iterator<List<?>>() {
+
+ List<?> next;
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<?> next() {
+ if (hasNext()) {
+ List<?> result = next;
+ next = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (next != null) {
+ return true;
+ }
+ try {
+ next = ts.nextTuple();
+ } catch (TeiidException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ return next != null;
+ }
+ }, elements.size());
} else {
// This is for the simple one row insert.
List values = insert.getValues();
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -307,7 +307,7 @@
break;
}
}
- return new CollectionTupleSource(rows.iterator(), command.getProjectedSymbols());
+ return new CollectionTupleSource(rows.iterator());
}
private List<Schema> getVisibleSchemas(VDBMetaData vdb, CompositeMetadataStore
metadata) {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -299,12 +299,4 @@
return this.aqr.isTransactional();
}
- @Override
- public int available() {
- if (this.arm == null) {
- return 0;
- }
- return this.arm.getResults().length - index;
- }
-
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -41,7 +41,7 @@
import org.teiid.query.QueryPlugin;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.metadata.QueryMetadataInterface;
-import org.teiid.query.optimizer.batch.BatchedUpdatePlanner;
+import org.teiid.query.optimizer.BatchedUpdatePlanner;
import org.teiid.query.optimizer.capabilities.SourceCapabilities;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.relational.AccessNode;
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.teiid.cache.Cache;
-import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
import org.teiid.cache.DefaultCache;
import org.teiid.cache.DefaultCacheFactory;
Modified:
trunk/engine/src/main/java/org/teiid/query/function/metadata/FunctionMethod.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/function/metadata/FunctionMethod.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/function/metadata/FunctionMethod.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -69,7 +69,7 @@
/*
* always -> normal deterministic functions
- * server lifetime -> lookup (however lookup values can be flushed at any time)
+ * vdb -> lookup (however lookup values can be flushed at any time),
current_database
* session -> env, user
* command -> command payload
* never -> rand, etc.
@@ -83,7 +83,7 @@
* processing time.
*/
public static final int DETERMINISTIC = 0;
- public static final int SERVER_DETERMINISTIC = 1;
+ public static final int VDB_DETERMINISTIC = 1;
public static final int USER_DETERMINISTIC = 2;
public static final int SESSION_DETERMINISTIC = 3;
public static final int COMMAND_DETERMINISTIC = 4;
Modified: trunk/engine/src/main/java/org/teiid/query/function/source/SystemSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/function/source/SystemSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/function/source/SystemSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -762,7 +762,7 @@
new FunctionParameter("keyelement",
DataTypeManager.DefaultDataTypes.STRING,
QueryPlugin.Util.getString("SystemSource.Lookup_arg3")), //$NON-NLS-1$
//$NON-NLS-2$
new FunctionParameter("keyvalue", keyValueType,
QueryPlugin.Util.getString("SystemSource.Lookup_arg4")), //$NON-NLS-1$
//$NON-NLS-2$
},
- new FunctionParameter("result",
DataTypeManager.DefaultDataTypes.OBJECT,
QueryPlugin.Util.getString("SystemSource.Lookup_result")), true,
FunctionMethod.SERVER_DETERMINISTIC ) ); //$NON-NLS-1$ //$NON-NLS-2$
+ new FunctionParameter("result",
DataTypeManager.DefaultDataTypes.OBJECT,
QueryPlugin.Util.getString("SystemSource.Lookup_result")), true,
FunctionMethod.VDB_DETERMINISTIC ) ); //$NON-NLS-1$ //$NON-NLS-2$
}
}
@@ -773,11 +773,9 @@
}
private void addCurrentDatabaseFunction() {
- // this function is actually session_deterministic, however for caching purposes it
has been set deterministic
- // as caching is already scoped to the VDB.
functions.add(
new FunctionMethod("current_database",
QueryPlugin.Util.getString("SystemSource.current_database_desc"), MISCELLANEOUS,
FunctionMethod.CANNOT_PUSHDOWN, FUNCTION_CLASS, "current_database", null,
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- new FunctionParameter("result",
DataTypeManager.DefaultDataTypes.STRING,
QueryPlugin.Util.getString("current_database_result")), false,
FunctionMethod.DETERMINISTIC) ); //$NON-NLS-1$ //$NON-NLS-2$
+ new FunctionParameter("result",
DataTypeManager.DefaultDataTypes.STRING,
QueryPlugin.Util.getString("current_database_result")), false,
FunctionMethod.VDB_DETERMINISTIC) ); //$NON-NLS-1$ //$NON-NLS-2$
}
private void addEnvFunction() {
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TempCapabilitiesFinder.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/metadata/TempCapabilitiesFinder.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/metadata/TempCapabilitiesFinder.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -43,7 +43,7 @@
if (TempMetadataAdapter.TEMP_MODEL.getID().equals(modelName)) {
if (tempCaps == null) {
tempCaps = new BasicSourceCapabilities();
- tempCaps.setCapabilitySupport(Capability.BULK_UPDATE, true);
+ tempCaps.setCapabilitySupport(Capability.INSERT_WITH_ITERATOR, true);
tempCaps.setCapabilitySupport(Capability.QUERY_ORDERBY, true);
tempCaps.setCapabilitySupport(Capability.CRITERIA_NOT, true);
tempCaps.setCapabilitySupport(Capability.CRITERIA_IN, true);
Copied: trunk/engine/src/main/java/org/teiid/query/optimizer/BatchedUpdatePlanner.java
(from rev 2386,
trunk/engine/src/main/java/org/teiid/query/optimizer/batch/BatchedUpdatePlanner.java)
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/BatchedUpdatePlanner.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/BatchedUpdatePlanner.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.query.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.teiid.api.exception.query.QueryMetadataException;
+import org.teiid.api.exception.query.QueryPlannerException;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.id.IDGenerator;
+import org.teiid.core.id.IntegerID;
+import org.teiid.query.analysis.AnalysisRecord;
+import org.teiid.query.execution.QueryExecPlugin;
+import org.teiid.query.metadata.QueryMetadataInterface;
+import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
+import org.teiid.query.optimizer.capabilities.SourceCapabilities;
+import org.teiid.query.optimizer.capabilities.SourceCapabilities.Capability;
+import org.teiid.query.optimizer.relational.rules.CapabilitiesUtil;
+import org.teiid.query.processor.ProcessorPlan;
+import org.teiid.query.processor.batch.BatchedUpdatePlan;
+import org.teiid.query.processor.relational.BatchedUpdateNode;
+import org.teiid.query.processor.relational.ProjectNode;
+import org.teiid.query.processor.relational.RelationalPlan;
+import org.teiid.query.sql.lang.BatchedUpdateCommand;
+import org.teiid.query.sql.lang.Command;
+import org.teiid.query.sql.lang.Delete;
+import org.teiid.query.sql.lang.Insert;
+import org.teiid.query.sql.lang.Update;
+import org.teiid.query.sql.symbol.GroupSymbol;
+import org.teiid.query.sql.util.VariableContext;
+import org.teiid.query.sql.visitor.EvaluatableVisitor;
+import org.teiid.query.util.CommandContext;
+
+
+
+/**
+ * Planner for BatchedUpdateCommands
+ * @since 4.2
+ */
+public class BatchedUpdatePlanner implements CommandPlanner {
+
+ /**
+ * Optimizes batched updates by batching all contiguous commands that relate to the
same physical model.
+ * For example, for the following batch of commands:
+ * <br/>
+ * <ol>
+ * <li>1. INSERT INTO physicalModel.myPhysical ...</li>
+ * <li>2. UPDATE physicalModel.myPhysical ... </li>
+ * <li>3. DELETE FROM virtualmodel.myVirtual ... </li>
+ * <li>4. UPDATE virtualmodel.myVirtual ... </li>
+ * <li>5. UPDATE physicalModel.myOtherPhysical ...</li>
+ * <li>6. INSERT INTO physicalModel.myOtherPhysical ... </li>
+ * <li>7. DELETE FROM physicalModel.myOtherPhysical ...</li>
+ * <li>8. INSERT INTO physicalModel.myPhysical ... </li>
+ * <li>9. INSERT INTO physicalModel.myPhysical ... </li>
+ * <li>10. INSERT INTO physicalModel.myPhysical ... </li>
+ * <li>11. INSERT INTO physicalModel.myPhysical ... </li>
+ * <li>12. INSERT INTO physicalModel.myPhysical ... </li>
+ * </ol>
+ * <br/> this implementation will batch as follows: (1,2), (5, 6, 7), (8 thru
12).
+ * The remaining commands/plans will be executed individually.
+ * @see org.teiid.query.optimizer.CommandPlanner#optimize(Command,
org.teiid.core.id.IDGenerator, org.teiid.query.metadata.QueryMetadataInterface,
org.teiid.query.optimizer.capabilities.CapabilitiesFinder,
org.teiid.query.analysis.AnalysisRecord, CommandContext)
+ * @since 4.2
+ */
+ public ProcessorPlan optimize(Command command,
+ IDGenerator idGenerator,
+ QueryMetadataInterface metadata,
+ CapabilitiesFinder capFinder,
+ AnalysisRecord analysisRecord, CommandContext context)
+ throws QueryPlannerException, QueryMetadataException, TeiidComponentException {
+ BatchedUpdateCommand batchedUpdateCommand = (BatchedUpdateCommand)command;
+ List childPlans = new
ArrayList(batchedUpdateCommand.getUpdateCommands().size());
+ List updateCommands = batchedUpdateCommand.getUpdateCommands();
+ int numCommands = updateCommands.size();
+ List<VariableContext> allContexts =
batchedUpdateCommand.getVariableContexts();
+ List<VariableContext> planContexts = null;
+ if (allContexts != null) {
+ planContexts = new ArrayList<VariableContext>(allContexts.size());
+ }
+ for (int commandIndex = 0; commandIndex < numCommands; commandIndex++) {
+ // Potentially the first command of a batch
+ Command updateCommand = (Command)updateCommands.get(commandIndex);
+ boolean commandWasBatched = false;
+ // If this command can be placed in a batch
+ if (isEligibleForBatching(updateCommand, metadata)) {
+ // Get the model ID. Subsequent and contiguous commands that update a
group in this model are candidates for this batch
+ Object batchModelID =
metadata.getModelID(getUpdatedGroup(updateCommand).getMetadataID());
+ String modelName = metadata.getFullName(batchModelID);
+ SourceCapabilities caps = capFinder.findCapabilities(modelName);
+ // Only attempt batching if the source supports batching
+ if (caps.supportsCapability(Capability.BATCHED_UPDATES)) {
+ // Start a new batch
+ List<Command> batch = new ArrayList<Command>();
+ List<VariableContext> contexts = new
ArrayList<VariableContext>();
+ List<Boolean> shouldEvaluate = new ArrayList<Boolean>();
+ // This is the first command in a potential batch, so add it to the
batch
+ batch.add(updateCommand);
+ if (allContexts != null) {
+ contexts.add(allContexts.get(commandIndex));
+ shouldEvaluate.add(Boolean.TRUE);
+ } else {
+
shouldEvaluate.add(EvaluatableVisitor.needsProcessingEvaluation(updateCommand));
+ }
+ // Find out if there are other commands called on the same physical
model
+ // immediately and contiguously after this one
+ batchLoop: for (int batchIndex = commandIndex+1; batchIndex <
numCommands; batchIndex++) {
+ Command batchingCandidate =
(Command)updateCommands.get(batchIndex);
+ // If this command updates the same model, and is eligible for
batching, add it to the batch
+ if (canBeAddedToBatch(batchingCandidate, batchModelID, metadata,
capFinder)) {
+ batch.add(batchingCandidate);
+ if (allContexts != null) {
+ contexts.add(allContexts.get(batchIndex));
+ shouldEvaluate.add(Boolean.TRUE);
+ } else {
+
shouldEvaluate.add(EvaluatableVisitor.needsProcessingEvaluation(batchingCandidate));
+ }
+ } else { // Otherwise, stop batching at this point. The next
command may well be the start of a new batch
+ break batchLoop;
+ }
+ }
+ // If two or more contiguous commands made on the same model were
found, then batch them
+ if (batch.size() > 1) {
+ ProjectNode projectNode = new
ProjectNode(((IntegerID)idGenerator.create()).getValue());
+ // Create a BatchedUpdateNode that creates a batched request for
the connector
+ BatchedUpdateNode batchNode = new
BatchedUpdateNode(((IntegerID)idGenerator.create()).getValue(),
+ batch,
contexts, shouldEvaluate,
+ modelName);
+ List symbols = batchedUpdateCommand.getProjectedSymbols();
+ projectNode.setSelectSymbols(symbols);
+ projectNode.setElements(symbols);
+
+ batchNode.setElements(symbols);
+
+ projectNode.addChild(batchNode);
+ // Add a new RelationalPlan that represents the plan for this
batch.
+ childPlans.add(new RelationalPlan(projectNode));
+ if (planContexts != null) {
+ planContexts.add(new VariableContext());
+ }
+ // Skip those commands that were added to this batch
+ commandIndex += batch.size() - 1;
+ commandWasBatched = true;
+ }
+ }
+ }
+ if (!commandWasBatched) { // If the command wasn't batched, just add the
plan for this command to the list of plans
+ Command cmd =
(Command)batchedUpdateCommand.getUpdateCommands().get(commandIndex);
+ ProcessorPlan plan = cmd.getProcessorPlan();
+ if (plan == null) {
+ plan = QueryOptimizer.optimizePlan(cmd, metadata, idGenerator, capFinder,
analysisRecord, context);
+ }
+ childPlans.add(plan);
+ if (allContexts != null) {
+ planContexts.add(allContexts.get(commandIndex));
+ }
+ }
+ }
+ return new BatchedUpdatePlan(childPlans,
batchedUpdateCommand.getUpdateCommands().size(), planContexts);
+ }
+
+ /**
+ * Get the group being updated by the update command
+ * @param command an INSERT, UPDATE, DELETE or SELECT INTO command
+ * @return the group being updated
+ * @since 4.2
+ */
+ public static GroupSymbol getUpdatedGroup(Command command) {
+ int type = command.getType();
+ if (type == Command.TYPE_INSERT) {
+ return ((Insert)command).getGroup();
+ } else if (type == Command.TYPE_UPDATE) {
+ return ((Update)command).getGroup();
+ } else if (type == Command.TYPE_DELETE) {
+ return ((Delete)command).getGroup();
+ }
+ throw new
TeiidRuntimeException(QueryExecPlugin.Util.getString("BatchedUpdatePlanner.unrecognized_command",
command)); //$NON-NLS-1$
+ }
+
+ /**
+ * Returns whether a command can be placed in a connector batch
+ * @param command an update command
+ * @param metadata
+ * @return true if this command can be added to a batch; false otherwise
+ * @throws QueryMetadataException
+ * @throws TeiidComponentException
+ * @since 4.2
+ */
+ public static boolean isEligibleForBatching(Command command, QueryMetadataInterface
metadata) throws QueryMetadataException, TeiidComponentException {
+ // If the command updates a physical group, it's eligible
+ return !metadata.isVirtualGroup(getUpdatedGroup(command).getMetadataID());
+ }
+
+ /**
+ * Returns whether a command can be placed in a given batch
+ * @param command an update command
+ * @param batchModelID the model ID for the batch concerned
+ * @param metadata
+ * @param capFinder
+ * @return true if this command can be placed in a batch associated with the model ID;
false otherwise
+ * @throws QueryMetadataException
+ * @throws TeiidComponentException
+ * @since 4.2
+ */
+ private static boolean canBeAddedToBatch(Command command, Object batchModelID,
QueryMetadataInterface metadata, CapabilitiesFinder capFinder) throws
QueryMetadataException, TeiidComponentException {
+ // If it's eligible ...
+ if (isEligibleForBatching(command, metadata)) {
+ Object modelID =
metadata.getModelID(getUpdatedGroup(command).getMetadataID());
+
+ return CapabilitiesUtil.isSameConnector(modelID, batchModelID, metadata,
capFinder);
+ }
+ return false;
+ }
+
+}
Modified: trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -36,7 +36,6 @@
import org.teiid.query.metadata.TempCapabilitiesFinder;
import org.teiid.query.metadata.TempMetadataAdapter;
import org.teiid.query.metadata.TempMetadataStore;
-import org.teiid.query.optimizer.batch.BatchedUpdatePlanner;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
import org.teiid.query.optimizer.proc.ProcedurePlanner;
import org.teiid.query.optimizer.relational.RelationalPlanner;
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/capabilities/SourceCapabilities.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/capabilities/SourceCapabilities.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/capabilities/SourceCapabilities.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -317,7 +317,8 @@
* @since 6.0.0 indicates support for where all
*/
REQUIRES_CRITERIA,
- INSERT_WITH_QUERYEXPRESSION
+ INSERT_WITH_QUERYEXPRESSION,
+ INSERT_WITH_ITERATOR
}
public enum Scope {
Modified:
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/optimizer/relational/PlanToProcessConverter.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -192,8 +192,15 @@
pinode.setModelName(modelName);
processNode = pinode;
SourceCapabilities caps =
capFinder.findCapabilities(modelName);
-
pinode.setDoBatching(caps.supportsCapability(Capability.BATCHED_UPDATES));
-
pinode.setDoBulkInsert(caps.supportsCapability(Capability.BULK_UPDATE));
+ if (caps.supportsCapability(Capability.INSERT_WITH_ITERATOR))
{
+
pinode.setMode(org.teiid.query.processor.relational.ProjectIntoNode.Mode.ITERATOR);
+ } else if (caps.supportsCapability(Capability.BULK_UPDATE))
{
+
pinode.setMode(org.teiid.query.processor.relational.ProjectIntoNode.Mode.BULK);
+ } else if
(caps.supportsCapability(Capability.BATCHED_UPDATES)) {
+
pinode.setMode(org.teiid.query.processor.relational.ProjectIntoNode.Mode.BATCH);
+ } else {
+
pinode.setMode(org.teiid.query.processor.relational.ProjectIntoNode.Mode.SINGLE);
+ }
}
} catch(QueryMetadataException e) {
throw new TeiidComponentException(e);
Modified: trunk/engine/src/main/java/org/teiid/query/processor/BatchIterator.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/processor/BatchIterator.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -31,7 +31,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.processor.BatchCollector.BatchProducer;
-import org.teiid.query.sql.symbol.Expression;
/**
@@ -87,7 +86,6 @@
return tuple;
}
- @Override
public int available() {
if (batch != null && batch.containsRow(getCurrentIndex())) {
return batch.getEndRow() - getCurrentIndex() + 1;
@@ -95,11 +93,6 @@
return 0;
}
- @Override
- public List<? extends Expression> getSchema() {
- return source.getOutputElements();
- }
-
public void setBuffer(TupleBuffer buffer, boolean saveOnMark) {
this.buffer = buffer;
this.saveOnMark = saveOnMark;
Modified: trunk/engine/src/main/java/org/teiid/query/processor/CollectionTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/CollectionTupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/CollectionTupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -28,27 +28,22 @@
import java.util.List;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.query.sql.lang.Command;
-import org.teiid.query.sql.symbol.SingleElementSymbol;
public class CollectionTupleSource implements TupleSource {
private Iterator<? extends List<?>> tuples;
- private List<? extends SingleElementSymbol> schema;
public static CollectionTupleSource createUpdateCountTupleSource(int count) {
- return new CollectionTupleSource(Arrays.asList(Arrays.asList(count)).iterator(),
Command.getUpdateCommandSymbol());
+ return new CollectionTupleSource(Arrays.asList(Arrays.asList(count)).iterator());
}
- public static CollectionTupleSource
createNullTupleSource(List<SingleElementSymbol> schema) {
- return new CollectionTupleSource(new ArrayList<List<Object>>(0).iterator(),
schema);
+ public static CollectionTupleSource createNullTupleSource() {
+ return new CollectionTupleSource(new
ArrayList<List<Object>>(0).iterator());
}
- public CollectionTupleSource(Iterator<? extends List<?>> tuples,
- List<? extends SingleElementSymbol> schema) {
+ public CollectionTupleSource(Iterator<? extends List<?>> tuples) {
this.tuples = tuples;
- this.schema = schema;
}
@Override
@@ -60,20 +55,8 @@
}
@Override
- public List<? extends SingleElementSymbol> getSchema() {
- return schema;
- }
-
- @Override
public void closeSource() {
}
- @Override
- public int available() {
- if (tuples.hasNext()) {
- return 1;
- }
- return 0;
- }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -65,7 +65,6 @@
import org.teiid.query.sql.symbol.Reference;
import org.teiid.query.sql.util.VariableContext;
import org.teiid.query.tempdata.TempTableStore;
-import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.util.CommandContext;
import org.teiid.query.util.ErrorMessageKeys;
@@ -300,7 +299,7 @@
}
if(lastTupleSource == null){
- return CollectionTupleSource.createNullTupleSource(null);
+ return CollectionTupleSource.createNullTupleSource();
}
return lastTupleSource;
}
@@ -558,7 +557,7 @@
CursorState cursorState = getCursorState(rsKey);
// get the schema from the tuple source
- List schema = cursorState.ts.getSchema();
+ List schema = cursorState.processor.getOutputElements();
return schema;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -89,7 +89,7 @@
sortSymbols.add(dependentSetStates.get(i).valueExpression);
}
DependentValueSource originalVs =
(DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
- this.sortUtility = new
SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols,
sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(),
dependentNode.getConnectionID());
+ this.sortUtility = new
SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols,
sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(),
dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
}
dvs = new DependentValueSource(sortUtility.sort());
for (SetState setState : dependentSetStates) {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/DependentValueSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -65,7 +65,7 @@
IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
- index = its.getSchema().indexOf(valueExpression);
+ index = buffer.getSchema().indexOf(valueExpression);
Assertion.assertTrue(index != -1);
}
return new TupleSourceValueIterator(its, index);
@@ -83,10 +83,10 @@
IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
- index = its.getSchema().indexOf(valueExpression);
+ index = buffer.getSchema().indexOf(valueExpression);
}
Assertion.assertTrue(index != -1);
- if (((SingleElementSymbol)its.getSchema().get(index)).getType() ==
DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
+ if (((SingleElementSymbol)buffer.getSchema().get(index)).getType() ==
DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
result = new TreeSet<Object>();
} else {
result = new HashSet<Object>();
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/GroupingNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -306,22 +306,10 @@
}
@Override
- public List<Expression> getSchema() {
- return collectedExpressions;
- }
-
- @Override
public void closeSource() {
}
- @Override
- public int available() {
- if (sourceBatch != null) {
- return sourceBatch.getEndRow() - sourceRow + 1;
- }
- return 0;
- }
};
}
@@ -334,7 +322,7 @@
} else {
this.sortUtility = new SortUtility(getCollectionTupleSource(), sortElements,
sortTypes,
removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, getBufferManager(),
- getConnectionID());
+ getConnectionID(),
collectedExpressions);
this.phase = SORT;
}
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/ProjectIntoNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -32,7 +32,9 @@
import org.teiid.client.plan.PlanNode;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.sql.lang.BatchedUpdateCommand;
@@ -45,6 +47,10 @@
public class ProjectIntoNode extends RelationalNode {
+ public enum Mode {
+ BATCH, BULK, ITERATOR, SINGLE
+ }
+
private static int REQUEST_CREATION = 1;
private static int RESPONSE_PROCESSING = 2;
@@ -52,8 +58,7 @@
private GroupSymbol intoGroup;
private List intoElements;
private String modelName;
- private boolean doBatching = false;
- private boolean doBulkInsert = false;
+ private Mode mode;
// Processing state
private int batchRow = 1;
@@ -61,7 +66,9 @@
private int phase = REQUEST_CREATION;
private int requestsRegistered = 0;
private int tupleSourcesProcessed = 0;
+ private boolean sourceDone;
+ private TupleBuffer buffer;
private TupleBatch currentBatch;
private TupleSource tupleSource;
@@ -78,6 +85,7 @@
this.tupleSourcesProcessed = 0;
this.requestsRegistered = 0;
this.currentBatch=null;
+ this.sourceDone=false;
}
public void setIntoGroup(GroupSymbol group) {
@@ -107,28 +115,45 @@
checkExitConditions();
- /* If we don't have a batch to work with yet, or
- * if this is not the last batch and we've reached the end of this one,
- * then get the next batch.
+ /* If we don't have a batch to work with, get the next one
*/
-
- if (currentBatch == null || (!currentBatch.getTerminationFlag() &&
this.batchRow > currentBatch.getEndRow())) {
+ if (currentBatch == null) {
+ if (sourceDone) {
+ phase = RESPONSE_PROCESSING;
+ break;
+ }
currentBatch = getChildren()[0].nextBatch(); // can throw
BlockedException
+ sourceDone = currentBatch.getTerminationFlag();
this.batchRow = currentBatch.getBeginRow();
//normally we would want to skip a 0 sized batch, but it typically
represents the terminal batch
//and for implicit temp tables we need to issue an empty insert
- if(currentBatch.getRowCount() == 0 &&
(!this.intoGroup.isImplicitTempGroupSymbol() || !currentBatch.getTerminationFlag())) {
- continue;
+ if(currentBatch.getRowCount() == 0
+ && (!currentBatch.getTerminationFlag() || mode !=
Mode.ITERATOR)) {
+ currentBatch = null;
+ continue;
}
- } else if (currentBatch.getTerminationFlag() && this.batchRow >
currentBatch.getEndRow()) {
- phase = RESPONSE_PROCESSING;
- break;
- }
+ }
int batchSize = currentBatch.getRowCount();
-
- if (doBulkInsert) {
+ int requests = 1;
+ switch (mode) {
+ case ITERATOR:
+ if (buffer == null) {
+ buffer = getBufferManager().createTupleBuffer(intoElements,
getConnectionID(), TupleSourceType.PROCESSOR);
+ }
+ buffer.addTupleBatch(currentBatch, true);
+ if (currentBatch.getTerminationFlag() && (buffer.getRowCount() != 0
|| intoGroup.isImplicitTempGroupSymbol())) {
+ Insert insert = new Insert(intoGroup, intoElements, null);
+ buffer.close();
+ insert.setTupleSource(buffer.createIndexedTupleSource(true));
+ // Register insert command against source
+ registerRequest(insert);
+ } else {
+ requests = 0;
+ }
+ break;
+ case BULK:
//convert to multivalued parameter
List<Constant> parameters = new
ArrayList<Constant>(intoElements.size());
for (int i = 0; i < intoElements.size(); i++) {
@@ -145,26 +170,32 @@
Insert insert = new Insert(intoGroup, intoElements, parameters);
// Register insert command against source
registerRequest(insert);
- } else if (doBatching) {
+ break;
+ case BATCH:
// Register batched update command against source
int endRow = currentBatch.getEndRow();
List rows = new ArrayList(endRow-batchRow);
for(int rowNum = batchRow; rowNum <= endRow; rowNum++) {
- Insert insert = new Insert( intoGroup,
+ insert = new Insert( intoGroup,
intoElements,
convertValuesToConstants(currentBatch.getTuple(rowNum), intoElements));
rows.add( insert );
}
registerRequest(new BatchedUpdateCommand( rows ));
- } else {
+ break;
+ case SINGLE:
batchSize = 1;
// Register insert command against source
// Defect 16036 - submit a new INSERT command to the DataManager.
registerRequest(new Insert(intoGroup, intoElements,
convertValuesToConstants(currentBatch.getTuple(batchRow), intoElements)));
}
+
this.batchRow += batchSize;
- this.requestsRegistered++;
+ if (batchRow > currentBatch.getEndRow()) {
+ currentBatch = null;
+ }
+ this.requestsRegistered+=requests;
}
checkExitConditions();
@@ -197,7 +228,10 @@
}
private void closeRequest() {
-
+ if (this.buffer != null) {
+ this.buffer.remove();
+ this.buffer = null;
+ }
if (this.tupleSource != null) {
tupleSource.closeSource();
this.tupleSource = null;
@@ -216,8 +250,7 @@
clonedNode.intoGroup = intoGroup;
clonedNode.intoElements = intoElements;
clonedNode.modelName = this.modelName;
- clonedNode.doBatching = this.doBatching;
- clonedNode.doBulkInsert = this.doBulkInsert;
+ clonedNode.mode = this.mode;
return clonedNode;
}
@@ -242,15 +275,15 @@
constants.add(new Constant(values.get(i),type));
}
return constants;
- }
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
- public void setDoBatching(boolean doBatching) {
- this.doBatching = doBatching;
- }
-
- public void setDoBulkInsert(boolean doBulkInsert) {
- this.doBulkInsert = doBulkInsert;
- }
+ public void setMode(Mode mode) {
+ this.mode = mode;
+ }
public boolean isTempGroupInsert() {
return intoGroup.isTempGroupSymbol();
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalPlan.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -34,6 +34,7 @@
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.processor.ProcessorPlan;
+import org.teiid.query.processor.relational.ProjectIntoNode.Mode;
import org.teiid.query.sql.lang.QueryCommand;
import org.teiid.query.util.CommandContext;
@@ -157,7 +158,12 @@
}
return false;
}
- if (node instanceof AccessNode) {
+ if (node instanceof ProjectIntoNode) {
+ if (((ProjectIntoNode)node).getMode() == Mode.ITERATOR) {
+ return false;
+ }
+ return true;
+ } else if (node instanceof AccessNode) {
return false;
}
if (transactionalReads) {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -91,7 +91,7 @@
private void sortPhase() throws BlockedException, TeiidComponentException,
TeiidProcessingException {
if (this.sortUtility == null) {
this.sortUtility = new SortUtility(new BatchIterator(getChildren()[0]), items,
this.mode, getBufferManager(),
- getConnectionID());
+ getConnectionID(),
getChildren()[0].getElements());
}
this.output = this.sortUtility.sort();
if (this.outputTs == null) {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -108,12 +108,12 @@
private Collection<List<?>> workingTuples;
public SortUtility(TupleSource sourceID, List<OrderByItem> items, Mode mode,
BufferManager bufferMgr,
- String groupName) {
+ String groupName, List schema) {
this.source = sourceID;
this.mode = mode;
this.bufferManager = bufferMgr;
this.groupName = groupName;
- this.schema = this.source.getSchema();
+ this.schema = schema;
this.schemaSize = bufferManager.getSchemaSize(this.schema);
int distinctIndex = items != null? items.size() - 1:0;
List<Expression> sortElements = null;
@@ -153,8 +153,8 @@
}
public SortUtility(TupleSource ts, List expressions, List<Boolean> types,
- Mode mode, BufferManager bufferManager, String connectionID) {
- this(ts, new OrderBy(expressions, types).getOrderByItems(), mode, bufferManager,
connectionID);
+ Mode mode, BufferManager bufferManager, String connectionID, List schema) {
+ this(ts, new OrderBy(expressions, types).getOrderByItems(), mode, bufferManager,
connectionID, schema);
}
public boolean isDone() {
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -134,7 +134,7 @@
// Sort
if (sortUtility == null) {
- sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(),
sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName);
+ sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(),
sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName,
collectionBuffer.getSchema());
}
TupleBuffer sorted = sortUtility.sort();
sorted.setForwardOnly(true);
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -187,7 +187,7 @@
ts = new BatchIterator(this.source);
}
this.sortUtility = new SortUtility(ts, expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC),
- sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
this.source.getBufferManager(), this.source.getConnectionID());
+ sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
this.markDistinct(sortOption == SortOption.SORT_DISTINCT &&
expressions.size() == this.getOuterVals().size());
}
this.buffer = sortUtility.sort();
Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/Insert.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/lang/Insert.java 2010-07-28 21:11:54
UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/sql/lang/Insert.java 2010-07-29 15:18:33
UTC (rev 2391)
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
+import org.teiid.common.buffer.TupleSource;
import org.teiid.core.util.EquivalenceUtil;
import org.teiid.core.util.HashCodeUtil;
import org.teiid.query.sql.LanguageVisitor;
@@ -38,6 +39,7 @@
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
+import org.teiid.query.sql.util.ValueIterator;
/**
@@ -52,10 +54,12 @@
/** list of column variables, null = all columns */
private List variables = new LinkedList();
- /** List of Expressions, required */
+ /** List of Expressions */
private List values = new LinkedList();
private QueryCommand queryExpression;
+
+ private TupleSource tupleSource;
// =========================================================================
// C O N S T R U C T O R S
@@ -298,5 +302,13 @@
public boolean areResultsCachable() {
return false;
}
+
+ public void setTupleSource(TupleSource tupleSource) {
+ this.tupleSource = tupleSource;
+ }
+
+ public TupleSource getTupleSource() {
+ return tupleSource;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java 2010-07-28 21:11:54 UTC
(rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/sql/lang/Query.java 2010-07-29 15:18:33 UTC
(rev 2391)
@@ -434,7 +434,7 @@
static boolean areResultsCachable(List projectedSymbols) {
for(int i=0; i<projectedSymbols.size(); i++){
SingleElementSymbol projectedSymbol = (SingleElementSymbol)projectedSymbols.get(i);
- if(DataTypeManager.isLOB(projectedSymbol.getType())) {
+ if(DataTypeManager.isLOB(projectedSymbol.getType()) || projectedSymbol.getType() ==
DataTypeManager.DefaultDataClasses.OBJECT) {
return false;
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/main/java/org/teiid/query/sql/visitor/SQLStringVisitor.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -403,6 +403,9 @@
if ( obj.getQueryExpression() != null ) {
parts.add(registerNode(obj.getQueryExpression()));
+ } else if (obj.getTupleSource() != null) {
+ parts.add(VALUES);
+ parts.add(" (...)"); //$NON-NLS-1$
} else {
parts.add(VALUES);
parts.add(" ("); //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2010-07-28 21:11:54
UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2010-07-29 15:18:33
UTC (rev 2391)
@@ -37,6 +37,7 @@
import org.teiid.common.buffer.TupleBrowser;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
@@ -65,29 +66,75 @@
*/
class TempTable {
- private final class QueryTupleSource extends TupleBrowserTupleSource {
+ private final class InsertUpdateProcessor extends UpdateProcessor {
+
+ private boolean addRowId;
+ private int[] indexes;
+
+ private InsertUpdateProcessor(TupleSource ts, boolean addRowId, int[] indexes)
+ throws TeiidComponentException {
+ super(null, ts);
+ this.addRowId = addRowId;
+ this.indexes = indexes;
+ }
+
+ @Override
+ protected void tuplePassed(List tuple) throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ if (indexes != null) {
+ List<Object> newTuple = new ArrayList<Object>(columns.size());
+ if (addRowId) {
+ newTuple.add(rowId.getAndIncrement());
+ }
+ for (int i = 0; i < indexes.length; i++) {
+ if (indexes[i] == -1) {
+ newTuple.add(null);
+ } else {
+ newTuple.add(tuple.get(indexes[i]));
+ }
+ }
+ tuple = newTuple;
+ } else if (addRowId) {
+ tuple = new ArrayList<Object>(tuple);
+ tuple.add(0, rowId.getAndIncrement());
+ }
+ currentTuple = tuple;
+ insertTuple(tuple);
+ }
+
+ @Override
+ protected void undo(List tuple) throws TeiidComponentException,
+ TeiidProcessingException {
+ deleteTuple(tuple);
+ }
+ }
+
+ private final class QueryTupleSource implements TupleSource {
private final Evaluator eval;
- private final List<SingleElementSymbol> projectedCols;
private final Criteria condition;
private final boolean project;
private final int[] indexes;
+ private int reserved;
+ private TupleBrowser browser;
private QueryTupleSource(TupleBrowser browser, Map map,
List<SingleElementSymbol> projectedCols, Criteria condition) {
- super(browser);
+ this.browser = browser;
this.indexes = RelationalNode.getProjectionIndexes(map, projectedCols);
this.eval = new Evaluator(map, null, null);
- this.projectedCols = projectedCols;
this.condition = condition;
this.project = shouldProject();
+ reserved = reserveBuffers();
}
@Override
public List<?> nextTuple() throws TeiidComponentException,
TeiidProcessingException {
for (;;) {
- List<?> next = super.nextTuple();
+ List<?> next = browser.nextTuple();
if (next == null) {
+ bm.releaseBuffers(reserved);
+ reserved = 0;
return null;
}
if (rowId != null) {
@@ -103,6 +150,13 @@
}
}
+ @Override
+ public void closeSource() {
+ bm.releaseBuffers(reserved);
+ reserved = 0;
+ browser.closeSource();
+ }
+
private boolean shouldProject() {
if (indexes.length == getColumns().size()) {
for (int i = 0; i < indexes.length; i++) {
@@ -115,48 +169,15 @@
return true;
}
- @Override
- public List<? extends Expression> getSchema() {
- return projectedCols;
- }
}
- private class TupleBrowserTupleSource implements TupleSource {
- private final TupleBrowser browser;
-
- private TupleBrowserTupleSource(TupleBrowser browser) {
- this.browser = browser;
- }
-
- @Override
- public List<?> nextTuple() throws TeiidComponentException,
- TeiidProcessingException {
- return browser.next();
- }
-
- @Override
- public List<? extends Expression> getSchema() {
- return columns;
- }
-
- @Override
- public void closeSource() {
-
- }
-
- @Override
- public int available() {
- return 0;
- }
- }
-
private abstract class UpdateProcessor {
private TupleSource ts;
protected final Map lookup;
protected final Evaluator eval;
private final Criteria crit;
protected int updateCount = 0;
- private List currentTuple;
+ protected List currentTuple;
protected TupleBuffer undoLog;
@@ -169,6 +190,7 @@
}
int process() throws ExpressionEvaluationException, TeiidComponentException,
TeiidProcessingException {
+ int reserved = reserveBuffers();
boolean success = false;
try {
while (currentTuple != null || (currentTuple = ts.nextTuple()) != null) {
@@ -179,9 +201,12 @@
}
currentTuple = null;
}
+ bm.releaseBuffers(reserved);
+ reserved = 0;
success();
success = true;
} finally {
+ bm.releaseBuffers(reserved);
if (!success) {
TupleSource undoTs = undoLog.createIndexedTupleSource();
List<?> tuple = null;
@@ -197,7 +222,7 @@
}
return updateCount;
}
-
+
@SuppressWarnings("unused")
void success() throws TeiidComponentException, ExpressionEvaluationException,
TeiidProcessingException {}
@@ -218,6 +243,9 @@
private BufferManager bm;
private String sessionID;
private TempMetadataID tid;
+
+ private int keyBatchSize;
+ private int leafBatchSize;
TempTable(TempMetadataID tid, BufferManager bm, List<ElementSymbol> columns, int
primaryKeyLength, String sessionID) {
this.tid = tid;
@@ -233,7 +261,13 @@
}
this.columns = columns;
this.sessionID = sessionID;
+ this.keyBatchSize = bm.getSchemaSize(columns);
+ this.leafBatchSize = bm.getSchemaSize(columns.subList(0, primaryKeyLength));
}
+
+ private int reserveBuffers() {
+ return bm.reserveBuffers(leafBatchSize + (tree.getHeight() - 1)*keyBatchSize,
BufferReserveMode.WAIT);
+ }
public TupleSource createTupleSource(final List<SingleElementSymbol>
projectedCols, final Criteria condition, OrderBy orderBy) throws TeiidComponentException,
TeiidProcessingException {
Map map = RelationalNode.createLookupMap(getColumns());
@@ -282,21 +316,24 @@
TupleBrowser browser = createTupleBrower(condition, direction);
TupleSource ts = new QueryTupleSource(browser, map, projectedCols, condition);
-
- TupleBuffer tb = null;
- if (!orderByUsingIndex && orderBy != null) {
- SortUtility sort = new SortUtility(ts, orderBy.getOrderByItems(), Mode.SORT, bm,
sessionID);
- tb = sort.sort();
- } else {
- tb = bm.createTupleBuffer(projectedCols, sessionID, TupleSourceType.PROCESSOR);
- List next = null;
- while ((next = ts.nextTuple()) != null) {
- tb.addTuple(next);
+ try {
+ TupleBuffer tb = null;
+ if (!orderByUsingIndex && orderBy != null) {
+ SortUtility sort = new SortUtility(ts, orderBy.getOrderByItems(), Mode.SORT, bm,
sessionID, projectedCols);
+ tb = sort.sort();
+ } else {
+ tb = bm.createTupleBuffer(projectedCols, sessionID, TupleSourceType.PROCESSOR);
+ List next = null;
+ while ((next = ts.nextTuple()) != null) {
+ tb.addTuple(next);
+ }
}
+ tb.close();
+ return tb.createIndexedTupleSource(true);
+ } finally {
+ //ensure the buffers get released
+ ts.closeSource();
}
- tb.close();
- tb.setForwardOnly(true);
- return tb.createIndexedTupleSource(true);
}
private TupleBrowser createTupleBrower(Criteria condition, boolean direction) throws
TeiidComponentException {
@@ -353,23 +390,15 @@
return columns;
}
- public TupleSource insert(List<List<Object>> tuples) throws
TeiidComponentException, ExpressionEvaluationException, TeiidProcessingException {
- UpdateProcessor up = new UpdateProcessor(null, new
CollectionTupleSource(tuples.iterator(), getColumns())) {
-
- protected void tuplePassed(List tuple)
- throws BlockedException, TeiidComponentException, TeiidProcessingException {
- if (rowId != null) {
- tuple.add(0, rowId.getAndIncrement());
- }
- insertTuple(tuple);
- }
-
- @Override
- protected void undo(List tuple) throws TeiidComponentException {
- deleteTuple(tuple);
- }
-
- };
+ public TupleSource insert(TupleSource tuples, final List<ElementSymbol> variables)
throws TeiidComponentException, ExpressionEvaluationException, TeiidProcessingException {
+ List<ElementSymbol> cols = getColumns();
+ final int[] indexes = new int[cols.size()];
+ boolean shouldProject = false;
+ for (int i = 0; i < cols.size(); i++) {
+ indexes[i] = variables.indexOf(cols.get(i));
+ shouldProject |= (indexes[i] != i);
+ }
+ UpdateProcessor up = new InsertUpdateProcessor(tuples, rowId != null,
shouldProject?indexes:null);
int updateCount = up.process();
tid.setCardinality(tree.getRowCount());
return CollectionTupleSource.createUpdateCountTupleSource(updateCount);
@@ -378,7 +407,7 @@
public TupleSource update(Criteria crit, final SetClauseList update) throws
TeiidComponentException, ExpressionEvaluationException, TeiidProcessingException {
final boolean primaryKeyChangePossible = canChangePrimaryKey(update);
final TupleBrowser browser = createTupleBrower(crit, OrderBy.ASC);
- UpdateProcessor up = new UpdateProcessor(crit, new TupleBrowserTupleSource(browser)) {
+ UpdateProcessor up = new UpdateProcessor(crit, browser) {
protected TupleBuffer changeSet;
protected UpdateProcessor changeSetProcessor;
@@ -418,20 +447,7 @@
//changeSet contains possible updates
if (primaryKeyChangePossible) {
if (changeSetProcessor == null) {
- changeSetProcessor = new UpdateProcessor(null,
changeSet.createIndexedTupleSource(true)) {
- @Override
- protected void tuplePassed(List tuple) throws BlockedException,
- TeiidComponentException, TeiidProcessingException {
- insertTuple(tuple);
- }
-
- @Override
- protected void undo(List tuple) throws TeiidComponentException,
- TeiidProcessingException {
- deleteTuple(tuple);
- }
-
- };
+ changeSetProcessor = new
InsertUpdateProcessor(changeSet.createIndexedTupleSource(true), false, null);
}
changeSetProcessor.process(); //when this returns, we're up to date
}
@@ -466,7 +482,7 @@
public TupleSource delete(Criteria crit) throws TeiidComponentException,
ExpressionEvaluationException, TeiidProcessingException {
final TupleBrowser browser = createTupleBrower(crit, OrderBy.ASC);
- UpdateProcessor up = new UpdateProcessor(crit, new TupleBrowserTupleSource(browser)) {
+ UpdateProcessor up = new UpdateProcessor(crit, browser) {
@Override
protected void tuplePassed(List tuple)
throws ExpressionEvaluationException,
@@ -502,5 +518,5 @@
throw new AssertionError("Update failed"); //$NON-NLS-1$
}
}
-
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -23,15 +23,14 @@
package org.teiid.query.tempdata;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.api.exception.query.QueryProcessingException;
-import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
@@ -52,7 +51,6 @@
import org.teiid.query.sql.lang.Query;
import org.teiid.query.sql.lang.Update;
import org.teiid.query.sql.navigator.PostOrderNavigator;
-import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.symbol.GroupSymbol;
@@ -142,11 +140,16 @@
final String groupKey = group.getNonCorrelationName().toUpperCase();
final TempTable table = getTempTable(groupKey, command);
if (command instanceof Insert) {
- List<List<Object>> tuples = getBulkRows((Insert)command,
table.getColumns());
- if (tuples.isEmpty()) {
- return CollectionTupleSource.createUpdateCountTupleSource(0);
+ Insert insert = (Insert)command;
+ TupleSource ts = insert.getTupleSource();
+ if (ts == null) {
+ List<Object> values = new
ArrayList<Object>(insert.getValues().size());
+ for (Expression expr : (List<Expression>)insert.getValues()) {
+ values.add(Evaluator.evaluate(expr));
+ }
+ ts = new CollectionTupleSource(Arrays.asList(values).iterator());
}
- return table.insert(tuples);
+ return table.insert(ts, insert.getVariables());
}
if (command instanceof Update) {
final Update update = (Update)command;
@@ -213,36 +216,6 @@
return groupToTupleSourceID.get(tempTableID);
}
- public static List<List<Object>> getBulkRows(Insert insert,
List<ElementSymbol> elements) throws ExpressionEvaluationException,
BlockedException, TeiidComponentException {
- int bulkRowCount = 1;
- if (insert.isBulk()) {
- Constant c = (Constant)insert.getValues().get(0);
- bulkRowCount = ((List<?>)c.getValue()).size();
- }
-
- List<List<Object>> tuples = new
ArrayList<List<Object>>(bulkRowCount);
-
- for (int row = 0; row < bulkRowCount; row++) {
- List<Object> currentRow = new
ArrayList<Object>(insert.getValues().size());
- for (ElementSymbol symbol : elements) {
- int index = insert.getVariables().indexOf(symbol);
- Object value = null;
- if (index != -1) {
- if (insert.isBulk()) {
- Constant multiValue = (Constant)insert.getValues().get(index);
- value = ((List<?>)multiValue.getValue()).get(row);
- } else {
- Expression expr = (Expression)insert.getValues().get(index);
- value = Evaluator.evaluate(expr);
- }
- }
- currentRow.add(value);
- }
- tuples.add(currentRow);
- }
- return tuples;
- }
-
public boolean hasTempTable(Command command){
switch (command.getType()) {
case Command.TYPE_INSERT:
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -36,7 +36,6 @@
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
-import org.teiid.common.queue.FakeWorkManager;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.FakeTransactionService;
import org.teiid.dqp.service.AutoGenDataService;
Modified: trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/test/java/org/teiid/query/optimizer/TestOptimizer.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -6413,12 +6413,26 @@
FakeMetadataFacade metadata = FakeMetadataFactory.example1Cached();
ProcessorPlan plan = helpPlan(sql, metadata, new String[] {"SELECT DISTINCT
g_0.e1 FROM pm1.g1 AS g_0"}, ComparisonMode.EXACT_COMMAND_STRING); //$NON-NLS-1$
+ //no txn required, since an iterated insert is used
+ assertFalse(plan.requiresTransaction(false));
checkNodeTypes(plan, FULL_PUSHDOWN);
checkNodeTypes(plan, new int[] {1}, new Class[] {ProjectIntoNode.class});
}
+ @Test public void testInsertQueryExpression() throws Exception {
+ String sql = "insert into pm1.g1 (e1) select e1 from pm1.g2";
//$NON-NLS-1$
+
+ FakeMetadataFacade metadata = FakeMetadataFactory.example1Cached();
+
+ ProcessorPlan plan = helpPlan(sql, metadata, new String[] {"SELECT g_0.e1
FROM pm1.g2 AS g_0"}, ComparisonMode.EXACT_COMMAND_STRING); //$NON-NLS-1$
+ //requires a txn, since a non pushdown/iterated insert is used
+ assertTrue(plan.requiresTransaction(false));
+
+ checkNodeTypes(plan, new int[] {1}, new Class[] {ProjectIntoNode.class});
+ }
+
/**
* previously the subqueries were being pushed too far and then not having the
appropriate correlated references
*/
Modified: trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++ trunk/engine/src/test/java/org/teiid/query/processor/FakeTupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -45,8 +45,6 @@
//the first time nextTuple is called, it will throws BlockedExceptiom
private boolean blockOnce;
- private boolean exceptionOnClose;
-
public FakeTupleSource(List elements, List[] tuples) {
this.elements = elements;
this.tuples = tuples;
@@ -113,13 +111,4 @@
this.blockOnce = true;
}
- public void setExceptionOnClose(boolean exceptionOnClose) {
- this.exceptionOnClose = exceptionOnClose;
- }
-
- @Override
- public int available() {
- return 0;
- }
-
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/TestCollectionTupleSource.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/TestCollectionTupleSource.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/test/java/org/teiid/query/processor/TestCollectionTupleSource.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -29,8 +29,6 @@
import java.util.List;
import org.junit.Test;
-import org.teiid.query.processor.CollectionTupleSource;
-import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.symbol.ElementSymbol;
import org.teiid.query.sql.symbol.SingleElementSymbol;
@@ -42,11 +40,8 @@
List<SingleElementSymbol> elements = new
ArrayList<SingleElementSymbol>();
elements.add(new ElementSymbol("x")); //$NON-NLS-1$
elements.add(new ElementSymbol("y")); //$NON-NLS-1$
- CollectionTupleSource nts =
CollectionTupleSource.createNullTupleSource(elements);
+ CollectionTupleSource nts = CollectionTupleSource.createNullTupleSource();
- // Check schema
- assertEquals("Didn't get expected schema", elements,
nts.getSchema()); //$NON-NLS-1$
-
// Walk it and get no data
List tuple = nts.nextTuple();
nts.closeSource();
@@ -57,9 +52,6 @@
@Test public void testUpdateCountSource() {
CollectionTupleSource nts =
CollectionTupleSource.createUpdateCountTupleSource(5);
- // Check schema
- assertEquals("Didn't get expected schema",
Command.getUpdateCommandSymbol(), nts.getSchema()); //$NON-NLS-1$
-
// Walk it and get no data
List tuple = nts.nextTuple();
nts.closeSource();
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestBatchedUpdateNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -39,7 +39,6 @@
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.batch.TestBatchedUpdatePlanner;
import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.relational.BatchedUpdateNode;
import org.teiid.query.sql.lang.BatchedUpdateCommand;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.visitor.EvaluatableVisitor;
@@ -233,7 +232,6 @@
this.numCommands = numCommands;
}
public void closeSource() {}
- public List getSchema() {return null;}
public List nextTuple() throws TeiidComponentException {
if (first) {
first = false;
@@ -245,10 +243,6 @@
return null;
}
- @Override
- public int available() {
- return 0;
- }
}
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestGroupingNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestGroupingNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestGroupingNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -57,7 +57,7 @@
public class TestGroupingNode {
- public static TupleSource createTupleSource1() {
+ public static FakeTupleSource createTupleSource1() {
List<ElementSymbol> symbols = new ArrayList<ElementSymbol>();
symbols.add(new ElementSymbol("col1")); //$NON-NLS-1$
symbols.get(0).setType(DataTypeManager.DefaultDataClasses.INTEGER);
@@ -90,7 +90,7 @@
List[] expected, ProcessorDataManager dataMgr) throws
TeiidComponentException,
BlockedException,
TeiidProcessingException {
- TupleSource dataSource = createTupleSource1();
+ FakeTupleSource dataSource = createTupleSource1();
helpProcess(mgr, node, context, expected, dataSource, dataMgr);
}
@@ -98,7 +98,7 @@
GroupingNode node,
CommandContext context,
List[] expected,
- TupleSource dataSource, ProcessorDataManager dataMgr) throws TeiidComponentException,
+ FakeTupleSource dataSource, ProcessorDataManager dataMgr) throws TeiidComponentException,
BlockedException,
TeiidProcessingException {
RelationalNode dataNode = new FakeRelationalNode(0, dataSource,
mgr.getProcessorBatchSize());
@@ -250,7 +250,7 @@
List symbols = new ArrayList();
symbols.add(bigDecimal);
- TupleSource dataSource = new FakeTupleSource(symbols, data);
+ FakeTupleSource dataSource = new FakeTupleSource(symbols, data);
helpProcess(mgr, node, context, expected, dataSource, null);
}
@@ -292,7 +292,7 @@
List symbols = new ArrayList();
symbols.add(col1);
symbols.add(bigDecimal);
- TupleSource dataSource = new FakeTupleSource(symbols, data);
+ FakeTupleSource dataSource = new FakeTupleSource(symbols, data);
helpProcess(mgr, node, context, expected, dataSource, null);
}
@@ -385,7 +385,7 @@
List symbols = new ArrayList();
symbols.add(col1);
symbols.add(bigDecimal);
- TupleSource dataSource = new FakeTupleSource(symbols, data);
+ FakeTupleSource dataSource = new FakeTupleSource(symbols, data);
helpProcess(mgr, node, context, expected, dataSource, null);
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -22,42 +22,42 @@
package org.teiid.query.processor.relational;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.junit.Test;
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.query.eval.Evaluator;
import org.teiid.query.processor.FakeTupleSource;
import org.teiid.query.processor.ProcessorDataManager;
-import org.teiid.query.processor.relational.ProjectIntoNode;
-import org.teiid.query.processor.relational.RelationalNode;
+import org.teiid.query.processor.relational.ProjectIntoNode.Mode;
import org.teiid.query.sql.lang.BatchedUpdateCommand;
import org.teiid.query.sql.lang.Command;
import org.teiid.query.sql.lang.Insert;
import org.teiid.query.sql.symbol.Constant;
import org.teiid.query.sql.symbol.ElementSymbol;
+import org.teiid.query.sql.symbol.Expression;
import org.teiid.query.sql.symbol.GroupSymbol;
-import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.util.CommandContext;
-import junit.framework.TestCase;
-
-
-
/**
* @since 4.2
*/
-public class TestProjectIntoNode extends TestCase {
+public class TestProjectIntoNode {
// Rows should be a multiple of batch size for this test to work
private static final int NUM_ROWS = 1000;
- private void helpTestNextBatch(int tupleBatchSize, boolean doBatching, boolean doBulkInsert, boolean exceptionOnClose) throws Exception {
+ private void helpTestNextBatch(int tupleBatchSize, Mode mode) throws Exception {
ProjectIntoNode node = new ProjectIntoNode(2);
@@ -73,14 +73,13 @@
elements.add(elementSymbol_1);
elements.add(elementSymbol_2);
node.setIntoElements(elements);
- node.setDoBatching(doBatching);
- node.setDoBulkInsert(doBulkInsert);
+ node.setMode(mode);
node.setModelName("myModel"); //$NON-NLS-1$
CommandContext context = new CommandContext();
context.setProcessorID("processorID"); //$NON-NLS-1$
BufferManager bm = NodeTestUtil.getTestBufferManager(tupleBatchSize,
tupleBatchSize);
- ProcessorDataManager dataManager = new FakePDM(tupleBatchSize, exceptionOnClose);
+ ProcessorDataManager dataManager = new FakePDM(tupleBatchSize);
child.initialize(context, bm, dataManager);
node.initialize(context, bm, dataManager);
@@ -106,32 +105,34 @@
assertEquals(new Integer(NUM_ROWS), columns[0]);
}
- public void testNextBatch() throws Exception {
- helpTestNextBatch(100, true, false, false);
+ @Test public void testNextBatch() throws Exception {
+ helpTestNextBatch(100, Mode.BATCH);
}
- public void testNextBatch_BulkInsert() throws Exception {
- helpTestNextBatch(100, false, true, false);
+ @Test public void testNextBatch_BulkInsert() throws Exception {
+ helpTestNextBatch(100, Mode.BULK);
}
- public void testNextBatch_NoBatching() throws Exception {
- helpTestNextBatch(100, false, false, false);
+ @Test public void testNextBatch_NoBatching() throws Exception {
+ helpTestNextBatch(100, Mode.SINGLE);
}
- public void testNextBatch_Size20Batches() throws Exception {
- helpTestNextBatch(20, true, false, false);
+ @Test public void testNextBatch_Size20Batches() throws Exception {
+ helpTestNextBatch(20, Mode.BATCH);
}
+ @Test public void testNextBatch_Iterator() throws Exception {
+ helpTestNextBatch(100, Mode.ITERATOR);
+ }
+
private static final class FakePDM implements ProcessorDataManager {
private int expectedBatchSize;
private int callCount = 0;
- private boolean exceptionOnClose;
- private FakePDM(int expectedBatchSize, boolean exceptionOnClose) {
+ private FakePDM(int expectedBatchSize) {
this.expectedBatchSize = expectedBatchSize;
- this.exceptionOnClose = exceptionOnClose;
}
public Object lookupCodeValue(CommandContext context,String codeTableName,String
returnElementName,String keyElementName,Object keyValue) throws
BlockedException,TeiidComponentException {return null;}
- public TupleSource registerRequest(Object processorID,Command command,String modelName,String connectorBindingId, int nodeID) throws TeiidComponentException, ExpressionEvaluationException {
+ public TupleSource registerRequest(Object processorID,Command command,String modelName,String connectorBindingId, int nodeID) throws TeiidComponentException, TeiidProcessingException {
callCount++;
int batchSize = 1;
@@ -140,15 +141,23 @@
if (command instanceof Insert) {
Insert insert = (Insert)command;
if (insert.isBulk()) {
- List batch = TempTableStore.getBulkRows(insert, insert.getVariables());
+ List batch = getBulkRows(insert, insert.getVariables());
batchSize = batch.size();
assertEquals("Unexpected batch on call " + callCount,
expectedBatchSize, batchSize); //$NON-NLS-1$
for (int i = 0; i < batchSize; i++) {
ensureValue2((List)batch.get(i), 2, ((callCount-1) * batchSize) +
i + 1);
}
+ } else if (insert.getTupleSource() != null) {
+ TupleSource ts = insert.getTupleSource();
+ List tuple = null;
+ int i = 0;
+ while ((tuple = ts.nextTuple()) != null) {
+ ensureValue2(tuple, 2, ++i);
+ }
+ batchSize = i;
} else {
- ensureValue((Insert)command, 2, callCount);
+ ensureValue(insert, 2, callCount);
}
} else if ( command instanceof BatchedUpdateCommand ){
BatchedUpdateCommand bu = (BatchedUpdateCommand)command;
@@ -161,7 +170,6 @@
}
List counts = Arrays.asList(new Object[] { new Integer(batchSize)});
FakeTupleSource fakeTupleSource = new FakeTupleSource(null, new List[]
{counts});
- fakeTupleSource.setExceptionOnClose(this.exceptionOnClose);
return fakeTupleSource;
}
@@ -201,9 +209,36 @@
? null
: Arrays.asList(new Object[] {new Integer(currentRow),
Integer.toString(currentRow)});
}
- @Override
- public int available() {
- return 0;
- }
}
+
+ public static List<List<Object>> getBulkRows(Insert insert, List<ElementSymbol> elements) throws ExpressionEvaluationException, BlockedException, TeiidComponentException {
+ int bulkRowCount = 1;
+ if (insert.isBulk()) {
+ Constant c = (Constant)insert.getValues().get(0);
+ bulkRowCount = ((List<?>)c.getValue()).size();
+ }
+
+ List<List<Object>> tuples = new ArrayList<List<Object>>(bulkRowCount);
+
+ for (int row = 0; row < bulkRowCount; row++) {
+ List<Object> currentRow = new ArrayList<Object>(insert.getValues().size());
+ for (ElementSymbol symbol : elements) {
+ int index = insert.getVariables().indexOf(symbol);
+ Object value = null;
+ if (index != -1) {
+ if (insert.isBulk()) {
+ Constant multiValue = (Constant)insert.getValues().get(index);
+ value = ((List<?>)multiValue.getValue()).get(row);
+ } else {
+ Expression expr = (Expression)insert.getValues().get(index);
+ value = Evaluator.evaluate(expr);
+ }
+ }
+ currentRow.add(value);
+ }
+ tuples.add(currentRow);
+ }
+ return tuples;
+ }
+
}
Modified:
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2010-07-28
21:11:54 UTC (rev 2390)
+++
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2010-07-29
15:18:33 UTC (rev 2391)
@@ -305,7 +305,7 @@
BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
TupleBuffer tsid = bm.createTupleBuffer(Arrays.asList(es1), "test",
TupleSourceType.PROCESSOR); //$NON-NLS-1$
tsid.addTuple(Arrays.asList(1));
- SortUtility su = new SortUtility(tsid.createIndexedTupleSource(), Arrays.asList(es1), Arrays.asList(Boolean.TRUE), Mode.DUP_REMOVE, bm, "test"); //$NON-NLS-1$
+ SortUtility su = new SortUtility(tsid.createIndexedTupleSource(), Arrays.asList(es1), Arrays.asList(Boolean.TRUE), Mode.DUP_REMOVE, bm, "test", tsid.getSchema()); //$NON-NLS-1$
TupleBuffer out = su.sort();
TupleSource ts = out.createIndexedTupleSource();
assertEquals(Arrays.asList(1), ts.nextTuple());