Author: rareddy
Date: 2010-02-18 15:18:35 -0500 (Thu, 18 Feb 2010)
New Revision: 1842
Added:
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/DeploymentUtils.java
Removed:
branches/JCA/console/.settings/
Modified:
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml
branches/JCA/build/kit-runtime/deploy.properties
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
branches/JCA/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java
branches/JCA/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
branches/JCA/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java
branches/JCA/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/postgresql/LocateFunctionModifier.java
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/ConvertModifier.java
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/LocateFunctionModifier.java
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/TranslatedCommand.java
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/db2/TestDB2SqlTranslator.java
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/postgresql/TestPostgreSQLTranslator.java
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/Facet.java
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/PlatformComponent.java
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/VDBDiscoveryComponent.java
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/ProfileServiceUtil.java
branches/JCA/console/src/resources/embedded/META-INF/rhq-plugin.xml
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
branches/JCA/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePushSelectCriteria.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/SetQuery.java
branches/JCA/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/CodeTableCache.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java
branches/JCA/engine/src/test/java/com/metamatrix/query/resolver/TestResolver.java
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestCodeTableCache.java
branches/JCA/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
branches/JCA/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
Log:
TEIID-833: forward merging trunk into JCA -r 1798:1840
Modified:
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml
===================================================================
---
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-engine.rar/META-INF/ra.xml 2010-02-18
20:18:35 UTC (rev 1842)
@@ -47,10 +47,10 @@
<config-property-value>localhost</config-property-value>
</config-property>
<config-property>
- <description>Process pool maximum thread count. (default
64)</description>
+ <description>Process pool maximum thread count. (default 16)
Increase this value if your load includes a large number of XQueries or if the
system's available processors is larger than 8.</description>
<config-property-name>MaxThreads</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
- <config-property-value>64</config-property-value>
+ <config-property-value>16</config-property-value>
</config-property>
<config-property>
<description>Query processor time slice, in milliseconds. (default
2000)</description>
@@ -62,16 +62,15 @@
<description>Plan debug messages allowed. see option
debug.</description>
<config-property-name>OptionDebugAllowed</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
- <config-property-value>true</config-property-value>
</config-property>
<config-property>
- <description>Maximum allowed fetch size, set via JDBC. User
requested value ignored above this value. (default 20000)</description>
+ <description>Maximum allowed fetch size, set via JDBC. User
requested value ignored above this value. (default 20480)</description>
<config-property-name>MaxRowsFetchSize</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
- <config-property-value>2000</config-property-value>
+ <config-property-value>20480</config-property-value>
</config-property>
<config-property>
- <description>The max lob chunk size transferred each time when
processing blobs, clobs(10KB default)</description>
+ <description>The max lob chunk size in KB transferred each time when
processing blobs, clobs(100KB default)</description>
<config-property-name>LobChunkSizeInKB</config-property-name>
<config-property-type>java.lang.Integer</config-property-type>
<config-property-value>10</config-property-value>
Modified: branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-02-18
20:12:13 UTC (rev 1841)
+++ branches/JCA/build/kit-jboss-container/deploy/teiid/teiid-jboss-beans.xml 2010-02-18
20:18:35 UTC (rev 1842)
@@ -48,10 +48,23 @@
<property name="useDisk">true</property>
<property
name="diskDirectory">${jboss.server.temp.dir}/teiid</property>
<property name="bufferMemorySizeInMB">64</property>
- <property name="processorBatchSize">2000</property>
- <property name="connectorBatchSize">2000</property>
+ <!-- The max row count of a batch sent internally within the query processor.
Should be <= the connectorBatchSize. (default 256) -->
+ <property name="processorBatchSize">512</property>
+ <!-- The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 512) -->
+ <property name="connectorBatchSize">1024</property>
<property name="maxProcessingBatches">8</property>
- <property name="maxReserveBatches">64</property>
+ <!--
+ The number of batch columns to allow in memory (default 16384).
+ This value should be set lower or higher depending on the available memory to
Teiid in the VM.
+ 16384 is considered a good default for a dedicated 32-bit VM running Teiid
with a 1 gig heap.
+ -->
+ <property name="maxReserveBatchColumns">16384</property>
+ <!--
+ The number of batch columns guarenteed to a processing operation. Set this
value lower if the workload typically
+ processes larger numbers of concurrent queries with large intermediate
results from operations such as sorting,
+ grouping, etc. (default 124)
+ -->
+ <property
name="maxProcessingBatchesColumns">128</property>
<!-- Max File size in GB -->
<property name="maxFileSize">2</property>
<property name="maxOpenFiles">256</property>
Modified: branches/JCA/build/kit-runtime/deploy.properties
===================================================================
--- branches/JCA/build/kit-runtime/deploy.properties 2010-02-18 20:12:13 UTC (rev 1841)
+++ branches/JCA/build/kit-runtime/deploy.properties 2010-02-18 20:18:35 UTC (rev 1842)
@@ -28,37 +28,42 @@
# Processor settings
#
-#Process pool maximum thread count. (default 64)
-process.maxThreads=64
+#Process pool maximum thread count. (default 16) Increase this value if your load
includes a large number of XQueries
+#or if the system's available processors is larger than 8.
+process.maxThreads=16
#Query processor time slice, in milliseconds. (default 2000)
process.timeSliceInMilli=2000
-#Plan debug messages allowed. see option debug.
-process.optionDebugAllowed=true
-
#Maximum allowed fetch size, set via JDBC. User requested value ignored above this value.
(default 20480)
process.maxRowsFetchSize=20480
-# The max lob chunk size transferred each time when processing blobs, clobs(100KB
default)
+# The max lob chunk size in KB transferred each time when processing blobs, clobs(100KB
default)
process.lobChunkSizeInKB=100
#
# BufferManager Settings
#
-#The max size of a batch sent between connector and query service. Should be even
multiple of processorBatchSize. (default 2048)
-dqp.buffer.connectorBatchSize=2048
+#The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 512)
+dqp.buffer.connectorBatchSize=1024
-#The max size of a batch sent internally within the query processor. Should be <= the
connectorBatchSize. (default 1024)
-dqp.buffer.processorBatchSize=1024
+#The max row count of a batch sent internally within the query processor. Should be <=
the connectorBatchSize. (default 256)
+dqp.buffer.processorBatchSize=512
#Defines whether to use disk buffering or not. (default true)
dqp.buffer.useDisk=true
-#The number of batches to actively hold in the BufferManager
-org.teiid.buffer.maxReserveBatches=64
+#The number of batch columns to allow in memory (default 16384).
+#This value should be set lower or higher depending on the available memory to Teiid in
the VM.
+#16384 is considered a good default for a dedicated 32-bit VM running Teiid with a 1 gig
heap.
+org.teiid.buffer.maxReserveBatchColumns=16384
+#The number of batch columns guarenteed to a processing operation. Set this value lower
if the workload typically
+#processes larger numbers of concurrent queries with large intermediate results from
operations such as sorting,
+#grouping, etc. (default 124)
+org.teiid.buffer.maxProcessingBatchesColumns=128
+
#
# Cache Settings
#
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -322,10 +322,6 @@
int b;
boolean readingShort;
int length = in.readInt();
- /* Although using a StringBuffer and doing a toString() to get the String
value reuses
- * the StringBuffer's internal char[], the StringBuffer.append() calls
are all synchronized,
- * and likely too costly compared to simply copying the array during
derialization.
- */
char[] chars = new char[length];
readingShort = true;
for (int i = 0; i < length; i++) {
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -58,6 +58,7 @@
private final Socket socket;
private ObjectOutputStream outputStream;
private ObjectInputStream inputStream;
+ private Object readLock = new Object();
private OioObjectChannel(Socket socket) throws IOException {
log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
@@ -118,13 +119,15 @@
//## JDBC4.0-end ##
public Object read() throws IOException, ClassNotFoundException {
log.finer("reading message from socket"); //$NON-NLS-1$
- try {
- return inputStream.readObject();
- } catch (SocketTimeoutException e) {
- throw e;
- } catch (IOException e) {
- close();
- throw e;
+ synchronized (readLock) {
+ try {
+ return inputStream.readObject();
+ } catch (SocketTimeoutException e) {
+ throw e;
+ } catch (IOException e) {
+ close();
+ throw e;
+ }
}
}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -70,19 +70,18 @@
public class SocketServerInstanceImpl implements SocketServerInstance {
static final int HANDSHAKE_RETRIES = 10;
+ private static Logger log = Logger.getLogger("org.teiid.client.sockets");
//$NON-NLS-1$
private AtomicInteger MESSAGE_ID = new AtomicInteger();
+ private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners
= new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
private HostInfo hostInfo;
private boolean ssl;
- private ObjectChannel socketChannel;
- private static Logger log = Logger.getLogger("org.teiid.client.sockets");
//$NON-NLS-1$
private long synchTimeout;
+ private ObjectChannel socketChannel;
private Cryptor cryptor;
- private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners
= new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
-
private boolean hasReader;
public SocketServerInstanceImpl() {
@@ -95,7 +94,7 @@
this.synchTimeout = synchTimeout;
}
- public void connect(ObjectChannelFactory channelFactory) throws
CommunicationException, IOException {
+ public synchronized void connect(ObjectChannelFactory channelFactory) throws
CommunicationException, IOException {
InetSocketAddress address = new InetSocketAddress(hostInfo.getInetAddress(),
hostInfo.getPortNumber());
this.socketChannel = channelFactory.createObjectChannel(address, ssl);
try {
@@ -249,6 +248,45 @@
public Cryptor getCryptor() {
return this.cryptor;
}
+
+ void read(long timeout, TimeUnit unit, ResultsFuture<?> future) throws
TimeoutException, InterruptedException {
+ long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+ long start = System.currentTimeMillis();
+ while (!future.isDone()) {
+ boolean reading = false;
+ synchronized (this) {
+ if (!hasReader) {
+ hasReader = true;
+ reading = true;
+ } else if (!future.isDone()) {
+ this.wait(Math.max(1, timeoutMillis));
+ }
+ }
+ if (reading) {
+ try {
+ if (!future.isDone()) {
+ receivedMessage(socketChannel.read());
+ }
+ } catch (SocketTimeoutException e) {
+ } catch (Exception e) {
+ exceptionOccurred(e);
+ } finally {
+ synchronized (this) {
+ hasReader = false;
+ this.notifyAll();
+ }
+ }
+ }
+ if (!future.isDone()) {
+ long now = System.currentTimeMillis();
+ timeoutMillis -= now - start;
+ start = now;
+ if (timeoutMillis <= 0) {
+ throw new TimeoutException();
+ }
+ }
+ }
+ }
@SuppressWarnings("unchecked")
//## JDBC4.0-begin ##
@@ -315,42 +353,7 @@
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
- long start = System.currentTimeMillis();
- while (!isDone()) {
- boolean reading = false;
- synchronized (SocketServerInstanceImpl.this) {
- if (!hasReader) {
- hasReader = true;
- reading = true;
- } else if (!isDone()) {
- SocketServerInstanceImpl.this.wait(Math.max(1, timeoutMillis));
- }
- }
- if (reading) {
- try {
- if (!isDone()) {
- receivedMessage(socketChannel.read());
- }
- } catch (SocketTimeoutException e) {
- } catch (Exception e) {
- exceptionOccurred(e);
- } finally {
- synchronized (SocketServerInstanceImpl.this) {
- hasReader = false;
- SocketServerInstanceImpl.this.notifyAll();
- }
- }
- }
- if (!isDone()) {
- long now = System.currentTimeMillis();
- timeoutMillis -= now - start;
- start = now;
- if (timeoutMillis <= 0) {
- throw new TimeoutException();
- }
- }
- }
+ read(timeout, unit, this);
return super.get(timeout, unit);
}
};
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/lob/ReaderInputStream.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -32,13 +32,15 @@
import java.nio.CharBuffer;
import java.nio.charset.Charset;
+import com.metamatrix.common.types.Streamable;
+
public class ReaderInputStream extends InputStream {
- private static final int DEFAULT_BUFFER_SIZE = 100 * 1024;
+ private static final int DEFAULT_BUFFER_SIZE =
Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
private final Reader reader;
private final Charset charSet;
- private final int bufferSize;
+ private char[] charBuffer;
private boolean hasMore = true;
private ByteBuffer currentBuffer;
@@ -52,7 +54,7 @@
public ReaderInputStream(Reader reader, Charset charSet, int bufferSize) {
this.reader = reader;
this.charSet = charSet;
- this.bufferSize = bufferSize;
+ this.charBuffer = new char[bufferSize];
if (charSet.displayName().equalsIgnoreCase("UTF-16")) { //$NON-NLS-1$
prefixBytes = 2;
}
@@ -64,18 +66,12 @@
if (!hasMore) {
return -1;
}
- char[] charBuffer = new char[bufferSize];
int charsRead = reader.read(charBuffer);
if (charsRead == -1) {
hasMore = false;
return -1;
}
- if (charsRead != charBuffer.length) {
- char[] buf = new char[charsRead];
- System.arraycopy(charBuffer, 0, buf, 0, charsRead);
- charBuffer = buf;
- }
- currentBuffer = charSet.encode(CharBuffer.wrap(charBuffer));
+ currentBuffer = charSet.encode(CharBuffer.wrap(charBuffer, 0, charsRead));
if (!needsPrefix) {
currentBuffer.position(prefixBytes);
}
Modified:
branches/JCA/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
===================================================================
---
branches/JCA/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -34,6 +34,7 @@
/*## JDBC3.0-JDK1.5-begin ##
import com.metamatrix.core.jdbc.SQLXML;
## JDBC3.0-JDK1.5-end ##*/
+import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
@@ -43,7 +44,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.WeakHashMap;
import javax.xml.transform.Source;
@@ -68,6 +68,7 @@
import com.metamatrix.core.ErrorMessageKeys;
import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.core.util.ArgCheck;
+import com.metamatrix.core.util.HashCodeUtil;
/**
* <p>
@@ -85,11 +86,57 @@
*/
public class DataTypeManager {
- private static final int MAX_VALUE_MAP_SIZE = 10000;
- private static boolean USE_VALUE_CACHE =
PropertiesUtils.getBooleanProperty(System.getProperties(),
"org.teiid.useValueCache", true); //$NON-NLS-1$
+ private static final boolean USE_VALUE_CACHE =
PropertiesUtils.getBooleanProperty(System.getProperties(),
"org.teiid.useValueCache", true); //$NON-NLS-1$
- private static Map<Class<?>, Map<Object, WeakReference<Object>>>
valueMaps = new HashMap<Class<?>, Map<Object,
WeakReference<Object>>>();
+ private static boolean valueCacheEnabled;
+
+ private interface ValueCache<T> {
+ T getValue(T value);
+ }
+
+ private static class HashedValueCache<T> implements ValueCache<T> {
+
+ final Object[] cache;
+ final boolean weak = false;
+
+ HashedValueCache(int size) {
+ cache = new Object[1 << size];
+ }
+
+ @SuppressWarnings("unchecked")
+ public T getValue(T value) {
+ int index = hash(primaryHash(value)) & (cache.length - 1);
+ Object canonicalValue = get(index);
+ if (value.equals(canonicalValue)) {
+ return (T)canonicalValue;
+ }
+ set(index, value);
+ return value;
+ }
+
+ protected Object get(int index) {
+ return cache[index];
+ }
+
+ protected void set(int index, T value) {
+ cache[index] = value;
+ }
+
+ protected int primaryHash(T value) {
+ return value.hashCode();
+ }
+ /*
+ * The same power of 2 hash bucketing from the Java HashMap
+ */
+ final static int hash(int h) {
+ h ^= (h >>> 20) ^ (h >>> 12);
+ return h ^= (h >>> 7) ^ (h >>> 4);
+ }
+ }
+
+ private static Map<Class<?>, ValueCache<?>> valueMaps = new
HashMap<Class<?>, ValueCache<?>>(128);
+
public static final int MAX_STRING_LENGTH = 4000;
public static final class DefaultDataTypes {
@@ -152,7 +199,7 @@
* Doubly-nested map of String srcType --> Map of String targetType -->
* Transform
*/
- private static Map<String, Map<String, Transform>> transforms = new
HashMap<String, Map<String, Transform>>();
+ private static Map<String, Map<String, Transform>> transforms = new
HashMap<String, Map<String, Transform>>(128);
/** Utility to easily get Transform given srcType and targetType */
private static Transform getTransformFromMaps(String srcType,
@@ -165,10 +212,10 @@
}
/** Base data type names and classes, Type name --> Type class */
- private static Map<String, Class> dataTypeNames = new LinkedHashMap<String,
Class>();
+ private static Map<String, Class> dataTypeNames = new LinkedHashMap<String,
Class>(128);
/** Base data type names and classes, Type class --> Type name */
- private static Map<Class, String> dataTypeClasses = new LinkedHashMap<Class,
String>();
+ private static Map<Class, String> dataTypeClasses = new LinkedHashMap<Class,
String>(128);
private static Set<String> DATA_TYPE_NAMES =
Collections.unmodifiableSet(dataTypeNames.keySet());
@@ -426,38 +473,94 @@
*/
static void loadDataTypes() {
DataTypeManager.addDataType(DefaultDataTypes.BOOLEAN, DefaultDataClasses.BOOLEAN);
- valueMaps.put(DefaultDataClasses.BOOLEAN, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BYTE, DefaultDataClasses.BYTE);
- valueMaps.put(DefaultDataClasses.BYTE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.SHORT, DefaultDataClasses.SHORT);
- valueMaps.put(DefaultDataClasses.SHORT, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CHAR, DefaultDataClasses.CHAR);
- valueMaps.put(DefaultDataClasses.CHAR, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.INTEGER, DefaultDataClasses.INTEGER);
- valueMaps.put(DefaultDataClasses.INTEGER, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.LONG, DefaultDataClasses.LONG);
- valueMaps.put(DefaultDataClasses.LONG, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_INTEGER,
DefaultDataClasses.BIG_INTEGER);
- valueMaps.put(DefaultDataClasses.BIG_INTEGER, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.FLOAT, DefaultDataClasses.FLOAT);
- valueMaps.put(DefaultDataClasses.FLOAT, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DOUBLE, DefaultDataClasses.DOUBLE);
- valueMaps.put(DefaultDataClasses.DOUBLE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_DECIMAL,
DefaultDataClasses.BIG_DECIMAL);
- valueMaps.put(DefaultDataClasses.BIG_DECIMAL, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DATE, DefaultDataClasses.DATE);
- valueMaps.put(DefaultDataClasses.DATE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIME, DefaultDataClasses.TIME);
- valueMaps.put(DefaultDataClasses.TIME, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIMESTAMP, DefaultDataClasses.TIMESTAMP);
- valueMaps.put(DefaultDataClasses.TIMESTAMP, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.STRING, DefaultDataClasses.STRING);
- valueMaps.put(DefaultDataClasses.STRING, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CLOB, DefaultDataClasses.CLOB);
DataTypeManager.addDataType(DefaultDataTypes.XML, DefaultDataClasses.XML);
DataTypeManager.addDataType(DefaultDataTypes.OBJECT, DefaultDataClasses.OBJECT);
DataTypeManager.addDataType(DefaultDataTypes.NULL, DefaultDataClasses.NULL);
DataTypeManager.addDataType(DefaultDataTypes.BLOB, DefaultDataClasses.BLOB);
+
+ if (USE_VALUE_CACHE) {
+ valueMaps.put(DefaultDataClasses.BOOLEAN, new ValueCache<Boolean>() {
+ @Override
+ public Boolean getValue(Boolean value) {
+ return Boolean.valueOf(value);
+ }
+ });
+ valueMaps.put(DefaultDataClasses.BYTE, new ValueCache<Byte>() {
+ @Override
+ public Byte getValue(Byte value) {
+ return Byte.valueOf(value);
+ }
+ });
+ valueMaps.put(DefaultDataClasses.SHORT, new HashedValueCache<Short>(13));
+ valueMaps.put(DefaultDataClasses.CHAR, new HashedValueCache<Character>(13));
+ valueMaps.put(DefaultDataClasses.INTEGER, new HashedValueCache<Integer>(14));
+ valueMaps.put(DefaultDataClasses.LONG, new HashedValueCache<Long>(14));
+ valueMaps.put(DefaultDataClasses.BIG_INTEGER, new
HashedValueCache<BigInteger>(14));
+ valueMaps.put(DefaultDataClasses.FLOAT, new HashedValueCache<Float>(14));
+ valueMaps.put(DefaultDataClasses.DOUBLE, new HashedValueCache<Double>(14));
+ valueMaps.put(DefaultDataClasses.DATE, new HashedValueCache<Date>(14));
+ valueMaps.put(DefaultDataClasses.TIME, new HashedValueCache<Time>(14));
+ valueMaps.put(DefaultDataClasses.TIMESTAMP, new
HashedValueCache<Timestamp>(14));
+ valueMaps.put(DefaultDataClasses.BIG_DECIMAL, new
HashedValueCache<BigDecimal>(15) {
+ @Override
+ protected Object get(int index) {
+ WeakReference<?> ref = (WeakReference<?>) cache[index];
+ if (ref != null) {
+ return ref.get();
+ }
+ return null;
+ }
+
+ @Override
+ protected void set(int index, BigDecimal value) {
+ cache[index] = new WeakReference<BigDecimal>(value);
+ }
+ });
+ valueMaps.put(DefaultDataClasses.STRING, new HashedValueCache<String>(15) {
+ HashedValueCache<String> smallCache = new HashedValueCache<String>(13);
+
+ @Override
+ public String getValue(String value) {
+ if (value.length() < 14) {
+ return smallCache.getValue(value);
+ }
+ return super.getValue(value);
+ }
+
+ @Override
+ protected Object get(int index) {
+ WeakReference<?> ref = (WeakReference<?>) cache[index];
+ if (ref != null) {
+ return ref.get();
+ }
+ return null;
+ }
+
+ @Override
+ protected void set(int index, String value) {
+ cache[index] = new WeakReference<Object>(value);
+ }
+
+ @Override
+ protected int primaryHash(String value) {
+ return HashCodeUtil.expHashCode(value);
+ }
+ });
+ }
}
/**
@@ -723,27 +826,26 @@
sourceConverters.put(sourceClass, transform);
}
+ public static void setValueCacheEnabled(boolean enabled) {
+ valueCacheEnabled = enabled;
+ }
+
+ public static boolean isValueCacheEnabled() {
+ return valueCacheEnabled;
+ }
+
@SuppressWarnings("unchecked")
public static <T> T getCanonicalValue(T value) {
- if (USE_VALUE_CACHE) {
+ if (USE_VALUE_CACHE && valueCacheEnabled) {
if (value == null) {
return null;
}
- Map<Object, WeakReference<Object>> valueMap =
valueMaps.get(value.getClass());
- if (valueMap == null) {
+ //TODO: this initial lookup is inefficient, since there are likely collisions
+ ValueCache valueCache = valueMaps.get(value.getClass());
+ if (valueCache == null) {
return value;
}
- WeakReference<Object> valueReference = valueMap.get(value);
- Object canonicalValue = null;
- if (valueReference != null) {
- canonicalValue = valueReference.get();
- }
- if (canonicalValue != null) {
- return (T)canonicalValue;
- }
- if (valueMap.size() <= MAX_VALUE_MAP_SIZE) {
- valueMap.put(value, new WeakReference<Object>(value));
- }
+ return (T)valueCache.getValue(value);
}
return value;
}
Modified:
branches/JCA/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java
===================================================================
---
branches/JCA/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/common-core/src/main/java/com/metamatrix/core/util/HashCodeUtil.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
+import java.util.RandomAccess;
/**
* <P>This class provides utility functions for generating good
@@ -105,9 +106,7 @@
* Compute a hash code on a large collection by walking the list
* and combining the hash code at every exponential index:
* 1, 2, 4, 8, ... This has been shown to give a good hash
- * for good time complexity. This uses an iterator to walk
- * the collection and pull the necessary hash code values.
- * Slower than a List or array but faster than getting EVERY value.
+ * for good time complexity.
*/
public static final int expHashCode(int previous, List x) {
if(x == null || x.size() == 0) {
@@ -115,19 +114,42 @@
}
int size = x.size(); // size of collection
int hc = (PRIME*previous) + size; // hash code so far
- int skip = 0; // skip between samples
- int total = 0; // collection examined already
- Iterator iter = x.iterator(); // collection iterator
- Object obj = iter.next(); // last iterated object, primed at first
- while(total < size) {
- for(int i=0; i<skip; i++) { // skip to next sample
- obj = iter.next();
+ if (x instanceof RandomAccess) {
+ int index = 1;
+ int xlen = x.size()+1; // switch to 1-based
+ while(index < xlen) {
+ hc = hashCode(hc, x.get(index-1));
+ index = index << 1; // left shift by 1 to double
}
- hc = hashCode(hc, obj); // add sample to hashcode
- skip = (skip == 0) ? 1 : skip << 1; // left shift by 1 to double
- total += skip; // update total
+ } else {
+ int skip = 0; // skip between samples
+ int total = 0; // collection examined already
+ Iterator iter = x.iterator(); // collection iterator
+ Object obj = iter.next(); // last iterated object, primed at first
+ while(total < size) {
+ for(int i=0; i<skip; i++) { // skip to next sample
+ obj = iter.next();
+ }
+ hc = hashCode(hc, obj); // add sample to hashcode
+ skip = (skip == 0) ? 1 : skip << 1; // left shift by 1 to double
+ total += skip; // update total
+ }
}
return hc;
}
+
+ public static final int expHashCode(String x) {
+ if(x == null) {
+ return 0;
+ }
+ int hc = x.length();
+ int index = 1;
+ int xlen = x.length()+1; // switch to 1-based
+ while(index < xlen) {
+ hc = PRIME * hc + x.charAt(index-1);
+ index = index << 1; // left shift by 1 to double
+ }
+ return hc;
+ }
}
Modified:
branches/JCA/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java
===================================================================
---
branches/JCA/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connector-api/src/main/java/org/teiid/connector/visitor/util/SQLStringVisitor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -1015,6 +1015,9 @@
append(obj);
buffer.append(SQLReservedWords.RPAREN);
} else {
+ if (!parent.isAll() && obj instanceof ISetQuery) {
+ ((ISetQuery)obj).setAll(false);
+ }
append(obj);
}
}
Modified:
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/postgresql/LocateFunctionModifier.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/postgresql/LocateFunctionModifier.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/postgresql/LocateFunctionModifier.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -29,6 +29,7 @@
import org.teiid.connector.language.IExpression;
import org.teiid.connector.language.IFunction;
import org.teiid.connector.language.ILanguageFactory;
+import org.teiid.connector.language.ILiteral;
public class LocateFunctionModifier extends
org.teiid.connector.jdbc.translator.LocateFunctionModifier {
@@ -44,8 +45,29 @@
parts.add("position("); //$NON-NLS-1$
parts.add(params.get(0));
parts.add(" in "); //$NON-NLS-1$
+ boolean useSubStr = false;
if (params.size() == 3) {
+ useSubStr = true;
+ if (params.get(2) instanceof ILiteral && ((ILiteral)params.get(2)).getValue()
instanceof Integer) {
+ Integer value = (Integer)((ILiteral)params.get(2)).getValue();
+ if (value > 1) {
+ ((ILiteral)params.get(2)).setValue(value - 1);
+ } else {
+ useSubStr = false;
+ }
+ }
+ }
+ if (useSubStr) {
+ parts.add(0, "("); //$NON-NLS-1$
parts.add(this.getLanguageFactory().createFunction("substr",
params.subList(1, 3), TypeFacility.RUNTIME_TYPES.STRING)); //$NON-NLS-1$
+ parts.add(")"); //$NON-NLS-1$
+ parts.add(" + "); //$NON-NLS-1$
+ if (params.get(2) instanceof ILiteral && ((ILiteral)params.get(2)).getValue()
instanceof Integer) {
+ parts.add(params.get(2));
+ } else {
+ parts.add(params.get(2));
+ parts.add(" - 1"); //$NON-NLS-1$
+ }
} else {
parts.add(params.get(1));
}
Modified:
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/ConvertModifier.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/ConvertModifier.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/ConvertModifier.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -181,8 +181,14 @@
this.addConvert(FunctionModifier.BOOLEAN, FunctionModifier.STRING, new
FunctionModifier() {
@Override
public List<?> translate(IFunction function) {
- IExpression stringValue = function.getParameters().get(0);
- return Arrays.asList("CASE WHEN ", stringValue, " = 0 THEN
'false' WHEN ", stringValue, " IS NOT NULL THEN 'true'
END"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ IExpression booleanValue = function.getParameters().get(0);
+ if (booleanValue instanceof IFunction) {
+ IFunction nested = (IFunction)booleanValue;
+ if (nested.getName().equalsIgnoreCase("convert") &&
Number.class.isAssignableFrom(nested.getParameters().get(0).getType())) { //$NON-NLS-1$
+ booleanValue = nested.getParameters().get(0);
+ }
+ }
+ return Arrays.asList("CASE WHEN ", booleanValue, " = 0 THEN
'false' WHEN ", booleanValue, " IS NOT NULL THEN 'true'
END"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
});
this.addConvert(FunctionModifier.STRING, FunctionModifier.BOOLEAN, new
FunctionModifier() {
Modified:
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/LocateFunctionModifier.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/LocateFunctionModifier.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/LocateFunctionModifier.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -160,7 +160,7 @@
private IExpression ensurePositiveStartIndex(IExpression startIndex) {
if (startIndex instanceof ILiteral) {
ILiteral literal = (ILiteral)startIndex;
- if (literal.getValue() != null && ((Integer)literal.getValue() < 1)) {
+ if (literal.getValue() instanceof Integer && ((Integer)literal.getValue() <
1)) {
literal.setValue(1);
}
} else {
Modified:
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/SQLConversionVisitor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -32,6 +32,7 @@
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.teiid.connector.api.ExecutionContext;
@@ -76,6 +77,7 @@
private List preparedValues = new ArrayList();
private Set<ILanguageObject> recursionObjects = Collections.newSetFromMap(new
IdentityHashMap<ILanguageObject, Boolean>());
+ private Map<ILanguageObject, Object> translations = new
IdentityHashMap<ILanguageObject, Object>();
private boolean replaceWithBinding = false;
@@ -98,7 +100,19 @@
}
List<?> parts = null;
if (!recursionObjects.contains(obj)) {
- parts = translator.translate(obj, context);
+ Object trans = this.translations.get(obj);
+ if (trans instanceof List<?>) {
+ parts = (List<?>)trans;
+ } else if (trans instanceof ILanguageObject) {
+ obj = (ILanguageObject)trans;
+ } else {
+ parts = translator.translate(obj, context);
+ if (parts != null) {
+ this.translations.put(obj, parts);
+ } else {
+ this.translations.put(obj, obj);
+ }
+ }
}
if (parts != null) {
recursionObjects.add(obj);
Modified:
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/TranslatedCommand.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/TranslatedCommand.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/translator/TranslatedCommand.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -45,8 +45,8 @@
private boolean prepared;
private List preparedValues;
- private SQLConversionVisitor sqlConversionVisitor;
private Translator sqlTranslator;
+ private ExecutionContext context;
/**
* Constructor, takes a SQLConversionVisitor subclass
@@ -54,8 +54,7 @@
*/
public TranslatedCommand(ExecutionContext context, Translator sqlTranslator){
this.sqlTranslator = sqlTranslator;
- this.sqlConversionVisitor = sqlTranslator.getSQLConversionVisitor();
- this.sqlConversionVisitor.setExecutionContext(context);
+ this.context = context;
}
/**
@@ -66,20 +65,18 @@
* @throws ConnectorException
*/
public void translateCommand(ICommand command) throws ConnectorException {
- this.sql = getSQL(command);
- this.preparedValues = this.sqlConversionVisitor.getPreparedValues();
- this.prepared = this.sqlConversionVisitor.isPrepared();
- }
-
- private String getSQL(ICommand command) {
+ SQLConversionVisitor sqlConversionVisitor =
sqlTranslator.getSQLConversionVisitor();
+ sqlConversionVisitor.setExecutionContext(context);
if (sqlTranslator.usePreparedStatements() || hasBindValue(command)) {
- this.sqlConversionVisitor.setPrepared(true);
+ sqlConversionVisitor.setPrepared(true);
}
- this.sqlConversionVisitor.append(command);
- return this.sqlConversionVisitor.toString();
- }
-
+ sqlConversionVisitor.append(command);
+ this.sql = sqlConversionVisitor.toString();
+ this.preparedValues = sqlConversionVisitor.getPreparedValues();
+ this.prepared = sqlConversionVisitor.isPrepared();
+ }
+
/**
* Simple check to see if any values in the command should be replaced with bind
values
*
Modified:
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/db2/TestDB2SqlTranslator.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/db2/TestDB2SqlTranslator.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/db2/TestDB2SqlTranslator.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -233,5 +233,14 @@
input, output,
TRANSLATOR);
}
+
+ @Test public void testBooleanToString() throws Exception {
+ String input = "SELECT convert(convert(INTKEY, boolean), string) FROM
BQT1.SmallA"; //$NON-NLS-1$
+ String output = "SELECT CASE WHEN SmallA.IntKey = 0 THEN 'false'
WHEN SmallA.IntKey IS NOT NULL THEN 'true' END FROM SmallA"; //$NON-NLS-1$
+
+ TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
+ input,
+ output, TRANSLATOR);
+ }
}
Modified:
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/postgresql/TestPostgreSQLTranslator.java
===================================================================
---
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/postgresql/TestPostgreSQLTranslator.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/connectors/connector-jdbc/src/test/java/org/teiid/connector/jdbc/postgresql/TestPostgreSQLTranslator.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -409,7 +409,7 @@
*/
@Test public void testLocate() throws Exception {
String input = "SELECT locate(INTNUM, 'chimp', 1) FROM
BQT1.SMALLA"; //$NON-NLS-1$
- String output = "SELECT position(cast(SmallA.IntNum AS varchar(4000)) in
substr('chimp', 1)) FROM SmallA"; //$NON-NLS-1$
+ String output = "SELECT position(cast(SmallA.IntNum AS varchar(4000)) in
'chimp') FROM SmallA"; //$NON-NLS-1$
TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
input, output,
@@ -443,7 +443,7 @@
*/
@Test public void testLocate3() throws Exception {
String input = "SELECT locate(INTNUM, '234567890', 1) FROM
BQT1.SMALLA WHERE INTKEY = 26"; //$NON-NLS-1$
- String output = "SELECT position(cast(SmallA.IntNum AS varchar(4000)) in
substr('234567890', 1)) FROM SmallA WHERE SmallA.IntKey = 26";
//$NON-NLS-1$
+ String output = "SELECT position(cast(SmallA.IntNum AS varchar(4000)) in
'234567890') FROM SmallA WHERE SmallA.IntKey = 26"; //$NON-NLS-1$
TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
input, output,
@@ -477,7 +477,7 @@
*/
@Test public void testLocate5() throws Exception {
String input = "SELECT locate(STRINGNUM, 'chimp', -5) FROM
BQT1.SMALLA"; //$NON-NLS-1$
- String output = "SELECT position(SmallA.StringNum in substr('chimp',
1)) FROM SmallA"; //$NON-NLS-1$
+ String output = "SELECT position(SmallA.StringNum in 'chimp') FROM
SmallA"; //$NON-NLS-1$
TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
input, output,
@@ -494,7 +494,7 @@
*/
@Test public void testLocate6() throws Exception {
String input = "SELECT locate(STRINGNUM, 'chimp', INTNUM) FROM
BQT1.SMALLA"; //$NON-NLS-1$
- String output = "SELECT position(SmallA.StringNum in substr('chimp',
CASE WHEN SmallA.IntNum < 1 THEN 1 ELSE SmallA.IntNum END)) FROM SmallA";
//$NON-NLS-1$
+ String output = "SELECT (position(SmallA.StringNum in
substr('chimp', CASE WHEN SmallA.IntNum < 1 THEN 1 ELSE SmallA.IntNum END)) +
CASE WHEN SmallA.IntNum < 1 THEN 1 ELSE SmallA.IntNum END - 1) FROM SmallA";
//$NON-NLS-1$
TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
input, output,
@@ -511,7 +511,7 @@
*/
@Test public void testLocate7() throws Exception {
String input = "SELECT locate(STRINGNUM, 'chimp', LOCATE(STRINGNUM,
'chimp') + 1) FROM BQT1.SMALLA"; //$NON-NLS-1$
- String output = "SELECT position(SmallA.StringNum in substr('chimp',
CASE WHEN (position(SmallA.StringNum in 'chimp') + 1) < 1 THEN 1 ELSE
(position(SmallA.StringNum in 'chimp') + 1) END)) FROM SmallA";
//$NON-NLS-1$
+ String output = "SELECT (position(SmallA.StringNum in
substr('chimp', CASE WHEN (position(SmallA.StringNum in 'chimp') + 1) <
1 THEN 1 ELSE (position(SmallA.StringNum in 'chimp') + 1) END)) + CASE WHEN
(position(SmallA.StringNum in 'chimp') + 1) < 1 THEN 1 ELSE
(position(SmallA.StringNum in 'chimp') + 1) END - 1) FROM SmallA";
//$NON-NLS-1$
TranslationHelper.helpTestVisitor(TranslationHelper.BQT_VDB,
input, output,
Modified: branches/JCA/console/src/main/java/org/teiid/rhq/plugin/Facet.java
===================================================================
--- branches/JCA/console/src/main/java/org/teiid/rhq/plugin/Facet.java 2010-02-18 20:12:13
UTC (rev 1841)
+++ branches/JCA/console/src/main/java/org/teiid/rhq/plugin/Facet.java 2010-02-18 20:18:35
UTC (rev 1842)
@@ -21,17 +21,32 @@
*/
package org.teiid.rhq.plugin;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.deployers.spi.management.deploy.DeploymentManager;
+import org.jboss.deployers.spi.management.deploy.DeploymentProgress;
+import org.jboss.deployers.spi.management.deploy.DeploymentStatus;
import org.rhq.core.domain.configuration.Configuration;
import org.rhq.core.domain.configuration.ConfigurationUpdateStatus;
+import org.rhq.core.domain.content.PackageDetailsKey;
import org.rhq.core.domain.content.PackageType;
+import org.rhq.core.domain.content.transfer.ContentResponseResult;
+import org.rhq.core.domain.content.transfer.DeployIndividualPackageResponse;
import org.rhq.core.domain.content.transfer.DeployPackageStep;
import org.rhq.core.domain.content.transfer.DeployPackagesResponse;
import org.rhq.core.domain.content.transfer.RemovePackagesResponse;
@@ -39,10 +54,13 @@
import org.rhq.core.domain.measurement.AvailabilityType;
import org.rhq.core.domain.measurement.MeasurementReport;
import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
+import org.rhq.core.domain.resource.CreateResourceStatus;
+import org.rhq.core.domain.resource.ResourceType;
import org.rhq.core.pluginapi.configuration.ConfigurationFacet;
import org.rhq.core.pluginapi.configuration.ConfigurationUpdateReport;
import org.rhq.core.pluginapi.content.ContentFacet;
import org.rhq.core.pluginapi.content.ContentServices;
+import org.rhq.core.pluginapi.content.version.PackageVersions;
import org.rhq.core.pluginapi.inventory.CreateChildResourceFacet;
import org.rhq.core.pluginapi.inventory.CreateResourceReport;
import org.rhq.core.pluginapi.inventory.DeleteResourceFacet;
@@ -52,36 +70,35 @@
import org.rhq.core.pluginapi.measurement.MeasurementFacet;
import org.rhq.core.pluginapi.operation.OperationFacet;
import org.rhq.core.pluginapi.operation.OperationResult;
+import org.rhq.core.util.ZipUtil;
+import org.rhq.core.util.exception.ThrowableUtil;
import org.teiid.rhq.admin.utils.SingletonConnectionManager;
-import org.teiid.rhq.comm.Component;
import org.teiid.rhq.comm.Connection;
import org.teiid.rhq.comm.ConnectionException;
import org.teiid.rhq.comm.ExecutedResult;
-import org.teiid.rhq.comm.VMComponent;
import org.teiid.rhq.plugin.objects.ExecutedOperationResultImpl;
+import org.teiid.rhq.plugin.util.DeploymentUtils;
+import org.teiid.rhq.plugin.util.ProfileServiceUtil;
-
/**
* This class implements required RHQ interfaces and provides common logic used
* by all MetaMatrix components.
*/
-public abstract class Facet implements ResourceComponent,
- MeasurementFacet, OperationFacet, ConfigurationFacet, ContentFacet,
- DeleteResourceFacet, CreateChildResourceFacet {
+public abstract class Facet implements ResourceComponent, MeasurementFacet,
+ OperationFacet, ConfigurationFacet, ContentFacet, DeleteResourceFacet,
+ CreateChildResourceFacet {
protected static SingletonConnectionManager connMgr = SingletonConnectionManager
.getInstance();
- protected final Log LOG = LogFactory
- .getLog(Facet.class);
-
+ protected final Log LOG = LogFactory.getLog(Facet.class);
+
/**
* Represents the resource configuration of the custom product being
* managed.
*/
protected Configuration resourceConfiguration;
-
/**
* All AMPS plugins are stateful - this context contains information that
* your resource component can use when performing its processing.
@@ -99,6 +116,40 @@
protected boolean isAvailable = false;
+ /**
+ * Name of the backing package type that will be used when discovering
+ * packages. This corresponds to the name of the package type defined in the
+ * plugin descriptor.
+ */
+ private static final String PKG_TYPE_FILE = "vdb";
+
+ /**
+ * Architecture string used in describing discovered packages.
+ */
+ private static final String ARCHITECTURE = "noarch";
+
+ private static final String BACKUP_FILE_EXTENSION = ".rej";
+
+ private final Log log = LogFactory.getLog(this.getClass());
+
+ private PackageVersions versions;
+
+ /**
+ * The name of the ManagedDeployment (e.g.:
+ * vfszip:/C:/opt/jboss-5.0.0.GA/server/default/deploy/foo.vdb).
+ */
+ protected String deploymentName;
+
+ /**
+ * The type of the ManagedDeployment.
+ */
+ // protected KnownDeploymentTypes deploymentType;
+ /**
+ * The absolute path of the deployment file (e.g.:
+ * C:/opt/jboss-5.0.0.GA/server/default/deploy/foo.vdb).
+ */
+ protected File deploymentFile;
+
abstract String getComponentType();
/**
@@ -122,7 +173,7 @@
public void stop() {
this.isAvailable = false;
}
-
+
/**
* @return the resourceConfiguration
*/
@@ -131,7 +182,8 @@
}
/**
- * @param resourceConfiguration the resourceConfiguration to set
+ * @param resourceConfiguration
+ * the resourceConfiguration to set
*/
public void setResourceConfiguration(Configuration resourceConfiguration) {
this.resourceConfiguration = resourceConfiguration;
@@ -161,16 +213,16 @@
this.identifier = identifier;
}
- protected void setOperationArguments(String name, Configuration configuration,
- Map argumentMap) {
-// moved this logic up to the associated implemented class
- throw new InvalidPluginConfigurationException("Not implemented on component type
" + this.getComponentType() + " named " + this.getComponentName());
-
-
+ protected void setOperationArguments(String name,
+ Configuration configuration, Map argumentMap) {
+ // moved this logic up to the associated implemented class
+ throw new InvalidPluginConfigurationException(
+ "Not implemented on component type " + this.getComponentType()
+ + " named " + this.getComponentName());
+
}
- protected void execute(final ExecutedResult result,
- final Map valueMap) {
+ protected void execute(final ExecutedResult result, final Map valueMap) {
Connection conn = null;
try {
conn = getConnection();
@@ -210,15 +262,15 @@
}
/*
- * (non-Javadoc)
- * This method is called by JON to check the availability of the inventoried component
on a time scheduled basis
+ * (non-Javadoc) This method is called by JON to check the availability of
+ * the inventoried component on a time scheduled basis
*
* @see org.rhq.core.pluginapi.inventory.ResourceComponent#getAvailability()
*/
public AvailabilityType getAvailability() {
LOG.debug("Checking availability of " + identifier); //$NON-NLS-1$
-
+
return AvailabilityType.UP;
}
@@ -259,16 +311,13 @@
Set operationDefinitionSet = this.resourceContext.getResourceType()
.getOperationDefinitions();
+ ExecutedResult result = new ExecutedOperationResultImpl(this
+ .getComponentType(), name, operationDefinitionSet);
- ExecutedResult result = new ExecutedOperationResultImpl(
- this.getComponentType(),
- name,
- operationDefinitionSet);
-
setOperationArguments(name, configuration, valueMap);
-
+
execute(result, valueMap);
-
+
return ((ExecutedOperationResultImpl) result).getOperationResult();
}
@@ -290,9 +339,10 @@
// start with.
// note that it is empty, so we're assuming there are no required
// configs in the plugin descriptor.
- resourceConfiguration = this.resourceContext.getPluginConfiguration();
+ resourceConfiguration = this.resourceContext
+ .getPluginConfiguration();
}
-
+
Configuration config = resourceConfiguration;
return config;
@@ -314,81 +364,400 @@
report.setStatus(ConfigurationUpdateStatus.SUCCESS);
}
- /**
- * When this is called, the plugin is responsible for scanning its managed
- * resource and look for content that need to be managed for that resource.
- * This method should only discover packages of the given package type.
- *
- * @see ContentFacet#discoverDeployedPackages(PackageType)
- */
- public Set<ResourcePackageDetails> discoverDeployedPackages(PackageType type) {
- return null;
+ @Override
+ public void deleteResource() throws Exception {
+ // TODO Auto-generated method stub
+
}
- /**
- * The plugin container calls this method when new packages need to be
- * deployed/installed on resources.
- *
- * @see ContentFacet#deployPackages(Set, ContentServices)
- */
+ @Override
public DeployPackagesResponse deployPackages(
Set<ResourcePackageDetails> packages,
ContentServices contentServices) {
- return null;
+ String resourceTypeName = this.resourceContext.getResourceType()
+ .getName();
+
+ // You can only update the one application file referenced by this
+ // resource, so punch out if multiple are
+ // specified.
+ if (packages.size() != 1) {
+ log.warn("Request to update " + resourceTypeName
+ + " file contained multiple packages: " + packages);
+ DeployPackagesResponse response = new DeployPackagesResponse(
+ ContentResponseResult.FAILURE);
+ response.setOverallRequestErrorMessage("Only one "
+ + resourceTypeName + " can be updated at a time.");
+ return response;
+ }
+
+ ResourcePackageDetails packageDetails = packages.iterator().next();
+
+ log.debug("Updating VDB file '" + this.deploymentFile + "' using
["
+ + packageDetails + "]...");
+ // Find location of existing application.
+ if (!this.deploymentFile.exists()) {
+ return failApplicationDeployment(
+ "Could not find application to update at location: "
+ + this.deploymentFile, packageDetails);
+ }
+
+ log.debug("Writing new EAR/WAR bits to temporary file...");
+ File tempFile;
+ try {
+ tempFile = writeNewAppBitsToTempFile(contentServices,
+ packageDetails);
+ } catch (Exception e) {
+ return failApplicationDeployment(
+ "Error writing new application bits to temporary file - cause: "
+ + e, packageDetails);
+ }
+ log.debug("Wrote new EAR/WAR bits to temporary file '" + tempFile
+ + "'.");
+
+ boolean deployExploded = this.deploymentFile.isDirectory();
+
+ // Backup the original app file/dir.
+ File tempDir = resourceContext.getTemporaryDirectory();
+ File backupDir = new File(tempDir, "deployBackup");
+ File backupOfOriginalFile = new File(backupDir, this.deploymentFile
+ .getName());
+ log.debug("Backing up existing EAR/WAR '" + this.deploymentFile
+ + "' to '" + backupOfOriginalFile + "'...");
+ try {
+ if (backupOfOriginalFile.exists()) {
+ FileUtils.forceDelete(backupOfOriginalFile);
+ }
+ if (this.deploymentFile.isDirectory()) {
+ FileUtils.copyDirectory(this.deploymentFile,
+ backupOfOriginalFile, true);
+ } else {
+ FileUtils.copyFile(this.deploymentFile, backupOfOriginalFile,
+ true);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to backup existing "
+ + resourceTypeName + "'" + this.deploymentFile + "' to
'"
+ + backupOfOriginalFile + "'.");
+ }
+
+ // Now stop the original app.
+ try {
+ DeploymentManager deploymentManager = ProfileServiceUtil
+ .getDeploymentManager();
+ DeploymentProgress progress = deploymentManager
+ .stop(this.deploymentName);
+ DeploymentUtils.run(progress);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to stop deployment ["
+ + this.deploymentName + "].", e);
+ }
+
+ // And then remove it (this will delete the physical file/dir from the
+ // deploy dir).
+ try {
+ DeploymentManager deploymentManager = ProfileServiceUtil
+ .getDeploymentManager();
+ DeploymentProgress progress = deploymentManager
+ .remove(this.deploymentName);
+ DeploymentUtils.run(progress);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to remove deployment ["
+ + this.deploymentName + "].", e);
+ }
+
+ // Deploy away!
+ log.debug("Deploying '" + tempFile + "'...");
+ DeploymentManager deploymentManager = null;
+ try {
+ deploymentManager = ProfileServiceUtil.getDeploymentManager();
+ DeploymentUtils.deployArchive(deploymentManager, tempFile,
+ deployExploded);
+ } catch (Exception e) {
+ // Deploy failed - rollback to the original app file...
+ log.debug("Redeploy failed - rolling back to original archive...",
+ e);
+ String errorMessage = ThrowableUtil.getAllMessages(e);
+ try {
+ // Try to delete the new app file, which failed to deploy, if it
+ // still exists.
+ if (this.deploymentFile.exists()) {
+ try {
+ FileUtils.forceDelete(this.deploymentFile);
+ } catch (IOException e1) {
+ log.debug("Failed to delete application file '"
+ + this.deploymentFile
+ + "' that failed to deploy.", e1);
+ }
+ }
+ // Now redeploy the original file - this generally should
+ // succeed.
+ DeploymentUtils.deployArchive(deploymentManager,
+ backupOfOriginalFile, deployExploded);
+ errorMessage += " ***** ROLLED BACK TO ORIGINAL APPLICATION FILE. *****";
+ } catch (Exception e1) {
+ log.debug("Rollback failed!", e1);
+ errorMessage += " ***** FAILED TO ROLLBACK TO ORIGINAL APPLICATION FILE. *****:
"
+ + ThrowableUtil.getAllMessages(e1);
+ }
+ log
+ .info("Failed to update " + resourceTypeName + " file '"
+ + this.deploymentFile + "' using ["
+ + packageDetails + "].");
+ return failApplicationDeployment(errorMessage, packageDetails);
+ }
+
+ // Deploy was successful!
+ deleteBackupOfOriginalFile(backupOfOriginalFile);
+ persistApplicationVersion(packageDetails, this.deploymentFile);
+
+ DeployPackagesResponse response = new DeployPackagesResponse(
+ ContentResponseResult.SUCCESS);
+ DeployIndividualPackageResponse packageResponse = new DeployIndividualPackageResponse(
+ packageDetails.getKey(), ContentResponseResult.SUCCESS);
+ response.addPackageResponse(packageResponse);
+
+ log.debug("Updated " + resourceTypeName + " file '"
+ + this.deploymentFile + "' successfully - returning response ["
+ + response + "]...");
+
+ return response;
}
- /**
- * When a remote client wants to see the actual data content for an
- * installed package, this method will be called. This method must return a
- * stream of data containing the full content of the package.
- *
- * @see ContentFacet#retrievePackageBits(ResourcePackageDetails)
- */
- public InputStream retrievePackageBits(ResourcePackageDetails packageDetails) {
- return null;
+ @Override
+ public Set<ResourcePackageDetails> discoverDeployedPackages(PackageType arg0) {
+ if (!this.deploymentFile.exists())
+ throw new IllegalStateException("Deployment file '"
+ + this.deploymentFile + "' for " + "VDB Archive"
+ + " does not exist.");
+
+ String fileName = this.deploymentFile.getName();
+ PackageVersions packageVersions = loadPackageVersions();
+ String version = packageVersions.getVersion(fileName);
+ if (version == null) {
+ // This is either the first time we've discovered this VDB, or
+ // someone purged the PC's data dir.
+ version = "1.0";
+ packageVersions.putVersion(fileName, version);
+ packageVersions.saveToDisk();
+ }
+
+ // Package name is the deployment's file name (e.g. foo.ear).
+ PackageDetailsKey key = new PackageDetailsKey(fileName, version,
+ PKG_TYPE_FILE, ARCHITECTURE);
+ ResourcePackageDetails packageDetails = new ResourcePackageDetails(key);
+ packageDetails.setFileName(fileName);
+ packageDetails.setLocation(this.deploymentFile.getPath());
+ if (!this.deploymentFile.isDirectory())
+ packageDetails.setFileSize(this.deploymentFile.length());
+ packageDetails.setFileCreatedDate(null); // TODO: get created date via
+ // SIGAR
+ Set<ResourcePackageDetails> packages = new
HashSet<ResourcePackageDetails>();
+ packages.add(packageDetails);
+
+ return packages;
}
- /**
- * This is the method that is used when the component has to create the
- * installation steps and their results.
- *
- * @see ContentFacet#generateInstallationSteps(ResourcePackageDetails)
- */
+ @Override
public List<DeployPackageStep> generateInstallationSteps(
- ResourcePackageDetails packageDetails) {
+ ResourcePackageDetails arg0) {
return null;
}
+ public RemovePackagesResponse removePackages(
+ Set<ResourcePackageDetails> packages) {
+ throw new UnsupportedOperationException(
+ "Cannot remove the package backing an VDB resource.");
+ }
+
+ @Override
+ public InputStream retrievePackageBits(ResourcePackageDetails packageDetails) {
+ File packageFile = new File(packageDetails.getName());
+ File fileToSend;
+ try {
+ if (packageFile.isDirectory()) {
+ fileToSend = File.createTempFile("rhq", ".zip");
+ ZipUtil.zipFileOrDirectory(packageFile, fileToSend);
+ } else
+ fileToSend = packageFile;
+ return new BufferedInputStream(new FileInputStream(fileToSend));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve package bits for "
+ + packageDetails, e);
+ }
+ }
+
/**
- * This is called when the actual content of packages should be deleted from
- * the managed resource.
+ * Returns an instantiated and loaded versions store access point.
*
- * @see ContentFacet#removePackages(Set)
+ * @return will not be <code>null</code>
*/
- public RemovePackagesResponse removePackages(
- Set<ResourcePackageDetails> packages) {
- return null;
+ private PackageVersions loadPackageVersions() {
+ if (this.versions == null) {
+ ResourceType resourceType = this.resourceContext.getResourceType();
+ String pluginName = resourceType.getPlugin();
+ File dataDirectoryFile = this.resourceContext.getDataDirectory();
+ dataDirectoryFile.mkdirs();
+ String dataDirectory = dataDirectoryFile.getAbsolutePath();
+ log.trace("Creating application versions store with plugin name ["
+ + pluginName + "] and data directory [" + dataDirectory
+ + "]");
+ this.versions = new PackageVersions(pluginName, dataDirectory);
+ this.versions.loadFromDisk();
+ }
+
+ return this.versions;
}
/**
- * When called, the plugin container is asking the plugin to create a new
- * managed resource. The new resource's details need to be added to the
- * given report.
+ * Creates the necessary transfer objects to report a failed application
+ * deployment (update).
*
- * @see CreateChildResourceFacet#createResource(CreateResourceReport)
+ * @param errorMessage
+ * reason the deploy failed
+ * @param packageDetails
+ * describes the update being made
+ *
+ * @return response populated to reflect a failure
*/
- public CreateResourceReport createResource(CreateResourceReport report) {
- return null;
+ private DeployPackagesResponse failApplicationDeployment(
+ String errorMessage, ResourcePackageDetails packageDetails) {
+ DeployPackagesResponse response = new DeployPackagesResponse(
+ ContentResponseResult.FAILURE);
+
+ DeployIndividualPackageResponse packageResponse = new DeployIndividualPackageResponse(
+ packageDetails.getKey(), ContentResponseResult.FAILURE);
+ packageResponse.setErrorMessage(errorMessage);
+
+ response.addPackageResponse(packageResponse);
+
+ return response;
}
+ private File writeNewAppBitsToTempFile(ContentServices contentServices,
+ ResourcePackageDetails packageDetails) throws Exception {
+ File tempDir = resourceContext.getTemporaryDirectory();
+ File tempFile = new File(tempDir, this.deploymentFile.getName());
+
+ OutputStream tempOutputStream = null;
+ try {
+ tempOutputStream = new BufferedOutputStream(new FileOutputStream(
+ tempFile));
+ long bytesWritten = contentServices.downloadPackageBits(
+ resourceContext.getContentContext(), packageDetails
+ .getKey(), tempOutputStream, true);
+ log
+ .debug("Wrote " + bytesWritten + " bytes to '" + tempFile
+ + "'.");
+ } catch (IOException e) {
+ log.error(
+ "Error writing updated application bits to temporary location: "
+ + tempFile, e);
+ throw e;
+ } finally {
+ if (tempOutputStream != null) {
+ try {
+ tempOutputStream.close();
+ } catch (IOException e) {
+ log.error("Error closing temporary output stream", e);
+ }
+ }
+ }
+ if (!tempFile.exists()) {
+ log.error("Temporary file for application update not written to: "
+ + tempFile);
+ throw new Exception();
+ }
+ return tempFile;
+ }
+
+ private void persistApplicationVersion(
+ ResourcePackageDetails packageDetails, File appFile) {
+ String packageName = appFile.getName();
+ PackageVersions versions = loadApplicationVersions();
+ versions.putVersion(packageName, packageDetails.getVersion());
+ }
+
+ private void deleteBackupOfOriginalFile(File backupOfOriginalFile) {
+ try {
+ FileUtils.forceDelete(backupOfOriginalFile);
+ } catch (Exception e) {
+ // not critical.
+ log.warn("Failed to delete backup of original file: "
+ + backupOfOriginalFile);
+ }
+ }
+
/**
- * When called, the plugin container is asking the plugin to delete a
- * managed resource.
+ * Returns an instantiated and loaded versions store access point.
*
- * @see DeleteResourceFacet#deleteResource()
+ * @return will not be <code>null</code>
*/
- public void deleteResource() {
+ private PackageVersions loadApplicationVersions() {
+ if (versions == null) {
+ ResourceType resourceType = resourceContext.getResourceType();
+ String pluginName = resourceType.getPlugin();
+
+ File dataDirectoryFile = resourceContext.getDataDirectory();
+
+ if (!dataDirectoryFile.exists()) {
+ dataDirectoryFile.mkdir();
+ }
+
+ String dataDirectory = dataDirectoryFile.getAbsolutePath();
+
+ log.debug("Creating application versions store with plugin name ["
+ + pluginName + "] and data directory [" + dataDirectory
+ + "]");
+
+ versions = new PackageVersions(pluginName, dataDirectory);
+ versions.loadFromDisk();
+ }
+
+ return versions;
}
-
-
+
+ @Override
+ public CreateResourceReport createResource(CreateResourceReport createResourceReport) {
+ ResourcePackageDetails details = createResourceReport
+ .getPackageDetails();
+ PackageDetailsKey key = details.getKey();
+ // This is the full path to a temporary file which was written by the UI
+ // layer.
+ String archivePath = key.getName();
+
+ try {
+ File archiveFile = new File(archivePath);
+
+ if (!DeploymentUtils.hasCorrectExtension(archiveFile.getName(),
resourceContext.getResourceType())) {
+ createResourceReport.setStatus(CreateResourceStatus.FAILURE);
+ createResourceReport
+ .setErrorMessage("Incorrect extension specified on filename ["
+ + archivePath + "]");
+ return createResourceReport;
+ }
+
+ Configuration deployTimeConfig = details
+ .getDeploymentTimeConfiguration();
+ @SuppressWarnings( { "ConstantConditions" })
+ // boolean deployExploded = deployTimeConfig.getSimple(
+ // "deployExploded").getBooleanValue();
+
+ DeploymentManager deploymentManager = ProfileServiceUtil.getDeploymentManager();
+ DeploymentUtils.deployArchive(deploymentManager, archiveFile, false);
+
+ createResourceReport.setResourceName(archivePath);
+ createResourceReport.setResourceKey(archivePath);
+ createResourceReport.setStatus(CreateResourceStatus.SUCCESS);
+
+ } catch (Throwable t) {
+ log.error("Error deploying application for report: "
+ + createResourceReport, t);
+ createResourceReport.setStatus(CreateResourceStatus.FAILURE);
+ createResourceReport.setException(t);
+ }
+
+ return createResourceReport;
+
+ }
+
}
Modified: branches/JCA/console/src/main/java/org/teiid/rhq/plugin/PlatformComponent.java
===================================================================
---
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/PlatformComponent.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/PlatformComponent.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -27,8 +27,13 @@
import java.util.Properties;
import java.util.Set;
+import javax.naming.NamingException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.managed.api.ComponentType;
+import org.jboss.managed.api.ManagedComponent;
+import org.jboss.managed.api.RunState;
import org.rhq.core.domain.configuration.Configuration;
import org.rhq.core.domain.configuration.PropertySimple;
import org.rhq.core.domain.measurement.AvailabilityType;
@@ -36,11 +41,17 @@
import org.rhq.core.domain.measurement.MeasurementReport;
import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
import org.rhq.core.pluginapi.configuration.ConfigurationUpdateReport;
-import org.rhq.core.pluginapi.inventory.InvalidPluginConfigurationException;
import org.teiid.rhq.admin.utils.SingletonConnectionManager;
import org.teiid.rhq.comm.Connection;
import org.teiid.rhq.comm.ConnectionConstants;
-import org.teiid.rhq.comm.ConnectionConstants.ComponentType;
+//import org.teiid.rhq.comm.ConnectionConstants;
+//import org.teiid.rhq.comm.Connection;
+//import org.teiid.rhq.comm.ConnectionConstants;
+//import org.teiid.rhq.comm.ConnectionConstants.ComponentType.Runtime.System.Metrics;
+import org.teiid.rhq.plugin.util.PluginConstants;
+import org.teiid.rhq.plugin.util.ProfileServiceUtil;
+import org.teiid.rhq.plugin.util.PluginConstants.Operation;
+import org.teiid.rhq.plugin.util.PluginConstants.ComponentType.Runtime.Metrics;
/**
@@ -60,12 +71,27 @@
@Override
public AvailabilityType getAvailability() {
+ RunState runState = null;
+ ManagedComponent mc;
+ try {
+ mc = ProfileServiceUtil.getManagedComponent(
+ new ComponentType(PluginConstants.ComponentType.Runtime.TYPE,
+ PluginConstants.ComponentType.Runtime.SUBTYPE),
+ PluginConstants.ComponentType.Runtime.TEIID_RUNTIME_ENGINE);
+ runState = mc.getRunState();
+ } catch (NamingException e) {
+ LOG.error("Naming exception getting: " +
PluginConstants.ComponentType.Runtime.TEIID_RUNTIME_ENGINE);
+ return AvailabilityType.DOWN;
+ } catch (Exception e) {
+ LOG.error("Exception getting: " +
PluginConstants.ComponentType.Runtime.TEIID_RUNTIME_ENGINE);
+ return AvailabilityType.DOWN;
+ }
+
+ return (runState == RunState.RUNNING) ? AvailabilityType.UP : AvailabilityType.DOWN;
+
+ }
- return AvailabilityType.UP;
- }
-
-
protected void setOperationArguments(String name, Configuration configuration,
Map valueMap) {
Modified:
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/VDBDiscoveryComponent.java
===================================================================
---
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/VDBDiscoveryComponent.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/VDBDiscoveryComponent.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -26,8 +26,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.deployers.spi.management.ManagementView;
import org.jboss.managed.api.ComponentType;
import org.jboss.managed.api.ManagedComponent;
+import org.jboss.managed.api.ManagedDeployment;
import org.jboss.managed.api.ManagedProperty;
import org.jboss.managed.plugins.ManagedObjectImpl;
import org.jboss.metatype.api.values.CollectionValueSupport;
@@ -67,13 +69,27 @@
String vdbName = ((SimpleValueSupport) mcVdb.getProperty("name")
.getValue()).getValue().toString();
- String vdbVersion = ((SimpleValueSupport) mcVdb.getProperty("version")
- .getValue()).getValue().toString();
- //TODO: Correct this after deploying proper VDB/Metadata
+// ManagementView managementView = ProfileServiceUtil
+// .getManagementView(ProfileServiceUtil.getProfileService(),
+// false);
+ //ManagedDeployment managedDeployment =
managementView.getDeploymentNamesForType(arg0)(vdbName);
+ //Set deploymentNames = null;
+
+// try
+// {
+// deploymentNames = managementView.getDeploymentNames();
+// }
+// catch (Exception e)
+// {
+// log.error("Unable to get deployment for type " , e);
+// }
+ String vdbVersion = ((SimpleValueSupport) mcVdb.getProperty(
+ "version").getValue()).getValue().toString();
+ // TODO: Correct this after deploying proper VDB/Metadata
String vdbDescription = "description"; //
mcVdb.getProperty("description");
String vdbStatus = "active"; // mcVdb.getProperty("status");
String vdbURL = "url"; // mcVdb.getProperty("url");
-
+
/**
*
* A discovered resource must have a unique key, that must stay the
@@ -85,23 +101,25 @@
vdbName, // Resource Name
vdbVersion, // Version
PluginConstants.ComponentType.VDB.DESCRIPTION, // Description
- discoveryContext.getDefaultPluginConfiguration(), // Plugin Config
+ discoveryContext.getDefaultPluginConfiguration(), // Plugin
+ // Config
null // Process info from a process scan
);
-
- //Get plugin config map for models
+
+ // Get plugin config map for models
Configuration configuration = detail.getPluginConfiguration();
configuration.put(new PropertySimple("name", vdbName));
configuration.put(new PropertySimple("version", vdbVersion));
- configuration.put(new PropertySimple("description", vdbDescription));
+ configuration
+ .put(new PropertySimple("description", vdbDescription));
configuration.put(new PropertySimple("status", vdbStatus));
- configuration.put(new PropertySimple("url", vdbURL));
-
+ configuration.put(new PropertySimple("url", vdbURL));
+
getModels(mcVdb, configuration);
detail.setPluginConfiguration(configuration);
-
+
// Add to return values
discoveredResources.add(detail);
log.info("Discovered Teiid VDB: " + vdbName);
@@ -115,28 +133,33 @@
* @param configuration
*/
private void getModels(ManagedComponent mcVdb, Configuration configuration) {
- //Get models from VDB
+ // Get models from VDB
ManagedProperty property = mcVdb.getProperty("models");
- CollectionValueSupport valueSupport = (CollectionValueSupport) property.getValue();
+ CollectionValueSupport valueSupport = (CollectionValueSupport) property
+ .getValue();
MetaValue[] metaValues = valueSupport.getElements();
-
+
PropertyList modelsList = new PropertyList("models");
configuration.put(modelsList);
-
+
for (MetaValue value : metaValues) {
GenericValueSupport genValueSupport = (GenericValueSupport) value;
- ManagedObjectImpl managedObject = (ManagedObjectImpl)genValueSupport.getValue();
+ ManagedObjectImpl managedObject = (ManagedObjectImpl) genValueSupport
+ .getValue();
String modelName = managedObject.getName();
- String type = ((SimpleValueSupport)
managedObject.getProperty("modelType").getValue()).getValue().toString();
- String visibility = ((SimpleValueSupport)
managedObject.getProperty("visible").getValue()).getValue().toString();
- String path = ((SimpleValueSupport)
managedObject.getProperty("path").getValue()).getValue().toString();
-
- PropertyMap model = new PropertyMap("model", new
PropertySimple("name", modelName),
- new PropertySimple("type", type), new PropertySimple("path",
path),
- new PropertySimple("visibility", visibility));
+ String type = ((SimpleValueSupport) managedObject.getProperty(
+ "modelType").getValue()).getValue().toString();
+ String visibility = ((SimpleValueSupport) managedObject
+ .getProperty("visible").getValue()).getValue().toString();
+ String path = ((SimpleValueSupport) managedObject.getProperty(
+ "path").getValue()).getValue().toString();
+
+ PropertyMap model = new PropertyMap("model", new PropertySimple(
+ "name", modelName), new PropertySimple("type", type),
+ new PropertySimple("path", path), new PropertySimple(
+ "visibility", visibility));
modelsList.add(model);
}
- }
-
+ }
}
Copied: branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/DeploymentUtils.java
(from rev 1840,
trunk/console/src/main/java/org/teiid/rhq/plugin/util/DeploymentUtils.java)
===================================================================
--- branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/DeploymentUtils.java
(rev 0)
+++
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/DeploymentUtils.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.rhq.plugin.util;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.deployers.spi.management.deploy.DeploymentManager;
+import org.jboss.deployers.spi.management.deploy.DeploymentProgress;
+import org.jboss.deployers.spi.management.deploy.DeploymentStatus;
+import org.rhq.core.domain.resource.ResourceType;
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * A set of utility methods for deploying applications.
+ *
+ */
+public class DeploymentUtils {
+ private static final Log LOG = LogFactory.getLog(DeploymentUtils.class);
+
+ public static boolean hasCorrectExtension(String archiveFileName, ResourceType
resourceType) {
+ String expectedExtension = "vdb";
+ int lastPeriod = archiveFileName.lastIndexOf(".");
+ String extension = (lastPeriod != -1) ? archiveFileName.substring(lastPeriod + 1)
: null;
+ // Use File.equals() to compare the extensions so case-sensitivity is correct for
this platform.
+ return (extension != null && new File(extension).equals(new
File(expectedExtension)));
+ }
+
+ /**
+ * Deploys (i.e. distributes then starts) the specified archive file.
+ *
+ * @param deploymentManager
+ * @param archiveFile
+ * @param deployExploded
+ *
+ * @return
+ *
+ * @throws Exception if the deployment fails for any reason
+ */
+ public static void deployArchive(DeploymentManager deploymentManager, File
archiveFile, boolean deployExploded)
+ throws Exception {
+ String archiveFileName = archiveFile.getName();
+ LOG.debug("Deploying '" + archiveFileName + "'
(deployExploded=" + deployExploded + ")...");
+ URL contentURL;
+ try {
+ contentURL = archiveFile.toURI().toURL();
+ }
+ catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Failed to convert archive file path
'" + archiveFile + "' to URL.", e);
+ }
+
+ DeploymentProgress progress = null;
+ DeploymentStatus distributeStatus;
+ Exception distributeFailure = null;
+ try {
+ progress = deploymentManager.distribute(archiveFileName, contentURL, false);
+ distributeStatus = run(progress);
+ if (distributeStatus.isFailed()) {
+ distributeFailure = (distributeStatus.getFailure() != null) ?
distributeStatus.getFailure() :
+ new Exception("Distribute failed for unknown
reason.");
+ }
+ }
+ catch (Exception e) {
+ distributeFailure = e;
+ }
+ if (distributeFailure != null) {
+ throw new Exception("Failed to distribute '" + contentURL +
"' to '" + archiveFileName + "' - cause: "
+ + ThrowableUtil.getAllMessages(distributeFailure));
+ }
+
+ // Now that we've successfully distributed the deployment, we need to start
it.
+ String[] deploymentNames = progress.getDeploymentID().getRepositoryNames();
+ DeploymentStatus startStatus;
+ Exception startFailure = null;
+ try {
+ progress = deploymentManager.start(deploymentNames);
+ startStatus = run(progress);
+ if (startStatus.isFailed()) {
+ startFailure = (startStatus.getFailure() != null) ?
startStatus.getFailure() :
+ new Exception("Start failed for unknown reason.");
+ }
+ }
+ catch (Exception e) {
+ startFailure = e;
+ }
+ if (startFailure != null) {
+ LOG.error("Failed to start deployment " +
Arrays.asList(deploymentNames)
+ + " during deployment of '" + archiveFileName +
"'. Backing out the deployment...", startFailure);
+ // If start failed, the app is invalid, so back out the deployment.
+ DeploymentStatus removeStatus;
+ Exception removeFailure = null;
+ try {
+ progress = deploymentManager.remove(deploymentNames);
+ removeStatus = run(progress);
+ if (removeStatus.isFailed()) {
+ removeFailure = (removeStatus.getFailure() != null) ?
removeStatus.getFailure() :
+ new Exception("Remove failed for unknown reason.");
+ }
+ }
+ catch (Exception e) {
+ removeFailure = e;
+ }
+ if (removeFailure != null) {
+ LOG.error("Failed to remove deployment " +
Arrays.asList(deploymentNames)
+ + " after start failure.", removeFailure);
+ }
+ throw new Exception("Failed to start deployment " +
Arrays.asList(deploymentNames)
+ + " during deployment of '" + archiveFileName + "'
- cause: " +
+ ThrowableUtil.getAllMessages(startFailure));
+ }
+ // If we made it this far, the deployment (distribution+start) was successful.
+ return;
+ }
+
+ public static DeploymentStatus run(DeploymentProgress progress) {
+ progress.run();
+ return progress.getDeploymentStatus();
+ }
+
+ private DeploymentUtils() {
+ }
+
+}
Modified:
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/ProfileServiceUtil.java
===================================================================
---
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/ProfileServiceUtil.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/console/src/main/java/org/teiid/rhq/plugin/util/ProfileServiceUtil.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -1,17 +1,27 @@
package org.teiid.rhq.plugin.util;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Set;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.deployers.spi.management.KnownDeploymentTypes;
import org.jboss.deployers.spi.management.ManagementView;
+import org.jboss.deployers.spi.management.deploy.DeploymentManager;
import org.jboss.managed.api.ComponentType;
import org.jboss.managed.api.ManagedComponent;
+import org.jboss.managed.api.ManagedDeployment;
import org.jboss.profileservice.spi.ProfileService;
public class ProfileServiceUtil {
+ protected final Log LOG = LogFactory.getLog(ProfileServiceUtil.class);
+
/**
* Get the passed in {@link ManagedComponent}
*
@@ -23,7 +33,7 @@
ComponentType componentType, String componentName)
throws NamingException, Exception {
ProfileService ps = getProfileService();
- ManagementView mv = getManagementView(ps);
+ ManagementView mv = getManagementView(ps, true);
ManagedComponent mc = mv.getComponent(componentName, componentType);
return mc;
@@ -34,13 +44,13 @@
* type.
*
* @return Set of {@link ManagedComponent}s
- * @throws NamingException
+ * @throws NamingException, Exception
* @throws Exception
*/
public static Set<ManagedComponent> getManagedComponents(
ComponentType componentType) throws NamingException, Exception {
ProfileService ps = getProfileService();
- ManagementView mv = getManagementView(ps);
+ ManagementView mv = getManagementView(ps, true);
Set<ManagedComponent> mcSet = mv.getComponentsForType(componentType);
@@ -51,21 +61,75 @@
* @param {@link ManagementView}
* @return
*/
- private static ManagementView getManagementView(ProfileService ps) {
+ public static ManagementView getManagementView(ProfileService ps, boolean load) {
ManagementView mv = ps.getViewManager();
- mv.load();
+ if (load) {
+ mv.load();
+ }
return mv;
}
/**
- * @return {@link ProfileService}
+ * Get the {@link DeploymentManager} from the ProfileService
+ *
+ * @return DeploymentManager
* @throws NamingException
+ * @throws Exception
*/
- private static ProfileService getProfileService() throws NamingException {
+ public static DeploymentManager getDeploymentManager()
+ throws NamingException, Exception {
+ ProfileService ps = getProfileService();
+ DeploymentManager deploymentManager = ps.getDeploymentManager();
+
+ return deploymentManager;
+ }
+
+ /**
+ * @return {@link ProfileService}
+ * @throws NamingException, Exception
+ */
+ public static ProfileService getProfileService() throws NamingException {
InitialContext ic = new InitialContext();
ProfileService ps = (ProfileService) ic
.lookup(PluginConstants.PROFILE_SERVICE);
return ps;
}
+ /**
+ * @return {@link File}
+ * @throws NamingException, Exception
+ */
+ public static File getDeployDirectory() throws NamingException, Exception {
+ ProfileService ps = getProfileService();
+ ManagementView mv = getManagementView(ps, false);
+ Set<ManagedDeployment> warDeployments;
+ try {
+ warDeployments = mv
+ .getDeploymentsForType(KnownDeploymentTypes.JavaEEWebApplication
+ .getType());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ ManagedDeployment standaloneWarDeployment = null;
+ for (ManagedDeployment warDeployment : warDeployments) {
+ if (warDeployment.getParent() == null) {
+ standaloneWarDeployment = warDeployment;
+ break;
+ }
+ }
+ if (standaloneWarDeployment == null)
+ // This could happen if no standalone WARs, including the admin
+ // console WAR, have been fully deployed yet.
+ return null;
+ URL warUrl;
+ try {
+ warUrl = new URL(standaloneWarDeployment.getName());
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException(e);
+ }
+ File warFile = new File(warUrl.getPath());
+ File deployDir = warFile.getParentFile();
+ return deployDir;
+ }
+
}
Modified: branches/JCA/console/src/resources/embedded/META-INF/rhq-plugin.xml
===================================================================
--- branches/JCA/console/src/resources/embedded/META-INF/rhq-plugin.xml 2010-02-18
20:12:13 UTC (rev 1841)
+++ branches/JCA/console/src/resources/embedded/META-INF/rhq-plugin.xml 2010-02-18
20:18:35 UTC (rev 1842)
@@ -26,21 +26,52 @@
<depends plugin="JMX" />
<depends plugin="JBossAS5" useClasses="true" />
-
<server name="Data Services" description="JBoss Enterprise Data
Services"
- class="PlatformComponent" discovery="PlatformDiscoveryComponent"
- createDeletePolicy="both">
+ class="PlatformComponent"
discovery="PlatformDiscoveryComponent">
-
<runs-inside>
<parent-resource-type name="JBossAS Server"
plugin="JBossAS5" />
</runs-inside>
- <service name="Virtual Database VDB(s)"
+ <!-- <metric displayName="Query Count" defaultOn="true"
displayType="summary"
+ category="throughput" property="queryCount"
+ description="The number of queries for a given point in time" />
+
+ <metric displayName="Long Running Queries" defaultOn="true"
+ displayType="summary" category="performance"
property="longRunningQueries"
+ description="The number of queries that have been running longer than the limit
set for queries." />
+
+ <metric displayName="Session Count" defaultOn="true"
+ displayType="summary" category="throughput"
property="sessionCount"
+ description="The number of user connections for a given point in time"
/>
+
+ <resource-configuration>
+ <c:group name="general" displayName="General"
+ hiddenByDefault="false">
+ <c:description>Query Configuration</c:description>
+ <c:simple-property name="longRunningQueryLimit"
+ type="integer" activationPolicy="immediate"
units="seconds"
+ default="600" displayName="Long Running Query limit"
+ description="The value (in seconds) to use to determine if a query is to be
considered 'long running'.">
+ <c:constraint>
+ <c:integer-constraint minimum="0" maximum="999999" />
+ </c:constraint>
+ </c:simple-property>
+ </c:group>
+ </resource-configuration>
+ -->
+ <service name="Virtual Database (VDB)s"
description="JBoss Enterprise Data Services Virtual Databases"
class="VDBComponent"
- discovery="VDBDiscoveryComponent" createDeletePolicy="both">
+ discovery="VDBDiscoveryComponent" createDeletePolicy="both"
+ creationDataType="content">
+ <content name="vdb" displayName="VDB Archive"
category="deployable"
+ isCreationType="true">
+
+ </content>
+
+
<resource-configuration>
<c:group name="general" displayName="General"
hiddenByDefault="false">
@@ -72,6 +103,7 @@
</c:group>
</resource-configuration>
+
<service name="Models" description="Models that map to a
datasource"
class="ModelComponent" discovery="ModelDiscoveryComponent"
createDeletePolicy="both">
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -47,17 +47,23 @@
*/
FINAL
}
+
+ public enum BufferReserveMode {
+ WAIT,
+ FORCE,
+ NO_WAIT
+ }
- public static int DEFAULT_CONNECTOR_BATCH_SIZE = 2048;
- public static int DEFAULT_PROCESSOR_BATCH_SIZE = 1024;
- public static int DEFAULT_MAX_PROCESSING_BATCHES = 8;
+ public static int DEFAULT_CONNECTOR_BATCH_SIZE = 1024;
+ public static int DEFAULT_PROCESSOR_BATCH_SIZE = 512;
+ public static int DEFAULT_MAX_PROCESSING_BATCHES = 128;
+
/**
- * The BufferManager may maintain at least this many batch references in memory.
- *
- * Up to 2x this value may be held by soft references.
+ * This is the maximum number of batch columns used for processing.
+ * See {@link #reserveBuffers(int, boolean)}
*/
- public static int DEFAULT_RESERVE_BUFFERS = 64;
+ public static int DEFAULT_RESERVE_BUFFERS = 16384;
/**
* Get the batch size to use during query processing.
@@ -79,7 +85,7 @@
* across even a blocked exception.
* @return
*/
- int getMaxProcessingBatches();
+ int getMaxProcessingBatchColumns();
/**
* Creates a new {@link FileStore}. See {@link
FileStore#setCleanupReference(Object)} to
@@ -90,14 +96,12 @@
FileStore createFileStore(String name);
/**
- * Reserve up to count buffers for use. Wait will cause the process to block until
- * all of the requested or half of the total buffers are available.
+ * Reserve up to count buffers for use.
* @param count
- * @param wait
+ * @param mode
* @return
- * @throws MetaMatrixComponentException
*/
- int reserveBuffers(int count, boolean wait) throws MetaMatrixComponentException;
+ int reserveBuffers(int count, BufferReserveMode mode);
/**
* Releases the buffers reserved by a call to {@link
BufferManager#reserveBuffers(int, boolean)}
@@ -105,4 +109,9 @@
*/
void releaseBuffers(int count);
+ /**
+ * Get the size estimate for the given schema.
+ */
+ int getSchemaSize(List elements);
+
}
Modified: branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -22,6 +22,7 @@
package com.metamatrix.common.buffer;
+import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -32,10 +33,13 @@
import java.util.concurrent.atomic.AtomicLong;
import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.Streamable;
+import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.sql.symbol.Expression;
public class TupleBuffer {
@@ -75,15 +79,16 @@
private List<?> getCurrentTuple() throws MetaMatrixComponentException,
BlockedException {
if (currentRow <= rowCount) {
- if (forwardOnly) {
- if (batch == null || currentRow > batch.getEndRow()) {
+ //if (forwardOnly) {
+ if (batch == null || !batch.containsRow(currentRow)) {
batch = getBatch(currentRow);
}
return batch.getTuple(currentRow);
- }
+ //}
//TODO: determine if we should directly hold a soft reference here
- return getRow(currentRow);
+ //return getRow(currentRow);
}
+ batch = null;
if(isFinal) {
return null;
}
@@ -91,8 +96,7 @@
}
@Override
- public void closeSource()
- throws MetaMatrixComponentException{
+ public void closeSource() {
batch = null;
mark = 1;
reset();
@@ -210,21 +214,26 @@
* @throws MetaMatrixComponentException
*/
public void addTupleBatch(TupleBatch batch, boolean save) throws
MetaMatrixComponentException {
- Assertion.assertTrue(this.rowCount < batch.getBeginRow());
- if (this.rowCount != batch.getBeginRow() - 1) {
- saveBatch(false, true);
- this.rowCount = batch.getBeginRow() - 1;
- }
+ setRowCount(batch.getBeginRow() - 1);
if (save) {
for (List<?> tuple : batch.getAllTuples()) {
addTuple(tuple);
}
}
}
+
+ public void setRowCount(int rowCount)
+ throws MetaMatrixComponentException {
+ assert this.rowCount <= rowCount;
+ if (this.rowCount != rowCount) {
+ saveBatch(false, true);
+ this.rowCount = rowCount;
+ }
+ }
public void purge() {
if (this.batchBuffer != null) {
- this.batchBuffer = null;
+ this.batchBuffer.clear();
}
for (BatchManager.ManagedBatch batch : this.batches.values()) {
batch.remove();
@@ -260,14 +269,6 @@
this.isFinal = true;
}
- List<?> getRow(int row) throws MetaMatrixComponentException {
- if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
- return this.batchBuffer.get(row - rowCount + this.batchBuffer.size() - 1);
- }
- TupleBatch batch = getBatch(row);
- return batch.getTuple(row);
- }
-
/**
* Get the batch containing the given row.
* NOTE: the returned batch may be empty or may begin with a row other
@@ -311,8 +312,13 @@
public void remove() {
if (!removed) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Removing
TupleBuffer:", this.tupleSourceID); //$NON-NLS-1$
+ }
+ this.batchBuffer = null;
+ purge();
this.manager.remove();
- purge();
+ removed = true;
}
}
Modified: branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -54,12 +54,8 @@
/**
* Closes the Tuple Source.
- * @throws MetaMatrixComponentException indicating a non-business
- * exception such as a communication exception, or other such
- * nondeterministic exception
*/
- void closeSource()
- throws MetaMatrixComponentException;
+ void closeSource();
/**
* Returns an estimate of the number of rows that can be read without blocking.
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -27,17 +27,13 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.Reference;
-import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -62,7 +58,6 @@
import com.metamatrix.common.types.StandardXMLTranslator;
import com.metamatrix.common.types.Streamable;
import com.metamatrix.common.types.XMLType;
-import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
@@ -81,6 +76,10 @@
* with a simple LRU.
*
* TODO: allow for cached stores to use lru - (result set/mat view)
+ * TODO: account for row/content based sizing (difficult given value sharing)
+ * TODO: account for memory based lobs (it would be nice if the approximate buffer size
matched at 100kB)
+ * TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent
batches
+ * - this is not necessary for already persistent batches, since we hold a weak
reference
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
@@ -96,10 +95,7 @@
ManagedBatchImpl removeBatch(int row) {
ManagedBatchImpl result = batches.remove(row);
if (result != null) {
- activeBatchCount--;
- if (toPersistCount > 0) {
- toPersistCount--;
- }
+ activeBatchColumnCount -= result.columnCount;
}
return result;
}
@@ -111,23 +107,30 @@
private long offset = -1;
private boolean persistent;
- private volatile TupleBatch pBatch;
- private Reference<TupleBatch> batchReference;
+ private volatile TupleBatch activeBatch;
+ private volatile Reference<TupleBatch> batchReference;
private int beginRow;
+ private int columnCount;
- public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws
MetaMatrixComponentException {
+ public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to
BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
this.id = id;
this.store = store;
- this.pBatch = batch;
+ this.activeBatch = batch;
this.beginRow = batch.getBeginRow();
- addToCache(false);
- persistBatchReferences();
+ List[] allTuples = batch.getAllTuples();
+ if (allTuples.length > 0) {
+ columnCount = allTuples[0].size();
+ }
}
private void addToCache(boolean update) {
synchronized (activeBatches) {
- activeBatchCount++;
+ TupleBatch batch = this.activeBatch;
+ if (batch == null) {
+ return; //already removed
+ }
+ activeBatchColumnCount += columnCount;
TupleBufferInfo tbi = null;
if (update) {
tbi = activeBatches.remove(this.id);
@@ -164,35 +167,34 @@
}
}
}
+ persistBatchReferences();
synchronized (this) {
- if (this.batchReference != null && this.pBatch == null) {
- TupleBatch result = this.batchReference.get();
- if (result != null) {
- if (!cache) {
- softCache.remove(this);
- this.batchReference.clear();
+ TupleBatch batch = this.activeBatch;
+ if (batch != null){
+ return batch;
+ }
+ Reference<TupleBatch> ref = this.batchReference;
+ this.batchReference = null;
+ if (ref != null) {
+ batch = ref.get();
+ if (batch != null) {
+ if (cache) {
+ this.activeBatch = batch;
+ addToCache(true);
}
referenceHit.getAndIncrement();
- return result;
+ return batch;
}
}
-
- TupleBatch batch = this.pBatch;
- if (batch != null){
- return batch;
- }
- }
- persistBatchReferences();
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from
disk", readCount.incrementAndGet()); //$NON-NLS-1$
- synchronized (this) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk",
readCount.incrementAndGet()); //$NON-NLS-1$
try {
ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
- TupleBatch batch = new TupleBatch();
+ batch = new TupleBatch();
batch.setDataTypes(types);
batch.readExternal(ois);
batch.setDataTypes(null);
if (cache) {
- this.pBatch = batch;
+ this.activeBatch = batch;
addToCache(true);
}
return batch;
@@ -204,71 +206,9 @@
}
}
- public void persistBatchReferences() throws MetaMatrixComponentException {
- ManagedBatchImpl mb = null;
- boolean createSoft = false;
- /*
- * If we are over our limit, collect half of the batches.
- */
- synchronized (activeBatches) {
- if (activeBatchCount > reserveBatches && toPersistCount == 0) {
- toPersistCount = activeBatchCount / 2;
- }
- }
- while (true) {
- synchronized (activeBatches) {
- if (activeBatchCount == 0 || toPersistCount == 0) {
- toPersistCount = 0;
- break;
- }
- Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
- TupleBufferInfo tbi = iter.next();
- Map.Entry<Integer, ManagedBatchImpl> entry = null;
- if (tbi.lastUsed != null) {
- entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
- }
- if (entry == null) {
- entry = tbi.batches.pollLastEntry();
- } else {
- createSoft = true;
- tbi.batches.remove(entry.getKey());
- }
- if (tbi.batches.isEmpty()) {
- iter.remove();
- }
- activeBatchCount--;
- toPersistCount--;
- mb = entry.getValue();
- }
- persist(createSoft, mb);
- }
- synchronized (softCache) {
- if (softCache.size() > reserveBatches) {
- Iterator<ManagedBatchImpl> iter = softCache.iterator();
- mb = iter.next();
- iter.remove();
- }
- }
- persist(false, mb);
- }
-
- private void persist(boolean createSoft, ManagedBatchImpl mb)
- throws MetaMatrixComponentException {
+ public synchronized void persist() throws MetaMatrixComponentException {
try {
- if (mb != null) {
- mb.persist(createSoft);
- }
- } catch (MetaMatrixComponentException e) {
- if (mb == this) {
- throw e;
- }
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read that batch later will result in an exception"); //$NON-NLS-1$
- }
- }
-
- public synchronized void persist(boolean createSoft) throws
MetaMatrixComponentException {
- try {
- TupleBatch batch = pBatch;
+ TupleBatch batch = activeBatch;
if (batch != null) {
if (!persistent) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Writing batch to disk",
writeCount.incrementAndGet()); //$NON-NLS-1$
@@ -282,18 +222,13 @@
fsos.flushBuffer();
}
}
- if (createSoft) {
- this.batchReference = new SoftReference<TupleBatch>(batch);
- softCache.add(this);
- } else {
- this.batchReference = new WeakReference<TupleBatch>(batch);
- }
+ this.batchReference = new WeakReference<TupleBatch>(batch);
}
} catch (IOException e) {
throw new MetaMatrixComponentException(e);
} finally {
persistent = true;
- pBatch = null;
+ activeBatch = null;
}
}
@@ -304,16 +239,13 @@
activeBatches.remove(this.id);
}
}
- softCache.remove(this);
- pBatch = null;
- if (batchReference != null) {
- batchReference.clear();
- }
+ activeBatch = null;
+ batchReference = null;
}
@Override
public String toString() {
- return "ManagedBatch " + id + " " + pBatch; //$NON-NLS-1$
//$NON-NLS-2$
+ return "ManagedBatch " + id + " " + activeBatch; //$NON-NLS-1$
//$NON-NLS-2$
}
}
@@ -321,16 +253,14 @@
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
- private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
- private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+ private int maxReserveBatchColumns = BufferManager.DEFAULT_RESERVE_BUFFERS;
+ private volatile int reserveBatchColumns = BufferManager.DEFAULT_RESERVE_BUFFERS;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
- private int toPersistCount = 0;
- private int activeBatchCount = 0;
+ private volatile int activeBatchColumnCount = 0;
private Map<String, TupleBufferInfo> activeBatches = new
LinkedHashMap<String, TupleBufferInfo>();
- private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatchImpl>());
private StorageManager diskMgr;
@@ -341,18 +271,20 @@
private AtomicInteger readAttempts = new AtomicInteger();
private AtomicInteger referenceHit = new AtomicInteger();
- public int getMaxProcessingBatches() {
+ @Override
+ public int getMaxProcessingBatchColumns() {
return maxProcessingBatches;
}
- public void setMaxProcessingBatches(int maxProcessingBatches) {
- this.maxProcessingBatches = Math.max(2, maxProcessingBatches);
+ public void setMaxProcessingBatchColumns(int maxProcessingBatches) {
+ this.maxProcessingBatches = Math.max(0, maxProcessingBatches);
}
/**
* Get processor batch size
* @return Number of rows in a processor batch
*/
+ @Override
public int getProcessorBatchSize() {
return this.processorBatchSize;
}
@@ -361,6 +293,7 @@
* Get connector batch size
* @return Number of rows in a connector batch
*/
+ @Override
public int getConnectorBatchSize() {
return this.connectorBatchSize;
}
@@ -402,7 +335,10 @@
this.store = createFileStore(newID);
this.store.setCleanupReference(this);
}
- return new ManagedBatchImpl(newID, store, batch);
+ ManagedBatchImpl mbi = new ManagedBatchImpl(newID, store, batch);
+ mbi.addToCache(false);
+ persistBatchReferences();
+ return mbi;
}
@Override
@@ -450,9 +386,12 @@
@Override
public void releaseBuffers(int count) {
+ if (count < 1) {
+ return;
+ }
lock.lock();
try {
- this.reserveBatches += count;
+ this.reserveBatchColumns += count;
batchesFreed.signalAll();
} finally {
lock.unlock();
@@ -460,31 +399,83 @@
}
@Override
- public int reserveBuffers(int count, boolean wait) throws
MetaMatrixComponentException {
+ public int reserveBuffers(int count, BufferReserveMode mode) {
lock.lock();
try {
- while (wait && count > this.reserveBatches &&
this.reserveBatches < this.maxReserveBatches / 2) {
- try {
- batchesFreed.await();
- } catch (InterruptedException e) {
- throw new MetaMatrixComponentException(e);
- }
- }
- this.reserveBatches -= count;
- if (this.reserveBatches >= 0) {
+ if (mode == BufferReserveMode.WAIT) {
+ int waitCount = 0;
+ while (count - waitCount > this.reserveBatchColumns) {
+ try {
+ batchesFreed.await(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new MetaMatrixRuntimeException(e);
+ }
+ waitCount++;
+ }
+ }
+ if (this.reserveBatchColumns >= count || mode == BufferReserveMode.FORCE) {
+ this.reserveBatchColumns -= count;
return count;
}
- int result = count + this.reserveBatches;
- this.reserveBatches = 0;
+ int result = Math.max(0, this.reserveBatchColumns);
+ this.reserveBatchColumns -= result;
return result;
} finally {
lock.unlock();
+ persistBatchReferences();
}
}
- public void setMaxReserveBatches(int maxReserveBatches) {
- this.maxReserveBatches = maxReserveBatches;
+ void persistBatchReferences() {
+ if (activeBatchColumnCount == 0 || activeBatchColumnCount <= reserveBatchColumns) {
+ int memoryCount = activeBatchColumnCount + maxReserveBatchColumns -
reserveBatchColumns;
+ if (DataTypeManager.isValueCacheEnabled()) {
+ if (memoryCount < maxReserveBatchColumns / 8) {
+ DataTypeManager.setValueCacheEnabled(false);
+ }
+ } else if (memoryCount > maxReserveBatchColumns / 4) {
+ DataTypeManager.setValueCacheEnabled(true);
+ }
+ return;
+ }
+ while (true) {
+ ManagedBatchImpl mb = null;
+ synchronized (activeBatches) {
+ if (activeBatchColumnCount == 0 || activeBatchColumnCount * 5 <
reserveBatchColumns * 4) {
+ break;
+ }
+ Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
+ TupleBufferInfo tbi = iter.next();
+ Map.Entry<Integer, ManagedBatchImpl> entry = null;
+ if (tbi.lastUsed != null) {
+ entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
+ }
+ if (entry == null) {
+ entry = tbi.batches.lastEntry();
+ }
+ tbi.removeBatch(entry.getKey());
+ if (tbi.batches.isEmpty()) {
+ iter.remove();
+ }
+ mb = entry.getValue();
+ }
+ try {
+ mb.persist();
+ } catch (MetaMatrixComponentException e) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ }
+ }
}
+
+ @Override
+ public int getSchemaSize(List elements) {
+ return elements.size();
+ }
+
+ public void setMaxReserveBatchColumns(int maxReserve) {
+ this.maxReserveBatchColumns = maxReserve;
+ this.reserveBatchColumns = maxReserve;
+ }
public void shutdown() {
}
Modified: branches/JCA/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java
===================================================================
--- branches/JCA/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java 2010-02-18
20:12:13 UTC (rev 1841)
+++ branches/JCA/engine/src/main/java/com/metamatrix/query/eval/Evaluator.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -39,7 +39,6 @@
import com.metamatrix.api.exception.query.ExpressionEvaluationException;
import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.common.types.Sequencable;
-import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.core.util.EquivalenceUtil;
import com.metamatrix.query.QueryPlugin;
@@ -224,8 +223,11 @@
}
private final int compareValues(Object leftValue, Object rightValue) {
- ArgCheck.isInstanceOf(Comparable.class, leftValue);
- ArgCheck.isInstanceOf(Comparable.class, rightValue);
+ assert leftValue instanceof Comparable<?>;
+ assert rightValue instanceof Comparable<?>;
+ if (leftValue == rightValue) {
+ return 0;
+ }
return ((Comparable)leftValue).compareTo(rightValue);
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RuleImplementJoinStrategy.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -79,13 +79,11 @@
if (joinNode.getProperty(NodeConstants.Info.JOIN_TYPE) == JoinType.JOIN_INNER
&& context != null) {
float leftCost =
NewCalculateCostUtil.computeCostForTree(joinNode.getFirstChild(), metadata);
float rightCost =
NewCalculateCostUtil.computeCostForTree(joinNode.getLastChild(), metadata);
- boolean leftSmall = leftCost < context.getProcessorBatchSize() / 4;
- boolean rightSmall = rightCost < context.getProcessorBatchSize() / 4;
- boolean leftLarge = leftCost > context.getProcessorBatchSize();
- boolean rightLarge = rightCost > context.getProcessorBatchSize();
- if (leftLarge || rightLarge) {
- pushLeft = leftCost == NewCalculateCostUtil.UNKNOWN_VALUE || leftSmall
|| rightLarge;
- pushRight = rightCost == NewCalculateCostUtil.UNKNOWN_VALUE ||
rightSmall || leftLarge || joinNode.getProperty(NodeConstants.Info.DEPENDENT_VALUE_SOURCE)
!= null;
+ if (leftCost != NewCalculateCostUtil.UNKNOWN_VALUE && rightCost !=
NewCalculateCostUtil.UNKNOWN_VALUE
+ && (leftCost > context.getProcessorBatchSize() || rightCost
> context.getProcessorBatchSize())) {
+ //we use a larger constant here to ensure that we don't unwisely
prevent pushdown
+ pushLeft = leftCost < context.getProcessorBatchSize() || leftCost /
rightCost < 16;
+ pushRight = rightCost < context.getProcessorBatchSize() || rightCost /
leftCost < 16 || joinNode.getProperty(NodeConstants.Info.DEPENDENT_VALUE_SOURCE) !=
null;
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePushSelectCriteria.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePushSelectCriteria.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/optimizer/relational/rules/RulePushSelectCriteria.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -140,6 +140,12 @@
CapabilitiesFinder capFinder, PlanNode critNode)
throws MetaMatrixComponentException, QueryMetadataException {
if (critNode.getGroups().isEmpty()) {
+ //check to see if pushing may impact cardinality
+ PlanNode groupNode = NodeEditor.findNodePreOrder(critNode,
NodeConstants.Types.GROUP, NodeConstants.Types.SOURCE);
+ if (groupNode != null &&
!groupNode.hasCollectionProperty(NodeConstants.Info.GROUP_COLS)) {
+ return groupNode;
+ }
+
Object modelId = getSubqueryModelId(metadata, capFinder, critNode);
if (modelId != null) {
for (PlanNode node : NodeEditor.findAllNodes(critNode, NodeConstants.Types.SOURCE))
{
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchCollector.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -95,7 +95,7 @@
add = this.batchHandler.batchProduced(batch);
}
// Add batch
- if(batch.getRowCount() > 0) {
+ if(batch.getRowCount() > 0 || batch.getTerminationFlag()) {
buffer.addTupleBatch(batch, add);
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -103,7 +103,7 @@
}
@Override
- public void closeSource() throws MetaMatrixComponentException {
+ public void closeSource() {
if (this.buffer != null) {
this.buffer.remove();
this.buffer = null;
@@ -124,7 +124,8 @@
List result = currentTuple;
currentTuple = null;
if (mark && saveOnMark && this.currentRow - 1 >
this.buffer.getRowCount()) {
- this.buffer.addTupleBatch(new TupleBatch(this.currentRow - 1, new List[]
{result}), true);
+ this.buffer.setRowCount(this.currentRow - 2);
+ this.buffer.addTuple(result);
this.bufferedIndex = this.currentRow - 1;
}
return result;
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -30,6 +30,7 @@
import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -57,6 +58,7 @@
private BufferManager bufferMgr;
private ProcessorPlan processPlan;
private boolean initialized = false;
+ private int reserved;
/** Flag that marks whether the request has been canceled. */
private volatile boolean requestCanceled = false;
private static final int DEFAULT_WAIT = 50;
@@ -125,10 +127,12 @@
try {
// initialize if necessary
if(! initialized) {
+ if (reserved == 0) {
+ reserved =
this.bufferMgr.reserveBuffers(this.bufferMgr.getSchemaSize(this.getOutputElements()),
BufferReserveMode.FORCE);
+ }
// Open the top node for reading
processPlan.open();
-
- initialized = true;
+ initialized = true;
}
long currentTime = System.currentTimeMillis();
@@ -155,11 +159,7 @@
} catch (BlockedException e) {
throw e;
} catch (MetaMatrixException e) {
- try {
- closeProcessing();
- } catch (MetaMatrixException e1){
- LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing
processor"); //$NON-NLS-1$
- }
+ closeProcessing();
if (e instanceof MetaMatrixProcessingException) {
throw (MetaMatrixProcessingException)e;
}
@@ -180,18 +180,22 @@
/**
* Close processing and clean everything up. Should only be called by the same
thread that called process.
- * @throws MetaMatrixComponentException
*/
- public void closeProcessing() throws MetaMatrixComponentException {
+ public void closeProcessing() {
if (processorClosed) {
return;
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "QueryProcessor: closing
processor"); //$NON-NLS-1$
}
+ this.bufferMgr.releaseBuffers(reserved);
+ reserved = 0;
processorClosed = true;
-
- processPlan.close();
+ try {
+ processPlan.close();
+ } catch (MetaMatrixComponentException e1){
+ LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor");
//$NON-NLS-1$
+ }
}
@Override
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/AccessNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -103,7 +103,7 @@
isUpdate = RelationalNodeUtil.isUpdate(atomicCommand);
if(needProcessing) {
- this.tupleSource =
getDataManager().registerRequest(this.getContext().getProcessorID(), atomicCommand,
modelName, connectorBindingId, getID());
+ registerRequest(atomicCommand);
}
}
@@ -153,7 +153,7 @@
}
Command atomicCommand = (Command)command.clone();
if (prepareNextCommand(atomicCommand)) {
- tupleSource =
getDataManager().registerRequest(this.getContext().getProcessorID(), atomicCommand,
modelName, null, getID());
+ registerRequest(atomicCommand);
break;
}
}
@@ -168,6 +168,11 @@
terminateBatches();
return pullBatch();
}
+
+ private void registerRequest(Command atomicCommand)
+ throws MetaMatrixComponentException, MetaMatrixProcessingException {
+ tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(),
atomicCommand, modelName, connectorBindingId, getID());
+ }
protected boolean processCommandsIndividually() {
return false;
@@ -177,15 +182,11 @@
return false;
}
- public void close() throws MetaMatrixComponentException {
- if (!isClosed()) {
- super.close();
-
- closeSources();
- }
+ public void closeDirect() {
+ closeSources();
}
- private void closeSources() throws MetaMatrixComponentException {
+ private void closeSources() {
if(this.tupleSource != null) {
this.tupleSource.closeSource();
tupleSource = null;
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/BatchedUpdateNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -144,14 +144,10 @@
* @see com.metamatrix.query.processor.relational.RelationalNode#close()
* @since 4.2
*/
- public void close() throws MetaMatrixComponentException {
- if (!isClosed()) {
- super.close();
-
- if (tupleSource != null) {
- tupleSource.closeSource();
- tupleSource = null;
- }
+ public void closeDirect() {
+ if (tupleSource != null) {
+ tupleSource.closeSource();
+ tupleSource = null;
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentAccessNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -54,11 +54,8 @@
/**
* @see com.metamatrix.query.processor.relational.AccessNode#close()
*/
- public void close() throws MetaMatrixComponentException {
- if (isClosed()) {
- return;
- }
- super.close();
+ public void closeDirect() {
+ super.closeDirect();
if (criteriaProcessor != null) {
criteriaProcessor.close();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -90,7 +90,7 @@
DependentValueSource originalVs =
(DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
this.sortUtility = new
SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols,
sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(),
dependentNode.getConnectionID());
}
- dvs = new DependentValueSource(sortUtility.sort(),
dependentNode.getBufferManager().getProcessorBatchSize() / 2);
+ dvs = new DependentValueSource(sortUtility.sort());
for (SetState setState : dependentSetStates) {
setState.valueIterator =
dvs.getValueIterator(setState.valueExpression);
}
@@ -176,7 +176,7 @@
}
}
- public void close() throws MetaMatrixComponentException {
+ public void close() {
if (dependentState != null) {
for (TupleState state : dependentState.values()) {
state.close();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureAccessNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -77,11 +77,8 @@
criteriaProcessor = null;
}
- public void close() throws MetaMatrixComponentException {
- if (isClosed()) {
- return;
- }
- super.close();
+ public void closeDirect() {
+ super.closeDirect();
if (criteriaProcessor != null) {
criteriaProcessor.close();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentProcedureExecutionNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -76,11 +76,8 @@
criteriaProcessor = null;
}
- public void close() throws MetaMatrixComponentException {
- if (isClosed()) {
- return;
- }
- super.close();
+ public void closeDirect() {
+ super.closeDirect();
if (criteriaProcessor != null) {
criteriaProcessor.close();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -45,17 +45,15 @@
public class DependentValueSource implements
ValueIteratorSource {
- private TupleBuffer tupleSourceID;
- private int maxSetSize;
+ private TupleBuffer buffer;
private Map<Expression, Set<Object>> cachedSets;
- public DependentValueSource(TupleBuffer tupleSourceID, int maxSetSize) {
- this.tupleSourceID = tupleSourceID;
- this.maxSetSize = maxSetSize;
+ public DependentValueSource(TupleBuffer tupleSourceID) {
+ this.buffer = tupleSourceID;
}
public TupleBuffer getTupleBuffer() {
- return tupleSourceID;
+ return buffer;
}
/**
@@ -63,7 +61,7 @@
* @see
com.metamatrix.query.sql.util.ValueIteratorSource#getValueIterator(com.metamatrix.query.sql.symbol.Expression)
*/
public ValueIterator getValueIterator(Expression valueExpression) throws
MetaMatrixComponentException {
- IndexedTupleSource its = tupleSourceID.createIndexedTupleSource();
+ IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
index = its.getSchema().indexOf(valueExpression);
@@ -78,10 +76,10 @@
result = cachedSets.get(valueExpression);
}
if (result == null) {
- if (tupleSourceID.getRowCount() > maxSetSize) {
+ if (buffer.getRowCount() > buffer.getBatchSize()) {
return null;
}
- IndexedTupleSource its = tupleSourceID.createIndexedTupleSource();
+ IndexedTupleSource its = buffer.createIndexedTupleSource();
int index = 0;
if (valueExpression != null) {
index = its.getSchema().indexOf(valueExpression);
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -125,7 +125,9 @@
this.collectionBuffer.close();
// Sort
- sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(),
elements, sortTypes, Mode.DUP_REMOVE, mgr, groupName);
+ if (sortUtility == null) {
+ sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(),
elements, sortTypes, Mode.DUP_REMOVE_SORT, mgr, groupName);
+ }
TupleBuffer sorted = sortUtility.sort();
sorted.setForwardOnly(true);
try {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -212,9 +212,8 @@
return groupPhase();
}
- TupleBatch terminationBatch = new TupleBatch(1, Collections.EMPTY_LIST);
- terminationBatch.setTerminationFlag(true);
- return terminationBatch;
+ this.terminateBatches();
+ return pullBatch();
}
public TupleSource getCollectionTupleSource() {
@@ -265,7 +264,7 @@
}
@Override
- public void closeSource() throws MetaMatrixComponentException {
+ public void closeSource() {
}
@@ -390,14 +389,11 @@
}
}
- public void close() throws MetaMatrixComponentException {
- if (!isClosed()) {
- if (this.sortBuffer != null) {
- this.sortBuffer.remove();
- this.sortBuffer = null;
- }
- super.close();
- }
+ public void closeDirect() {
+ if (this.sortBuffer != null) {
+ this.sortBuffer.remove();
+ this.sortBuffer = null;
+ }
}
protected void getNodeString(StringBuffer str) {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/InsertPlanExecutionNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -102,8 +102,7 @@
}
@Override
- public void close() throws MetaMatrixComponentException {
- super.close();
+ public void closeDirect() {
this.currentBatch = null;
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -36,6 +36,7 @@
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
import com.metamatrix.query.processor.ProcessorDataManager;
import com.metamatrix.query.processor.relational.SourceState.ImplicitBuffer;
import com.metamatrix.query.sql.LanguageObject;
@@ -48,6 +49,10 @@
*/
public class JoinNode extends SubqueryAwareRelationalNode {
+ static class BatchAvailableException extends RuntimeException {}
+
+ static BatchAvailableException BATCH_AVILABLE = new BatchAvailableException();
+
public enum JoinStrategyType {
MERGE,
PARTITIONED_SORT,
@@ -59,6 +64,7 @@
private boolean leftOpened;
private boolean rightOpened;
+ private int reserved;
private JoinStrategy joinStrategy;
private JoinType joinType;
@@ -147,16 +153,25 @@
this.leftOpened = true;
}
- if(!isDependent() && !this.rightOpened) {
- // Open right child if non-dependent
- getChildren()[1].open();
- this.rightOpened = true;
+ if(!isDependent()) {
+ openRight();
}
this.state = State.LOAD_LEFT;
// Set Up Join Strategy
this.joinStrategy.initialize(this);
}
+
+ private void openRight() throws MetaMatrixComponentException,
+ MetaMatrixProcessingException {
+ if (!this.rightOpened) {
+ if (reserved == 0) {
+ reserved =
getBufferManager().reserveBuffers(getBufferManager().getSchemaSize(getOutputElements()),
BufferReserveMode.FORCE);
+ }
+ getChildren()[1].open();
+ this.rightOpened = true;
+ }
+ }
/**
* @see com.metamatrix.query.processor.relational.RelationalNode#clone()
@@ -194,33 +209,25 @@
}
//left child was already opened by the join node
this.joinStrategy.loadLeft();
+ if (isDependent()) {
+ TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
+
this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, new
DependentValueSource(buffer));
+ }
state = State.LOAD_RIGHT;
}
if (state == State.LOAD_RIGHT) {
- if (isDependent() && !this.rightOpened) {
- TupleBuffer tsID = this.joinStrategy.leftSource.getTupleBuffer();
-
this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, new
DependentValueSource(tsID, this.getBufferManager().getProcessorBatchSize() / 2));
- //open the right side now that the tuples have been collected
- this.getChildren()[1].open();
- this.rightOpened = true;
- }
+ this.openRight();
this.joinStrategy.loadRight();
+ this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource,
null);
state = State.EXECUTE;
}
-
- while(true) {
- if(super.isBatchFull()) {
- return super.pullBatch();
- }
- List outputTuple = this.joinStrategy.nextTuple();
- if(outputTuple != null) {
- List projectTuple = projectTuple(this.projectionIndexes, outputTuple);
- super.addBatchRow(projectTuple);
- } else {
- super.terminateBatches();
- return super.pullBatch();
- }
+ try {
+ this.joinStrategy.process();
+ this.terminateBatches();
+ } catch (BatchAvailableException e) {
+ //pull the batch
}
+ return pullBatch();
}
/**
@@ -295,13 +302,12 @@
this.dependentValueSource = dependentValueSource;
}
- public void close()
- throws MetaMatrixComponentException {
- super.close();
+ public void closeDirect() {
+ getBufferManager().releaseBuffers(reserved);
+ reserved = 0;
+ super.closeDirect();
joinStrategy.close();
- if (this.isDependent()) {
- this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource,
null);
- }
+ this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource,
null);
}
public JoinType getJoinType() {
@@ -335,5 +341,14 @@
}
return Arrays.asList(this.joinCriteria);
}
+
+ @Override
+ protected void addBatchRow(List row) {
+ List projectTuple = projectTuple(this.projectionIndexes, row);
+ super.addBatchRow(projectTuple);
+ if (isBatchFull()) {
+ throw BATCH_AVILABLE;
+ }
+ }
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/JoinStrategy.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -79,7 +79,7 @@
return combinedRow;
}
- protected abstract List nextTuple() throws MetaMatrixComponentException,
CriteriaEvaluationException, MetaMatrixProcessingException;
+ protected abstract void process() throws MetaMatrixComponentException,
CriteriaEvaluationException, MetaMatrixProcessingException;
public abstract Object clone();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/LimitNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -98,9 +98,6 @@
rowCounter += resultBatch.getRowCount();
if (rowCounter == limit || batch.getTerminationFlag()) {
resultBatch.setTerminationFlag(true);
- if (!batch.getTerminationFlag()){
- getChildren()[0].close();
- }
}
return resultBatch;
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/MergeJoinStrategy.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -135,7 +135,7 @@
}
@Override
- protected List nextTuple() throws MetaMatrixComponentException,
+ protected void process() throws MetaMatrixComponentException,
CriteriaEvaluationException, MetaMatrixProcessingException {
while (this.mergeState != MergeState.DONE) {
@@ -149,7 +149,7 @@
this.leftScanState = ScanState.DONE;
if (joinNode.getJoinType() != JoinType.JOIN_FULL_OUTER) {
mergeState = MergeState.DONE;
- return null;
+ return;
}
}
}
@@ -163,7 +163,7 @@
this.rightScanState = ScanState.DONE;
if (!this.joinNode.getJoinType().isOuter()) {
mergeState = MergeState.DONE;
- return null;
+ return;
}
}
}
@@ -173,7 +173,7 @@
if (this.leftScanState == ScanState.DONE) {
if (this.rightScanState == ScanState.DONE) {
this.mergeState = MergeState.DONE;
- return null;
+ return;
}
result = -1;
} else if (this.rightScanState == ScanState.DONE) {
@@ -191,12 +191,12 @@
} else if (result > 0) {
this.leftScanState = ScanState.READ;
if (this.joinNode.getJoinType().isOuter()) {
- return outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals());
+
this.joinNode.addBatchRow(outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals()));
}
} else {
this.rightScanState = ScanState.READ;
if (joinNode.getJoinType() == JoinType.JOIN_FULL_OUTER) {
- return outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple());
+
this.joinNode.addBatchRow(outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple()));
}
}
}
@@ -229,11 +229,11 @@
while (loopState == LoopState.LOAD_INNER || loopState ==
LoopState.EVALUATE_CRITERIA) {
if (loopState == LoopState.LOAD_INNER) {
- if (compareToPrevious(innerState)) {
- loopState = LoopState.EVALUATE_CRITERIA;
- } else {
- loopState = LoopState.LOAD_OUTER;
+ if (!compareToPrevious(innerState)) {
+ loopState = LoopState.LOAD_OUTER;
+ break;
}
+ loopState = LoopState.EVALUATE_CRITERIA;
}
if (loopState == LoopState.EVALUATE_CRITERIA) {
@@ -248,25 +248,25 @@
if (this.joinNode.getJoinType() == JoinType.JOIN_SEMI) {
this.loopState = LoopState.LOAD_OUTER; //only one
match is needed for semi join
}
- return outputTuple;
+ this.joinNode.addBatchRow(outputTuple);
+ continue;
}
//right outer join || anti semi join can skip to the next
outer value
this.loopState = LoopState.LOAD_OUTER;
+ break;
}
}
}
if (!outerMatched) {
if (matchState == MatchState.MATCH_RIGHT) {
- return outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple());
+
this.joinNode.addBatchRow(outputTuple(this.leftSource.getOuterVals(),
this.rightSource.getCurrentTuple()));
+ } else if (this.joinNode.getJoinType().isOuter()) {
+
this.joinNode.addBatchRow(outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals()));
}
- if (this.joinNode.getJoinType().isOuter()) {
- return outputTuple(this.leftSource.getCurrentTuple(),
this.rightSource.getOuterVals());
- }
}
}
}
- return null;
}
protected boolean compareToPrevious(SourceState target) throws
MetaMatrixComponentException,
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/NullNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -22,7 +22,6 @@
package com.metamatrix.query.processor.relational;
-import java.util.Collections;
import java.util.Map;
import com.metamatrix.api.exception.MetaMatrixComponentException;
@@ -37,9 +36,8 @@
public TupleBatch nextBatchDirect()
throws MetaMatrixComponentException {
- TupleBatch batch = new TupleBatch(1, Collections.EMPTY_LIST);
- batch.setTerminationFlag(true);
- return batch;
+ this.terminateBatches();
+ return pullBatch();
}
protected void getNodeString(StringBuffer str) {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -32,6 +32,7 @@
import com.metamatrix.common.buffer.IndexedTupleSource;
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
/**
* Extends the basic fully sorted merge join to check for conditions necessary
@@ -54,6 +55,7 @@
private List<?> partitionedTuple;
private int matchBegin = -1;
private int matchEnd = -1;
+ private int reserved;
public PartitionedSortJoin(SortOption sortLeft, SortOption sortRight) {
super(sortLeft, sortRight, false);
@@ -65,6 +67,7 @@
for (TupleBuffer tupleSourceID : this.partitions) {
tupleSourceID.remove();
}
+ releaseReserved();
this.endTuples = null;
this.overlap.clear();
this.endRows.clear();
@@ -117,14 +120,13 @@
protected void loadRight() throws MetaMatrixComponentException,
MetaMatrixProcessingException {
this.rightSource.getTupleBuffer();
- int maxRows = this.joinNode.getBatchSize() * getMaxProcessingBatches();
if (processingSortRight == SortOption.SORT
- && this.leftSource.getRowCount() < maxRows
- && this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount())
{
+ && this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount()
+ && testAndSetPartitions(this.rightSource.getRowCount(),
this.rightSource.getSource().getOutputElements())) {
this.processingSortRight = SortOption.PARTITION;
} else if (processingSortLeft == SortOption.SORT
- && this.rightSource.getRowCount() < maxRows
- && this.rightSource.getRowCount() * 4 < this.leftSource.getRowCount())
{
+ && this.rightSource.getRowCount() * 4 < this.leftSource.getRowCount()
+ && testAndSetPartitions(this.leftSource.getRowCount(),
this.leftSource.getSource().getOutputElements())) {
this.processingSortLeft = SortOption.PARTITION;
}
super.loadRight(); //sort right if needed
@@ -140,49 +142,52 @@
this.partitionedSource = this.rightSource;
}
if (this.processingSortLeft == SortOption.PARTITION) {
- partitionSource(true);
+ partitionSource();
}
if (this.processingSortRight == SortOption.PARTITION) {
- partitionSource(false);
- if (this.processingSortRight == SortOption.SORT) {
- //degrade to a merge join
- super.loadRight();
- }
+ partitionSource();
}
}
/**
* Since the source to be partitioned is already loaded, then there's no
- * chance of a blocked exception during partitioning, so double the max.
+ * chance of a blocked exception during partitioning, so reserve some batches.
*
* TODO: partition at the same time as the load to determine size
*
* @return
*/
- private int getMaxProcessingBatches() {
- return 2 * this.joinNode.getBufferManager().getMaxProcessingBatches();
+ private boolean testAndSetPartitions(int rowCount, List elements) {
+ int partitionCount = (rowCount / this.joinNode.getBatchSize() + rowCount %
this.joinNode.getBatchSize() == 0 ? 0:1)
+ * this.joinNode.getBufferManager().getSchemaSize(elements);
+ if (partitionCount > this.joinNode.getBufferManager().getMaxProcessingBatchColumns()
* 8) {
+ return false;
+ }
+ int toReserve = Math.max(1, (int)(partitionCount * .75));
+ int excess = Math.max(0, toReserve -
this.joinNode.getBufferManager().getMaxProcessingBatchColumns());
+ reserved = this.joinNode.getBufferManager().reserveBuffers(toReserve - excess,
BufferReserveMode.FORCE);
+ if (excess > 0) {
+ reserved += this.joinNode.getBufferManager().reserveBuffers(toReserve,
BufferReserveMode.NO_WAIT);
+ }
+ if (reserved == toReserve) {
+ return true;
+ }
+ releaseReserved();
+ return false;
}
- private void partitionSource(boolean left) throws MetaMatrixComponentException,
+ private void partitionSource() throws MetaMatrixComponentException,
MetaMatrixProcessingException {
if (partitioned) {
return;
}
- if (endTuples.length > getMaxProcessingBatches() + 1) {
- if (left) {
- this.processingSortLeft = SortOption.SORT;
- } else {
- this.processingSortRight = SortOption.SORT;
- }
- return;
- }
if (endTuples.length < 2) {
partitions.add(this.partitionedSource.getTupleBuffer());
} else {
if (partitions.isEmpty()) {
for (int i = 0; i < endTuples.length; i++) {
TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
- tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()));
+ tc.setForwardOnly(true);
this.partitions.add(tc);
}
}
@@ -204,18 +209,24 @@
for (TupleBuffer partition : this.partitions) {
partition.close();
}
+ releaseReserved();
}
partitioned = true;
}
+
+ private void releaseReserved() {
+ this.joinNode.getBufferManager().releaseBuffers(this.reserved);
+ this.reserved = 0;
+ }
@Override
- protected List nextTuple() throws MetaMatrixComponentException,
+ protected void process() throws MetaMatrixComponentException,
CriteriaEvaluationException, MetaMatrixProcessingException {
if (this.processingSortLeft != SortOption.PARTITION &&
this.processingSortRight != SortOption.PARTITION) {
- return super.nextTuple();
+ super.process();
}
if (endRows.isEmpty()) {
- return null; //no rows on the sorted side
+ return; //no rows on the sorted side
}
while (currentPartition < partitions.size()) {
if (currentSource == null) {
@@ -260,7 +271,7 @@
boolean matches = this.joinNode.matchesCriteria(outputTuple);
matchBegin++;
if (matches) {
- return outputTuple;
+ this.joinNode.addBatchRow(outputTuple);
}
}
matchBegin = -1;
@@ -271,7 +282,6 @@
currentSource = null;
currentPartition++;
}
- return null;
}
public int binarySearch(List<?> tuple, List[] tuples, int[] leftIndexes, int[]
rightIndexes) {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/PlanExecutionNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -30,10 +30,13 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BlockedException;
import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.processor.ProcessorPlan;
import com.metamatrix.query.sql.util.VariableContext;
import com.metamatrix.query.util.CommandContext;
+//TODO: consolidate with QueryProcessor
public class PlanExecutionNode extends RelationalNode {
// Initialization state
@@ -133,11 +136,12 @@
return false;
}
- public void close() throws MetaMatrixComponentException {
- if (!isClosed()) {
- super.close();
- plan.close();
- }
+ public void closeDirect() {
+ try {
+ plan.close();
+ } catch (MetaMatrixComponentException e1){
+ LogManager.logDetail(LogConstants.CTX_DQP, e1, "Error closing processor");
//$NON-NLS-1$
+ }
}
protected void getNodeString(StringBuffer str) {
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/ProjectIntoNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -191,7 +191,7 @@
tupleSource = getDataManager().registerRequest(this.getContext().getProcessorID(),
command, this.modelName, null, getID());
}
- private void closeRequest() throws MetaMatrixComponentException {
+ private void closeRequest() {
if (this.tupleSource != null) {
tupleSource.closeSource();
@@ -253,8 +253,7 @@
return intoGroup.isTempGroupSymbol();
}
- public void close() throws MetaMatrixComponentException {
+ public void closeDirect() {
closeRequest();
- super.close();
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/RelationalNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -252,17 +252,16 @@
* @throws MetaMatrixComponentException
* @since 4.2
*/
- public TupleBatch nextBatch() throws BlockedException, MetaMatrixComponentException,
MetaMatrixProcessingException {
+ public final TupleBatch nextBatch() throws BlockedException,
MetaMatrixComponentException, MetaMatrixProcessingException {
boolean recordStats = this.context != null &&
(this.context.getCollectNodeStatistics() || this.context.getProcessDebug());
- TupleBatch batch = null;
try {
while (true) {
//start timer for this batch
if(recordStats && this.context.getCollectNodeStatistics()) {
this.nodeStatistics.startBatchTimer();
}
- batch = nextBatchDirect();
+ TupleBatch batch = nextBatchDirect();
if (recordStats) {
if(this.context.getCollectNodeStatistics()) {
// stop timer for this batch (normal)
@@ -279,15 +278,17 @@
//there have been several instances in the code that have not correctly
accounted for non-terminal zero length batches
//this processing style however against the spirit of batch processing
(but was already utilized by Sort and Grouping nodes)
if (batch.getRowCount() != 0 || batch.getTerminationFlag()) {
- break;
+ if (batch.getTerminationFlag()) {
+ close();
+ }
+ return batch;
}
}
- return batch;
} catch (BlockedException e) {
if(recordStats && this.context.getCollectNodeStatistics()) {
// stop timer for this batch (BlockedException)
this.nodeStatistics.stopBatchTimer();
- this.nodeStatistics.collectCumulativeNodeStats(batch,
RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
+ this.nodeStatistics.collectCumulativeNodeStats(null,
RelationalNodeStatistics.BLOCKEDEXCEPTION_STOP);
}
throw e;
} catch (MetaMatrixComponentException e) {
@@ -310,10 +311,11 @@
protected abstract TupleBatch nextBatchDirect()
throws BlockedException, MetaMatrixComponentException, MetaMatrixProcessingException;
- public void close()
+ public final void close()
throws MetaMatrixComponentException {
if (!this.closed) {
+ closeDirect();
for(int i=0; i<children.length; i++) {
if(children[i] != null) {
children[i].close();
@@ -324,6 +326,10 @@
this.closed = true;
}
}
+
+ public void closeDirect() {
+
+ }
/**
* Check if the node has been already closed
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -126,14 +126,11 @@
return this.pullBatch();
}
- public void close() throws MetaMatrixComponentException {
- if (!isClosed()) {
- super.close();
- if(this.output != null) {
- this.output.remove();
- this.output = null;
- this.outputTs = null;
- }
+ public void closeDirect() {
+ if(this.output != null) {
+ this.output.remove();
+ this.output = null;
+ this.outputTs = null;
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -36,6 +36,7 @@
import com.metamatrix.common.buffer.IndexedTupleSource;
import com.metamatrix.common.buffer.TupleBuffer;
import com.metamatrix.common.buffer.TupleSource;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.core.log.MessageLevel;
@@ -87,6 +88,7 @@
private BufferManager bufferManager;
private String groupName;
private List<SingleElementSymbol> schema;
+ private int schemaSize;
private ListNestedSortComparator comparator;
private TupleBuffer output;
@@ -110,6 +112,7 @@
this.bufferManager = bufferMgr;
this.groupName = groupName;
this.schema = this.source.getSchema();
+ this.schemaSize = bufferManager.getSchemaSize(this.schema);
int distinctIndex = sortElements != null? sortElements.size() - 1:0;
if (mode != Mode.SORT) {
if (sortElements == null) {
@@ -175,26 +178,22 @@
workingTuples = new TreeSet<List<?>>(comparator);
}
}
+
int totalReservedBuffers = 0;
try {
- int maxRows = bufferManager.getMaxProcessingBatches() *
bufferManager.getProcessorBatchSize();
+ int maxRows = this.bufferManager.getProcessorBatchSize();
while(!doneReading) {
//attempt to reserve more working memory if there are additional rows
available before blocking
- if (workingTuples.size() == maxRows) {
- if (source.available() < 1) {
+ if (workingTuples.size() >= maxRows) {
+ int reserved = bufferManager.reserveBuffers(schemaSize,
+ (totalReservedBuffers + schemaSize <=
bufferManager.getMaxProcessingBatchColumns())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+ if (reserved != schemaSize) {
break;
- }
- int reserved = bufferManager.reserveBuffers(1, false);
- if (reserved == 0) {
- break;
}
- totalReservedBuffers += 1;
+ totalReservedBuffers += reserved;
maxRows += bufferManager.getProcessorBatchSize();
}
try {
- if (totalReservedBuffers > 0 && source.available() == 0) {
- break;
- }
List<?> tuple = source.nextTuple();
if (tuple == null) {
@@ -205,6 +204,9 @@
this.collected++;
}
} catch(BlockedException e) {
+ if (workingTuples.size() >= bufferManager.getProcessorBatchSize()) {
+ break;
+ }
if (mode != Mode.DUP_REMOVE
|| (this.output != null && collected <
this.output.getRowCount() * 2)
|| (this.output == null && this.workingTuples.isEmpty()
&& this.activeTupleBuffers.isEmpty())) {
@@ -231,7 +233,7 @@
sublist.saveBatch();
} finally {
- bufferManager.releaseBuffers(totalReservedBuffers);
+ bufferManager.releaseBuffers(totalReservedBuffers);
}
}
@@ -248,12 +250,17 @@
TupleBuffer merged = createTupleBuffer();
- int maxSortIndex = Math.min(activeTupleBuffers.size(),
this.bufferManager.getMaxProcessingBatches());
- int reservedBuffers = 0;
- if (activeTupleBuffers.size() > maxSortIndex) {
- reservedBuffers = bufferManager.reserveBuffers(activeTupleBuffers.size() -
maxSortIndex, true);
+ int desiredSpace = activeTupleBuffers.size() * schemaSize;
+ int reserved = Math.min(desiredSpace,
this.bufferManager.getMaxProcessingBatchColumns());
+ bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
+ if (desiredSpace > reserved) {
+ reserved += bufferManager.reserveBuffers(desiredSpace - reserved,
BufferReserveMode.WAIT);
}
- maxSortIndex += reservedBuffers;
+ int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow
progress
+ //release any partial excess
+ int release = reserved % schemaSize > 0 ? 1 : 0;
+ bufferManager.releaseBuffers(release);
+ reserved -= release;
try {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE))
{
LogManager.logTrace(LogConstants.CTX_DQP, "Merging",
maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$
//$NON-NLS-2$
@@ -294,16 +301,21 @@
masterSortIndex = this.activeTupleBuffers.size() - 1;
}
} finally {
- this.bufferManager.releaseBuffers(reservedBuffers);
+ this.bufferManager.releaseBuffers(reserved);
}
}
// Close sorted source (all others have been removed)
if (doneReading) {
- activeTupleBuffers.get(0).close();
- activeTupleBuffers.get(0).setForwardOnly(false);
if (this.output != null) {
this.output.close();
+ TupleBuffer last = activeTupleBuffers.remove(0);
+ if (output != last) {
+ last.remove();
+ }
+ } else {
+ activeTupleBuffers.get(0).close();
+ activeTupleBuffers.get(0).setForwardOnly(false);
}
this.phase = DONE;
return;
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SourceState.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -29,6 +29,7 @@
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.IndexedTupleSource;
import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
import com.metamatrix.query.processor.BatchCollector;
import com.metamatrix.query.processor.BatchIterator;
@@ -64,6 +65,10 @@
this.expressionIndexes = getExpressionIndecies(expressions, elements);
}
+ public RelationalNode getSource() {
+ return source;
+ }
+
public void setImplicitBuffer(ImplicitBuffer implicitBuffer) {
this.implicitBuffer = implicitBuffer;
}
@@ -101,10 +106,7 @@
this.buffer = null;
}
if (this.iterator != null) {
- try {
- this.iterator.closeSource();
- } catch (MetaMatrixComponentException e) {
- }
+ this.iterator.closeSource();
this.iterator = null;
}
}
@@ -173,9 +175,15 @@
public void sort(SortOption sortOption) throws MetaMatrixComponentException,
MetaMatrixProcessingException {
if (sortOption == SortOption.SORT || sortOption == SortOption.SORT_DISTINCT) {
if (this.sortUtility == null) {
- this.sortUtility = new SortUtility(this.buffer != null ?
this.buffer.createIndexedTupleSource() : new BatchIterator(this.source),
- expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC), sortOption ==
SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
- this.source.getBufferManager(),
this.source.getConnectionID());
+ TupleSource ts = null;
+ if (this.buffer != null) {
+ this.buffer.setForwardOnly(true);
+ ts = this.buffer.createIndexedTupleSource();
+ } else {
+ ts = new BatchIterator(this.source);
+ }
+ this.sortUtility = new SortUtility(ts, expressions,
Collections.nCopies(expressions.size(), OrderBy.ASC),
+ sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT,
this.source.getBufferManager(), this.source.getConnectionID());
this.markDistinct(sortOption == SortOption.SORT_DISTINCT &&
expressions.size() == this.getOuterVals().size());
}
this.buffer = sortUtility.sort();
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareEvaluator.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -56,7 +56,7 @@
List<?> tuple;
ProcessorPlan plan;
- void close() throws MetaMatrixComponentException {
+ void close() {
if (processor == null) {
return;
}
@@ -85,7 +85,7 @@
}
}
- public void close() throws MetaMatrixComponentException {
+ public void close() {
for (SubqueryState state : subqueries.values()) {
state.close();
}
@@ -123,9 +123,9 @@
state.collector = state.processor.createBatchCollector();
}
state.done = true;
- state.processor.getProcessorPlan().reset();
+ state.plan.reset();
}
- return new DependentValueSource(state.collector.collectTuples(),
this.manager.getProcessorBatchSize() / 2).getValueIterator(ref.getValueExpression());
+ return new
DependentValueSource(state.collector.collectTuples()).getValueIterator(ref.getValueExpression());
}
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/SubqueryAwareRelationalNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -2,7 +2,6 @@
import java.util.Map;
-import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.query.eval.Evaluator;
public abstract class SubqueryAwareRelationalNode extends RelationalNode {
@@ -31,8 +30,7 @@
}
@Override
- public void close() throws MetaMatrixComponentException {
- super.close();
+ public void closeDirect() {
if (evaluator != null) {
evaluator.close();
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/processor/relational/UnionAllNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -28,13 +28,19 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
import com.metamatrix.common.buffer.BlockedException;
+import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.TupleBatch;
+import com.metamatrix.common.buffer.BufferManager.BufferReserveMode;
+import com.metamatrix.query.processor.ProcessorDataManager;
+import com.metamatrix.query.util.CommandContext;
public class UnionAllNode extends RelationalNode {
private boolean[] sourceDone;
private int outputRow = 1;
+ private int reserved;
+ private int schemaSize;
public UnionAllNode(int nodeID) {
super(nodeID);
@@ -47,12 +53,21 @@
outputRow = 1;
}
+ @Override
+ public void initialize(CommandContext context, BufferManager bufferManager,
+ ProcessorDataManager dataMgr) {
+ super.initialize(context, bufferManager, dataMgr);
+ this.schemaSize = getBufferManager().getSchemaSize(getOutputElements());
+ }
+
public void open()
throws MetaMatrixComponentException, MetaMatrixProcessingException {
// Initialize done flags
sourceDone = new boolean[getChildren().length];
-
+ if (reserved == 0) {
+ reserved = getBufferManager().reserveBuffers((getChildren().length - 1) *
schemaSize, BufferReserveMode.FORCE);
+ }
// Open the children
super.open();
}
@@ -79,6 +94,10 @@
// Mark source as being done and decrement the activeSources
counter
sourceDone[i] = true;
activeSources--;
+ if (reserved > 0) {
+ getBufferManager().releaseBuffers(schemaSize);
+ reserved-=schemaSize;
+ }
}
} catch(BlockedException e) {
if(i<children.length-1 &&
hasDependentProcedureExecutionNode(children[0])){
@@ -122,6 +141,12 @@
return outputBatch;
}
+
+ @Override
+ public void closeDirect() {
+ getBufferManager().releaseBuffers(reserved);
+ reserved = 0;
+ }
public Object clone(){
UnionAllNode clonedNode = new UnionAllNode(super.getID());
Modified: branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/JoinType.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -36,38 +36,40 @@
// Constants defining join type - users will construct these
/** Represents an inner join: a INNER JOIN b */
- public static final JoinType JOIN_INNER = new JoinType(0);
+ public static final JoinType JOIN_INNER = new JoinType(0, false);
/** Represents a right outer join: a RIGHT OUTER JOIN b */
- public static final JoinType JOIN_RIGHT_OUTER = new JoinType(1);
+ public static final JoinType JOIN_RIGHT_OUTER = new JoinType(1, true);
/** Represents a left outer join: a LEFT OUTER JOIN b */
- public static final JoinType JOIN_LEFT_OUTER = new JoinType(2);
+ public static final JoinType JOIN_LEFT_OUTER = new JoinType(2, true);
/** Represents a full outer join: a FULL OUTER JOIN b */
- public static final JoinType JOIN_FULL_OUTER = new JoinType(3);
+ public static final JoinType JOIN_FULL_OUTER = new JoinType(3, true);
/** Represents a cross join: a CROSS JOIN b */
- public static final JoinType JOIN_CROSS = new JoinType(4);
+ public static final JoinType JOIN_CROSS = new JoinType(4, false);
/** Represents a union join: a UNION JOIN b - not used after rewrite */
- public static final JoinType JOIN_UNION = new JoinType(5);
+ public static final JoinType JOIN_UNION = new JoinType(5, true);
/** internal SEMI Join type */
- public static final JoinType JOIN_SEMI = new JoinType(6);
+ public static final JoinType JOIN_SEMI = new JoinType(6, false);
/** internal ANTI SEMI Join type */
- public static final JoinType JOIN_ANTI_SEMI = new JoinType(7);
+ public static final JoinType JOIN_ANTI_SEMI = new JoinType(7, true);
private int type;
+ private boolean outer;
/**
* Construct a join type object. This is private and is only called by
* the static constant objects in this class.
* @param type Type code for object
*/
- private JoinType(int type) {
+ private JoinType(int type, boolean outer) {
this.type = type;
+ this.outer = outer;
}
/**
@@ -97,7 +99,7 @@
* @return True if left/right/full outer, false if inner/cross
*/
public boolean isOuter() {
- return this.equals(JOIN_LEFT_OUTER) || this.equals(JOIN_FULL_OUTER) ||
this.equals(JOIN_RIGHT_OUTER) || this.equals(JOIN_ANTI_SEMI);
+ return outer;
}
public boolean isSemi() {
Modified: branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/SetQuery.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/SetQuery.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/sql/lang/SetQuery.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -181,6 +181,10 @@
if(this.getLimit() != null) {
copy.setLimit( (Limit) this.getLimit().clone() );
}
+
+ if (this.projectedTypes != null) {
+ copy.setProjectedTypes(new ArrayList<Class<?>>(projectedTypes),
this.metadata);
+ }
return copy;
}
Modified:
branches/JCA/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -77,6 +77,7 @@
private final Criteria crit;
protected int updateCount = 0;
private boolean done;
+ private List<?> currentTuple;
private UpdateTupleSource(String groupKey, TupleBuffer tsId, Criteria crit) throws
MetaMatrixComponentException {
this.groupKey = groupKey;
@@ -96,14 +97,14 @@
return null;
}
- List<?> tuple = null;
//still have to worry about blocked exceptions...
- while ((tuple = ts.nextTuple()) != null) {
- if (eval.evaluate(crit, tuple)) {
- tuplePassed(tuple);
+ while (currentTuple != null || (currentTuple = ts.nextTuple()) != null) {
+ if (eval.evaluate(crit, currentTuple)) {
+ tuplePassed(currentTuple);
} else {
- tupleFailed(tuple);
+ tupleFailed(currentTuple);
}
+ currentTuple = null;
}
newBuffer.close();
groupToTupleSourceID.put(groupKey, newBuffer);
@@ -126,7 +127,7 @@
}
@Override
- public void closeSource() throws MetaMatrixComponentException {
+ public void closeSource() {
}
@@ -270,20 +271,12 @@
}
//allow implicit temp group definition
List columns = null;
- switch (command.getType()) {
- case Command.TYPE_QUERY:
- Query query = (Query)command;
- if(query.getInto() != null &&
query.getInto().getGroup().isImplicitTempGroupSymbol()) {
- columns = query.getSelect().getSymbols();
- }
- break;
- case Command.TYPE_INSERT:
+ if (command instanceof Insert) {
Insert insert = (Insert)command;
GroupSymbol group = insert.getGroup();
if(group.isImplicitTempGroupSymbol()) {
columns = insert.getVariables();
}
- break;
}
if (columns == null) {
throw new
QueryProcessingException(QueryExecPlugin.Util.getString("TempTableStore.table_doesnt_exist_error",
tempTableID)); //$NON-NLS-1$
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.spi.work.WorkEvent;
@@ -102,7 +103,7 @@
protected RequestState requestState = RequestState.NEW;
- private volatile boolean isCancelled;
+ private AtomicBoolean isCancelled = new AtomicBoolean();
private volatile boolean moreRequested;
private volatile boolean closeRequested;
private boolean isClosed;
@@ -191,7 +192,7 @@
}
private void checkForCloseEvent() throws NeedsClosedException {
- if (this.isCancelled || this.closeRequested) {
+ if (this.isCancelled.get() || this.closeRequested) {
throw new NeedsClosedException();
}
}
@@ -232,7 +233,7 @@
manager.logSRCCommand(this.requestMsg, this.securityContext,
CommandLogMessage.CMD_STATUS_ERROR, -1);
String msg = DQPPlugin.Util.getString("ConnectorWorker.process_failed",
this.id); //$NON-NLS-1$
- if (isCancelled) {
+ if (isCancelled.get()) {
LogManager.logDetail(LogConstants.CTX_CONNECTOR, msg);
} else if (t instanceof ConnectorException || t instanceof
MetaMatrixProcessingException) {
LogManager.logWarning(LogConstants.CTX_CONNECTOR, t, msg);
@@ -508,8 +509,7 @@
}
void asynchCancel() throws ConnectorException {
- if (!this.isCancelled) {
- this.isCancelled = true;
+ if (this.isCancelled.compareAndSet(false, true)) {
if(execution != null) {
execution.cancel();
}
@@ -518,7 +518,7 @@
}
boolean isCancelled() {
- return this.isCancelled;
+ return this.isCancelled.get();
}
@Override
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/SynchConnectorWorkItem.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -71,7 +71,9 @@
@Override
protected void pauseProcessing() {
try {
- this.wait();
+ while (isIdle()) {
+ this.wait();
+ }
} catch (InterruptedException e) {
interrupted(e);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/AbstractWorkItem.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -54,7 +54,7 @@
}
}
- ThreadState getThreadState() {
+ synchronized ThreadState getThreadState() {
return this.threadState;
}
@@ -87,10 +87,14 @@
}
break;
default:
- throw new IllegalStateException("Should not END on IDLE or DONE");
//$NON-NLS-1$
+ throw new IllegalStateException("Should not END on " +
this.threadState); //$NON-NLS-1$
}
}
+ protected boolean isIdle() {
+ return this.threadState == ThreadState.IDLE;
+ }
+
protected void moreWork() {
moreWork(true);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/CodeTableCache.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/CodeTableCache.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/CodeTableCache.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -37,6 +37,7 @@
import com.metamatrix.dqp.embedded.DQPEmbeddedProperties;
import com.metamatrix.dqp.util.LogConstants;
import com.metamatrix.query.util.CommandContext;
+import com.metamatrix.vdb.runtime.VDBKey;
/**
* Code table cache. Heavily synchronized in-memory cache of code tables. There is no
purging policy for this cache. Once the limits have been reached exceptions will occur.
@@ -189,32 +190,13 @@
}
return table.codeMap.get(keyValue);
}
-
- /**
- * Places the lookup results in the cache and marks the cache loaded
- * @param requestID
- * @param nodeID
- * @return the set of waiting requests
- * @since 4.2
- */
- public Set<Object> markCacheLoaded(CacheKey requestKey) {
- return markCacheDone(requestKey, false);
- }
-
- /**
- * Notifies the CodeTableCache that this code table had an error. Removes any existing
cached results and clears any state
- * for this CacheKey.
- * @param requestID
- * @param nodeID
- * @return the set of waiting requests
- * @since 4.2
- */
- public Set<Object> errorLoadingCache(CacheKey requestKey) {
- return markCacheDone(requestKey, true);
- }
- private synchronized Set<Object> markCacheDone(CacheKey cacheKey, boolean
errorOccurred) {
- if (errorOccurred) {
+ /**
+ * Notifies the CodeTableCache that this code is done. If the table had an error, it
removes any temporary results.
+ * @return the set of waiting requests
+ */
+ public synchronized Set<Object> markCacheDone(CacheKey cacheKey, boolean
success) {
+ if (!success) {
// Remove any results already cached
CodeTable table = codeTableCache.remove(cacheKey);
if (table != null) {
@@ -224,7 +206,7 @@
}
CodeTable table = codeTableCache.get(cacheKey);
if (table == null || table.codeMap == null) {
- return null;
+ return null; //can only happen if cache was cleared between load and now
}
rowCount += table.codeMap.size();
Set<Object> waiting = table.waitingRequests;
@@ -264,8 +246,7 @@
private String codeTable;
private String returnElement;
private String keyElement;
- private String vdbName;
- private int vdbVersion;
+ private VDBKey vdbKey;
private int hashCode;
@@ -273,15 +254,11 @@
this.codeTable = codeTable;
this.returnElement = returnElement;
this.keyElement = keyElement;
- this.vdbName = vdbName;
- this.vdbVersion = vdbVersion;
+ this.vdbKey = new VDBKey(vdbName, vdbVersion);
// Compute hash code and cache it
- hashCode = HashCodeUtil.hashCode(0, codeTable);
- hashCode = HashCodeUtil.hashCode(hashCode, returnElement);
- hashCode = HashCodeUtil.hashCode(hashCode, keyElement);
- hashCode = HashCodeUtil.hashCode(hashCode, vdbName);
- hashCode = HashCodeUtil.hashCode(hashCode, vdbVersion);
+ hashCode = HashCodeUtil.hashCode(codeTable.toUpperCase().hashCode(),
returnElement.toUpperCase(),
+ keyElement.toUpperCase(), vdbKey);
}
public String getCodeTable() {
@@ -304,11 +281,10 @@
CacheKey other = (CacheKey) obj;
return (other.hashCode() == hashCode() &&
- this.codeTable.equals(other.codeTable) &&
- this.returnElement.equals(other.returnElement) &&
- this.keyElement.equals(other.keyElement) &&
- this.vdbName.equals(other.vdbName) &&
- this.vdbVersion ==other.vdbVersion);
+ this.codeTable.equalsIgnoreCase(other.codeTable) &&
+ this.returnElement.equalsIgnoreCase(other.returnElement) &&
+ this.keyElement.equalsIgnoreCase(other.keyElement) &&
+ this.vdbKey.equals(other.vdbKey));
}
return false;
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierManagerImpl.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -440,12 +440,7 @@
}
success = true;
} finally {
- Collection requests = null;
- if (success) {
- requests = codeTableCache.markCacheLoaded(codeRequestId);
- } else {
- requests = codeTableCache.errorLoadingCache(codeRequestId);
- }
+ Collection requests = codeTableCache.markCacheDone(codeRequestId, success);
notifyWaitingCodeTableRequests(requests);
}
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -148,7 +148,7 @@
}
}
- public void fullyCloseSource() throws MetaMatrixComponentException {
+ public void fullyCloseSource() {
this.dataMgr.closeRequest(aqr.getAtomicRequestID(), this.connectorName);
}
@@ -159,7 +159,7 @@
/**
* @see TupleSource#closeSource()
*/
- public void closeSource() throws MetaMatrixComponentException {
+ public void closeSource() {
if (this.supportsImplicitClose) {
this.dataMgr.closeRequest(aqr.getAtomicRequestID(), this.connectorName);
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -273,14 +273,8 @@
protected void attemptClose() {
int rowcount = -1;
if (this.resultsBuffer != null) {
- try {
- if (this.processor != null) {
- this.processor.closeProcessing();
- }
- } catch (MetaMatrixComponentException e) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, e.getMessage());
- }
+ if (this.processor != null) {
+ this.processor.closeProcessing();
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
@@ -301,13 +295,7 @@
}
for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
- try {
- connectorRequest.fullyCloseSource();
- } catch (MetaMatrixComponentException e) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, e, e.getMessage());
- }
- }
+ connectorRequest.fullyCloseSource();
}
this.resultsBuffer = null;
@@ -374,16 +362,16 @@
if (this.transactionContext != null && this.transactionContext.getXid() !=
null) {
this.transactionState = TransactionState.ACTIVE;
}
+ if (analysisRecord.recordQueryPlan()) {
+
analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
+ }
Option option = originalCommand.getOption();
if (option != null && option.getPlanOnly()) {
doneProducingBatches = true;
resultsBuffer.close();
this.cid = null;
+ this.processor = null;
}
-
- if (analysisRecord.recordQueryPlan()) {
-
analysisRecord.setQueryPlan(processor.getProcessorPlan().getDescriptionProperties());
- }
this.returnsUpdateCount = request.returnsUpdateCount;
request = null;
}
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestOptimizer.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -40,6 +40,7 @@
import com.metamatrix.api.exception.query.QueryPlannerException;
import com.metamatrix.api.exception.query.QueryResolverException;
import com.metamatrix.api.exception.query.QueryValidatorException;
+import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.query.analysis.AnalysisRecord;
@@ -3361,11 +3362,11 @@
FakeMetadataFacade metadata = FakeMetadataFactory.example1();
FakeMetadataObject g1 = metadata.getStore().findObject("pm1.g1",
FakeMetadataObject.GROUP); //$NON-NLS-1$
- g1.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 500));
+ g1.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY +
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE / 4));
FakeMetadataObject g2 = metadata.getStore().findObject("pm1.g2",
FakeMetadataObject.GROUP); //$NON-NLS-1$
- g2.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 1000));
+ g2.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY +
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE));
FakeMetadataObject g3 = metadata.getStore().findObject("pm1.g3",
FakeMetadataObject.GROUP); //$NON-NLS-1$
- g3.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY + 1000));
+ g3.putProperty(FakeMetadataObject.Props.CARDINALITY, new
Integer(RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY +
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE));
ProcessorPlan plan = helpPlan(sql, metadata,
null, capFinder,
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/optimizer/TestPartitionedJoinPlanning.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -27,6 +27,7 @@
import org.junit.Test;
+import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.query.optimizer.capabilities.BasicSourceCapabilities;
import com.metamatrix.query.optimizer.capabilities.FakeCapabilitiesFinder;
import com.metamatrix.query.optimizer.capabilities.SourceCapabilities.Capability;
@@ -47,13 +48,14 @@
caps.setCapabilitySupport(Capability.CRITERIA_COMPARE_EQ, true);
caps.setCapabilitySupport(Capability.QUERY_FROM_GROUP_ALIAS, true);
caps.setCapabilitySupport(Capability.QUERY_ORDERBY, true);
+ caps.setSourceProperty(Capability.MAX_IN_CRITERIA_SIZE, 100);
capFinder.addCapabilities("pm1", caps); //$NON-NLS-1$
FakeMetadataFacade metadata = FakeMetadataFactory.example1();
FakeMetadataObject g1 = metadata.getStore().findObject("pm1.g1",
FakeMetadataObject.GROUP); //$NON-NLS-1$
- g1.putProperty(FakeMetadataObject.Props.CARDINALITY, 600);
+ g1.putProperty(FakeMetadataObject.Props.CARDINALITY,
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE);
FakeMetadataObject g2 = metadata.getStore().findObject("pm1.g2",
FakeMetadataObject.GROUP); //$NON-NLS-1$
- g2.putProperty(FakeMetadataObject.Props.CARDINALITY, 3000);
+ g2.putProperty(FakeMetadataObject.Props.CARDINALITY,
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE * 16);
ProcessorPlan plan = helpPlan(sql, metadata,
null, capFinder,
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -105,11 +105,7 @@
return null;
}
- public void closeSource()
- throws MetaMatrixComponentException {
- if (exceptionOnClose) {
- throw new FakeComponentException();
- }
+ public void closeSource() {
}
public void setBlockOnce(){
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -7573,6 +7573,20 @@
helpProcess(plan, dataManager, expected);
}
+
+ @Test public void testImplicitAggregateWithInlineView() {
+ String sql = "SELECT * FROM (SELECT b.count, enterprise_id FROM (SELECT
COUNT(*), 2 AS enterprise_id FROM (SELECT 'A Name' AS Name, 1 AS enterprise_id) c
) b ) a WHERE enterprise_id = 1"; //$NON-NLS-1$
+
+ List[] expected = new List[] {};
+
+ FakeDataManager dataManager = new FakeDataManager();
+ sampleData1(dataManager);
+
+ ProcessorPlan plan = helpGetPlan(helpParse(sql),
FakeMetadataFactory.example1Cached());
+
+ helpProcess(plan, dataManager, expected);
+ }
+
private static final boolean DEBUG = false;
}
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -231,7 +231,7 @@
private FakeTupleSource(int numCommands) {
this.numCommands = numCommands;
}
- public void closeSource() throws MetaMatrixComponentException {}
+ public void closeSource() {}
public List getSchema() {return null;}
public List nextTuple() throws MetaMatrixComponentException {
if (first) {
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -36,7 +36,6 @@
import com.metamatrix.common.buffer.TupleSource;
import com.metamatrix.query.processor.FakeTupleSource;
import com.metamatrix.query.processor.ProcessorDataManager;
-import com.metamatrix.query.processor.FakeTupleSource.FakeComponentException;
import com.metamatrix.query.sql.lang.BatchedUpdateCommand;
import com.metamatrix.query.sql.lang.Command;
import com.metamatrix.query.sql.lang.Insert;
@@ -120,24 +119,6 @@
helpTestNextBatch(20, true, false, false);
}
- public void testNextBatch_ExceptionOnClose() throws Exception {
- try {
- helpTestNextBatch(100, true, false, true);
- fail("expected exception"); //$NON-NLS-1$
- } catch (FakeComponentException e) {
- //expected
- }
- }
-
- public void testNextBatch_ExceptionOnClose1() throws Exception {
- try {
- helpTestNextBatch(100, false, false, true);
- fail("expected exception"); //$NON-NLS-1$
- } catch (FakeComponentException e) {
- //expected
- }
- }
-
private static final class FakePDM implements ProcessorDataManager {
private int expectedBatchSize;
private int callCount = 0;
@@ -205,7 +186,7 @@
private FakeDataTupleSource(int rows) {
this.rows = rows;
}
- public void closeSource() throws MetaMatrixComponentException {}
+ public void closeSource() {}
public List getSchema() {return null;}
public List nextTuple() throws MetaMatrixComponentException {
if (currentRow % 100 == 0 && block) {
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/processor/xml/TestXMLPlanningEnhancements.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -28,6 +28,7 @@
import junit.framework.TestCase;
+import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.dqp.message.ParameterInfo;
import com.metamatrix.query.mapping.relational.QueryNode;
@@ -36,6 +37,7 @@
import com.metamatrix.query.mapping.xml.MappingElement;
import com.metamatrix.query.mapping.xml.MappingNode;
import com.metamatrix.query.optimizer.TestOptimizer;
+import com.metamatrix.query.optimizer.relational.rules.RuleChooseDependent;
import com.metamatrix.query.processor.FakeDataManager;
import com.metamatrix.query.processor.ProcessorPlan;
import com.metamatrix.query.unittest.FakeMetadataFacade;
@@ -483,8 +485,8 @@
FakeMetadataObject suppliers =
metadata.getStore().findObject("stock.suppliers", FakeMetadataObject.GROUP);
//$NON-NLS-1$
// supply the costing information for OrdersC
- orders.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(1000));
- suppliers.putProperty(FakeMetadataObject.Props.CARDINALITY, new Integer(10));
+ orders.putProperty(FakeMetadataObject.Props.CARDINALITY,
BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE - 1);
+ suppliers.putProperty(FakeMetadataObject.Props.CARDINALITY,
RuleChooseDependent.DEFAULT_INDEPENDENT_CARDINALITY);
String expectedDoc =
TestXMLProcessor.readFile("TestXMLProcessor-FullSuppliers.xml"); //$NON-NLS-1$
Modified:
branches/JCA/engine/src/test/java/com/metamatrix/query/resolver/TestResolver.java
===================================================================
---
branches/JCA/engine/src/test/java/com/metamatrix/query/resolver/TestResolver.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/com/metamatrix/query/resolver/TestResolver.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -2085,6 +2085,16 @@
assertEquals(DataTypeManager.DefaultDataClasses.INTEGER,
((SingleElementSymbol)command.getProjectedSymbols().get(0)).getType());
}
+ @Test public void testUnionQueryClone() throws Exception{
+ SetQuery command = (SetQuery)helpResolve("SELECT e2, e3 FROM pm1.g1 UNION
SELECT e3, e2 from pm1.g1"); //$NON-NLS-1$
+
+ assertEquals(DataTypeManager.DefaultDataClasses.INTEGER,
((SingleElementSymbol)command.getProjectedSymbols().get(1)).getType());
+
+ command = (SetQuery)command.clone();
+
+ assertEquals(DataTypeManager.DefaultDataClasses.INTEGER,
((SingleElementSymbol)command.getProjectedSymbols().get(1)).getType());
+ }
+
@Test public void testSelectIntoNoFrom() {
helpResolve("SELECT 'a', 19, {b'true'}, 13.999 INTO
pm1.g1"); //$NON-NLS-1$
}
Modified:
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestCodeTableCache.java
===================================================================
---
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestCodeTableCache.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/engine/src/test/java/org/teiid/dqp/internal/process/TestCodeTableCache.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -73,11 +73,7 @@
} catch (MetaMatrixProcessingException e) {
throw new RuntimeException(e);
}
- if(setDone) {
- ctc.markCacheLoaded(nodeId);
- } else {
- ctc.errorLoadingCache(nodeId);
- }
+ ctc.markCacheDone(nodeId, setDone);
return ctc;
}
@@ -98,7 +94,7 @@
} catch (MetaMatrixProcessingException e) {
throw new RuntimeException(e);
}
- ctc.markCacheLoaded(nodeId);
+ ctc.markCacheDone(nodeId, true);
return ctc;
}
@@ -115,6 +111,10 @@
CacheState actualState = ctc.cacheExists("countrycode", "code",
"country", TEST_CONTEXT); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
assertEquals("Actual cache state doesn't match with expected: ",
CacheState.CACHE_EXISTS, actualState); //$NON-NLS-1$
+
+ //test case insensitive
+ actualState = ctc.cacheExists("countryCODE", "code",
"Country", TEST_CONTEXT); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ assertEquals("Actual cache state doesn't match with expected: ",
CacheState.CACHE_EXISTS, actualState); //$NON-NLS-1$
}
/** state = 1; loading state */
Modified: branches/JCA/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
---
branches/JCA/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -45,6 +45,8 @@
* for file service access.
*/
public class BufferServiceImpl implements BufferService, Serializable {
+ private static final long serialVersionUID = -6217808623863643531L;
+
private static final int DEFAULT_MAX_OPEN_FILES = 256;
// Instance
@@ -58,8 +60,8 @@
private CacheFactory cacheFactory;
private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
private long maxFileSize = 2L; // 2GB
- private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
- private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+ private int maxProcessingBatchesColumns =
BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
+ private int maxReserveBatchColumns = BufferManager.DEFAULT_RESERVE_BUFFERS;
/**
* Clean the file storage directory on startup
@@ -82,8 +84,8 @@
this.bufferMgr = new BufferManagerImpl();
this.bufferMgr.setConnectorBatchSize(Integer.valueOf(connectorBatchSize));
this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
- this.bufferMgr.setMaxReserveBatches(maxReserveBatches);
- this.bufferMgr.setMaxProcessingBatches(maxProcessingBatches);
+ this.bufferMgr.setMaxReserveBatchColumns(this.maxReserveBatchColumns);
+
this.bufferMgr.setMaxProcessingBatchColumns(this.maxProcessingBatchesColumns);
this.bufferMgr.initialize();
@@ -193,11 +195,11 @@
this.maxOpenFiles = maxOpenFiles;
}
- public void setMaxProcessingBatches(int maxProcessingBatches) {
- this.maxProcessingBatches = maxProcessingBatches;
+ public void setMaxReserveBatchColumns(int value) {
+ this.maxReserveBatchColumns = value;
}
- public void setMaxReserveBatches(int maxReserveBatches) {
- this.maxReserveBatches = maxReserveBatches;
- }
+ public void setMaxProcessingBatchesColumns(int value) {
+ this.maxProcessingBatchesColumns = value;
+ }
}
Modified:
branches/JCA/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java
===================================================================
---
branches/JCA/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java 2010-02-18
20:12:13 UTC (rev 1841)
+++
branches/JCA/test-integration/common/src/test/java/com/metamatrix/connector/jdbc/extension/TestSQLConversionVisitor.java 2010-02-18
20:18:35 UTC (rev 1842)
@@ -432,5 +432,14 @@
input,
output, TRANSLATOR);
}
+
+ @Test public void testNestedSetQuery3() throws Exception {
+ String input = "select part_id id FROM parts UNION (select part_name FROM parts
Union ALL select part_id FROM parts)"; //$NON-NLS-1$
+ String output = "SELECT rtrim(PARTS.PART_ID) AS id FROM PARTS UNION SELECT
PARTS.PART_NAME FROM PARTS UNION SELECT rtrim(PARTS.PART_ID) FROM PARTS";
//$NON-NLS-1$
+
+ TranslationHelper.helpTestVisitor(TranslationHelper.PARTS_VDB,
+ input,
+ output, TRANSLATOR);
+ }
}