Author: shawkins
Date: 2010-01-24 22:26:22 -0500 (Sun, 24 Jan 2010)
New Revision: 1777
Added:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
Removed:
trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-913 introducing a forward-only BufferManager and taking steps to reduce memory
consumption, including a canonical value map and streaming of batches to and from disk.
Modified: trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/common/batch/BatchSerializer.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -151,7 +151,7 @@
readIsNullData(in, isNull);
for (int i = 0; i < batch.length; i++) {
if (!isNullObject(isNull, i)) {
- batch[i].set(col, readObject(in));
+ batch[i].set(col,
DataTypeManager.getCanonicalValue(readObject(in)));
}
}
}
Modified: trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/client/src/main/java/com/metamatrix/dqp/message/RequestMessage.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -39,7 +39,7 @@
static final long serialVersionUID = 2258063872049251854L;
- public static final int DEFAULT_FETCH_SIZE = 2000;
+ public static final int DEFAULT_FETCH_SIZE = 2048;
private String[] commands;
private boolean isBatchedUpdate;
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMStatement.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -729,12 +729,6 @@
String partial =
getExecutionProperty(ExecutionProperties.PROP_PARTIAL_RESULTS_MODE);
res.setPartialResults(Boolean.valueOf(partial).booleanValue());
- // Get fetch size
- res.setFetchSize(fetchSize);
-
- // Get cursor type
- res.setCursorType(this.resultSetType);
-
// Get xml validation mode
String validate = getExecutionProperty(ExecutionProperties.PROP_XML_VALIDATION);
if(validate == null) {
@@ -846,8 +840,8 @@
sqlOptions.toUpperCase().indexOf(ExecutionProperties.SQL_OPTION_SHOWPLAN.toUpperCase())
>= 0) {
reqMsg.setShowPlan(true);
}
-
- reqMsg.setFetchSize(getFetchSize());
+ reqMsg.setCursorType(this.resultSetType);
+ reqMsg.setFetchSize(this.fetchSize);
reqMsg.setStyleSheet(this.styleSheet);
reqMsg.setRowLimit(this.maxRows);
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestMMDataSource.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -613,7 +613,7 @@
ds.setServerName("hostName"); //$NON-NLS-1$
ds.setDatabaseName("vdbName"); //$NON-NLS-1$
ds.setPortNumber(1);
-
assertEquals("jdbc:teiid:vdbName@mm://hostname:1;fetchSize=2000;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar",
ds.buildURL()); //$NON-NLS-1$
+
assertEquals("jdbc:teiid:vdbName@mm://hostname:1;fetchSize=2048;ApplicationName=JDBC;serverURL=mm://hostname:1;a=b;VirtualDatabaseName=vdbName;foo=bar",
ds.buildURL()); //$NON-NLS-1$
}
public void testInvalidDataSource() {
Modified:
trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java
===================================================================
---
trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/common-core/src/main/java/com/metamatrix/common/types/DataTypeManager.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -23,6 +23,7 @@
package com.metamatrix.common.types;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Blob;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.WeakHashMap;
import javax.xml.transform.Source;
@@ -61,6 +63,7 @@
import com.metamatrix.common.types.basic.NumberToLongTransform;
import com.metamatrix.common.types.basic.NumberToShortTransform;
import com.metamatrix.common.types.basic.ObjectToAnyTransform;
+import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.CorePlugin;
import com.metamatrix.core.ErrorMessageKeys;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -81,6 +84,11 @@
* </p>
*/
public class DataTypeManager {
+
+ private static final int MAX_VALUE_MAP_SIZE = 10000;
+ private static boolean USE_VALUE_CACHE =
PropertiesUtils.getBooleanProperty(System.getProperties(),
"org.teiid.useValueCache", true); //$NON-NLS-1$
+
+ private static Map<Class<?>, Map<Object, WeakReference<Object>>>
valueMaps = new HashMap<Class<?>, Map<Object,
WeakReference<Object>>>();
public static final int MAX_STRING_LENGTH = 4000;
@@ -418,19 +426,33 @@
*/
static void loadDataTypes() {
DataTypeManager.addDataType(DefaultDataTypes.BOOLEAN, DefaultDataClasses.BOOLEAN);
+ valueMaps.put(DefaultDataClasses.BOOLEAN, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BYTE, DefaultDataClasses.BYTE);
+ valueMaps.put(DefaultDataClasses.BYTE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.SHORT, DefaultDataClasses.SHORT);
+ valueMaps.put(DefaultDataClasses.SHORT, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CHAR, DefaultDataClasses.CHAR);
+ valueMaps.put(DefaultDataClasses.CHAR, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.INTEGER, DefaultDataClasses.INTEGER);
+ valueMaps.put(DefaultDataClasses.INTEGER, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.LONG, DefaultDataClasses.LONG);
+ valueMaps.put(DefaultDataClasses.LONG, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_INTEGER,
DefaultDataClasses.BIG_INTEGER);
+ valueMaps.put(DefaultDataClasses.BIG_INTEGER, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.FLOAT, DefaultDataClasses.FLOAT);
+ valueMaps.put(DefaultDataClasses.FLOAT, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DOUBLE, DefaultDataClasses.DOUBLE);
+ valueMaps.put(DefaultDataClasses.DOUBLE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.BIG_DECIMAL,
DefaultDataClasses.BIG_DECIMAL);
+ valueMaps.put(DefaultDataClasses.BIG_DECIMAL, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.DATE, DefaultDataClasses.DATE);
+ valueMaps.put(DefaultDataClasses.DATE, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIME, DefaultDataClasses.TIME);
+ valueMaps.put(DefaultDataClasses.TIME, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.TIMESTAMP, DefaultDataClasses.TIMESTAMP);
+ valueMaps.put(DefaultDataClasses.TIMESTAMP, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.STRING, DefaultDataClasses.STRING);
+ valueMaps.put(DefaultDataClasses.STRING, Collections.synchronizedMap(new
WeakHashMap<Object, WeakReference<Object>>()));
DataTypeManager.addDataType(DefaultDataTypes.CLOB, DefaultDataClasses.CLOB);
DataTypeManager.addDataType(DefaultDataTypes.XML, DefaultDataClasses.XML);
DataTypeManager.addDataType(DefaultDataTypes.OBJECT, DefaultDataClasses.OBJECT);
@@ -686,7 +708,8 @@
Object[] params = new Object[] { sourceType, targetClass, value};
throw new
TransformationException(CorePlugin.Util.getString("ObjectToAnyTransform.Invalid_value",
params)); //$NON-NLS-1$
}
- return (T) transform.transform(value);
+ T result = (T) transform.transform(value);
+ return getCanonicalValue(result);
}
public static boolean isNonComparable(String type) {
@@ -699,4 +722,29 @@
public static <S> void addSourceTransform(Class<S> sourceClass,
SourceTransform<S, ?> transform) {
sourceConverters.put(sourceClass, transform);
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T getCanonicalValue(T value) {
+ if (USE_VALUE_CACHE) {
+ if (value == null) {
+ return null;
+ }
+ Map<Object, WeakReference<Object>> valueMap =
valueMaps.get(value.getClass());
+ if (valueMap == null) {
+ return value;
+ }
+ WeakReference<Object> valueReference = valueMap.get(value);
+ Object canonicalValue = null;
+ if (valueReference != null) {
+ canonicalValue = valueReference.get();
+ }
+ if (canonicalValue != null) {
+ return (T)canonicalValue;
+ }
+ if (valueMap.size() <= MAX_VALUE_MAP_SIZE) {
+ valueMap.put(value, new WeakReference<Object>(value));
+ }
+ }
+ return value;
+ }
}
Modified:
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java
===================================================================
---
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/connectors/connector-jdbc/src/main/java/org/teiid/connector/jdbc/JDBCBaseExecution.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -80,7 +80,10 @@
this.logger = logger;
this.context = context;
- fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE,
context.getBatchSize());
+ fetchSize = PropertiesUtils.getIntProperty(props, JDBCPropertyNames.FETCH_SIZE,
0);
+ if (fetchSize == 0) {
+ fetchSize = context.getBatchSize();
+ }
maxResultRows = PropertiesUtils.getIntProperty(props,
ConnectorPropertyNames.MAX_RESULT_ROWS, -1);
//if the connector work needs to throw an excpetion, set the size plus 1
if (maxResultRows > 0 && PropertiesUtils.getBooleanProperty(props,
ConnectorPropertyNames.EXCEPTION_ON_MAX_ROWS, false)) {
Modified: trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml
===================================================================
--- trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/connectors/connector-jdbc/src/main/resources/connector-jdbc.xml 2010-01-25
03:26:22 UTC (rev 1777)
@@ -17,6 +17,7 @@
<PropertyDefinition Name="ConnectionSource"
DisplayName="Connection Source Class" ShortDescription="Driver, DataSource,
or XADataSource class name" IsRequired="true" />
<PropertyDefinition Name="TrimStrings" DisplayName="Trim
string flag" ShortDescription="Right Trim fixed character types returned as
Strings - note that the native type must be char or nchar and the source must support the
rtrim function." DefaultValue="false" PropertyType="Boolean"
IsExpert="true" />
<PropertyDefinition Name="UseCommentsInSourceQuery"
DisplayName="Use informational comments in Source Queries"
ShortDescription="This will embed /*comment*/ style comment with session/request id
in source SQL query for informational purposes" DefaultValue="false"
PropertyType="Boolean" IsExpert="true"/>
+ <PropertyDefinition Name="FetchSize" DisplayName="Statement
Fetch Size" ShortDescription="Statement Fetch Size"
DefaultValue="0" PropertyType="Integer"
IsExpert="true"/>
</ComponentType>
<ComponentType Name="Oracle Connector"
ComponentTypeCode="2" Deployable="true" Deprecated="false"
Monitorable="false" SuperComponentType="JDBC Connector"
ParentComponentType="Connectors" LastChangedBy="ConfigurationStartup"
CreatedBy="ConfigurationStartup">
<PropertyDefinition Name="ConnectionSource"
DisplayName="Connection Source Class" ShortDescription="Driver, DataSource,
or XADataSource class name" DefaultValue="oracle.jdbc.driver.OracleDriver"
IsRequired="true" />
Added: trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
(rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+
+public interface BatchManager {
+
+ public interface ManagedBatch {
+
+ TupleBatch getBatch(boolean cache, String[] types) throws
MetaMatrixComponentException;
+
+ void remove();
+
+ }
+
+ ManagedBatch createManagedBatch(TupleBatch batch) throws MetaMatrixComponentException;
+
+ void remove();
+
+}
Property changes on:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BatchManager.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -48,48 +48,17 @@
FINAL
}
- /**
- * Optional property - the max size of a batch sent between connector and query
service.
- * Making batches larger reduces communication overhead between connector and query
service
- * but increases the granularity of memory management on those batches. This value
should
- * be a positive integer and defaults to 1000.
- */
- public static final String CONNECTOR_BATCH_SIZE =
"metamatrix.buffer.connectorBatchSize"; //$NON-NLS-1$
- /**
- * Optional property - the max size of a batch sent internally within the query
processor.
- * In general, these batches should be smaller than the connector batch size as there
are
- * no communication costs with these batches. Smaller batches typically allow a user to
- * get their first results quicker and allow fine-grained buffer management on
intermediate
- * results. This value should be a positive integer and defaults to 100.
- */
- public static final String PROCESSOR_BATCH_SIZE =
"metamatrix.buffer.processorBatchSize"; //$NON-NLS-1$
- /**
- * Optional property - this value specifies the location to store temporary buffers to
- * large to fit in memory. Temporary buffer files will be created and destroyed in
this
- * directory. This value should be a string specifying an absolute directory path.
- */
- public static final String BUFFER_STORAGE_DIRECTORY =
"metamatrix.buffer.storageDirectory"; //$NON-NLS-1$
- /**
- * Optional property - this values specifies how many open file descriptors should be
cached
- * in the storage directory. Increasing this value in heavy load may improve
performance
- * but will use more file descriptors, which are a limited system resource.
- */
- public static final String MAX_OPEN_FILES = "metamatrix.buffer.maxOpenFiles";
//$NON-NLS-1$
- /**
- * Optional property - this values specifies the maximum size in MegaBytes that a buffer
file can reach.
- * The default is 2048 MB (i.e. 2GB).
- */
- public static final String MAX_FILE_SIZE = "metamatrix.buffer.maxFileSize";
//$NON-NLS-1$
- /**
- * Optional property - the max number of batches to process at once in algorithms such
as sorting.
- */
- public static final String MAX_PROCESSING_BATCHES =
"metamatrix.buffer.maxProcessingBatches"; //$NON-NLS-1$
-
public static int DEFAULT_CONNECTOR_BATCH_SIZE = 2048;
public static int DEFAULT_PROCESSOR_BATCH_SIZE = 1024;
public static int DEFAULT_MAX_PROCESSING_BATCHES = 8;
+
+ /**
+ * The BufferManager may maintain at least this many batch references in memory.
+ *
+ * Up to 2x this value may be held by soft references.
+ */
public static int DEFAULT_RESERVE_BUFFERS = 64;
-
+
/**
* Get the batch size to use during query processing.
* @return Batch size (# of rows)
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/FileStore.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -59,13 +59,22 @@
@Override
public void write(byte[] b, int off, int len) throws IOException {
- if (count + len <= buffer.length) {
- System.arraycopy(b, off, buffer, count, len);
- count += len;
+ if (len > buffer.length) {
+ flushBuffer();
+ writeDirect(b, off, len);
return;
}
+ int bufferedLength = Math.min(len, buffer.length - count);
+ if (count < buffer.length) {
+ System.arraycopy(b, off, buffer, count, bufferedLength);
+ count += bufferedLength;
+ if (bufferedLength == len) {
+ return;
+ }
+ }
flushBuffer();
- writeDirect(b, off, len);
+ System.arraycopy(b, off + bufferedLength, buffer, count, len - bufferedLength);
+ count += len - bufferedLength;
}
private void writeDirect(byte[] b, int off, int len) throws IOException {
@@ -77,7 +86,7 @@
}
}
- private void flushBuffer() throws IOException {
+ public void flushBuffer() throws IOException {
if (count > 0) {
writeDirect(buffer, 0, count);
count = 0;
@@ -119,6 +128,9 @@
}
}
+ private boolean removed;
+ private long len;
+
public void setCleanupReference(Object o) {
REFERENCES.add(new CleanupReference(o, this));
for (int i = 0; i < 10; i++) {
@@ -131,8 +143,10 @@
}
}
- private boolean removed;
-
+ public synchronized long getLength() {
+ return len;
+ }
+
public int read(long fileOffset, byte[] b, int offSet, int length)
throws MetaMatrixComponentException {
if (removed) {
@@ -155,18 +169,21 @@
} while (n < length);
}
- public long write(byte[] bytes) throws MetaMatrixComponentException {
- return write(bytes, 0, bytes.length);
+ public void write(byte[] bytes) throws MetaMatrixComponentException {
+ write(bytes, 0, bytes.length);
}
- public long write(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
+ public synchronized long write(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
if (removed) {
throw new MetaMatrixComponentException("already removed"); //$NON-NLS-1$
}
- return writeDirect(bytes, offset, length);
+ writeDirect(bytes, offset, length);
+ long result = len;
+ len += length;
+ return result;
}
- protected abstract long writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException;
+ protected abstract void writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException;
public void remove() {
if (!this.removed) {
@@ -177,9 +194,9 @@
protected abstract void removeDirect();
- public InputStream createInputStream() {
+ public InputStream createInputStream(final long start) {
return new InputStream() {
- private long offset;
+ private long offset = start;
@Override
public int read() throws IOException {
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/ManagedBatch.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.lang.ref.SoftReference;
-
-class ManagedBatch {
-
- private int beginRow;
- private SoftReference<TupleBatch> batchReference;
- private long offset;
- private int length;
-
- /**
- * Constructor for ManagedBatch.
- */
- public ManagedBatch(TupleBatch batch) {
- this.beginRow = batch.getBeginRow();
- this.batchReference = new SoftReference<TupleBatch>(batch);
- }
-
- /**
- * Get the begin row, must be >= 1
- * @return Begin row
- */
- public int getBeginRow() {
- return this.beginRow;
- }
-
- public String toString() {
- return "ManagedBatch[" + beginRow + "]"; //$NON-NLS-1$
//$NON-NLS-2$
- }
-
- public TupleBatch getBatch() {
- return this.batchReference.get();
- }
-
- public void setBatchReference(TupleBatch batch) {
- this.batchReference = new SoftReference<TupleBatch>(batch);
- }
-
- public int getLength() {
- return length;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
-}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBatch.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.List;
import com.metamatrix.common.batch.BatchSerializer;
@@ -40,8 +41,10 @@
* This object is immutable and Serializable;
*/
public class TupleBatch implements Externalizable {
-
- private int rowOffset;
+
+ private static final long serialVersionUID = 6304443387337336957L;
+
+ private int rowOffset;
private List[] tuples;
// Optional state
@@ -140,6 +143,10 @@
this.terminationFlag = terminationFlag;
}
+ public String[] getDataTypes() {
+ return types;
+ }
+
public void setDataTypes(String[] types) {
this.types = types;
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -22,35 +22,29 @@
package com.metamatrix.common.buffer;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.Streamable;
-import com.metamatrix.core.util.AccessibleByteArrayOutputStream;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
-import com.metamatrix.query.execution.QueryExecPlugin;
+import com.metamatrix.query.sql.symbol.Expression;
public class TupleBuffer {
class TupleSourceImpl implements IndexedTupleSource {
- private SoftReference<TupleBatch> currentBatch;
private int currentRow = 1;
private int mark = 1;
private List<?> currentTuple;
+ private TupleBatch batch;
@Override
public int getCurrentIndex() {
@@ -80,47 +74,30 @@
private List<?> getCurrentTuple() throws MetaMatrixComponentException,
BlockedException {
- TupleBatch batch = getBatch();
- if(batch.getRowCount() == 0) {
- // Check if last
- if(isFinal) {
- currentBatch = null;
- return null;
- }
- throw BlockedException.INSTANCE;
- }
-
- return batch.getTuple(currentRow);
+ if (currentRow <= rowCount) {
+ if (forwardOnly) {
+ if (batch == null || currentRow > batch.getEndRow()) {
+ batch = getBatch(currentRow);
+ }
+ return batch.getTuple(currentRow);
+ }
+ //TODO: determine if we should directly hold a soft reference here
+ return getRow(currentRow);
+ }
+ if(isFinal) {
+ return null;
+ }
+ throw BlockedException.INSTANCE;
}
@Override
public void closeSource()
throws MetaMatrixComponentException{
- currentBatch = null;
+ batch = null;
mark = 1;
reset();
}
- // Retrieves the necessary batch based on the currentRow
- TupleBatch getBatch() throws MetaMatrixComponentException{
- TupleBatch batch = null;
- if (currentBatch != null) {
- batch = currentBatch.get();
- }
- if (batch != null) {
- if (currentRow <= batch.getEndRow() && currentRow >=
batch.getBeginRow()) {
- return batch;
- }
- currentBatch = null;
- }
-
- batch = TupleBuffer.this.getBatch(currentRow);
- if (batch != null) {
- currentBatch = new SoftReference<TupleBatch>(batch);
- }
- return batch;
- }
-
@Override
public boolean hasNext() throws MetaMatrixComponentException {
if (this.currentTuple != null) {
@@ -149,34 +126,56 @@
this.currentTuple = null;
}
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
+
+ /**
+ * Gets the data type names for each of the input expressions, in order.
+ * @param expressions List of Expressions; may be null
+ * @return the data type name of each expression, in list order, or null if expressions is null
+ * @since 4.2
+ */
+ public static String[] getTypeNames(List expressions) {
+ if (expressions == null) {
+ return null;
+ }
+ String[] types = new String[expressions.size()];
+ for (ListIterator i = expressions.listIterator(); i.hasNext();) {
+ Expression expr = (Expression)i.next();
+ types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
+ }
+ return types;
+ }
private static final AtomicLong LOB_ID = new AtomicLong();
//construction state
- private StorageManager manager;
+ private BatchManager manager;
private String tupleSourceID;
private List<?> schema;
private String[] types;
private int batchSize;
- private FileStore store;
-
private int rowCount;
private boolean isFinal;
- private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer,
ManagedBatch>();
+ private TreeMap<Integer, BatchManager.ManagedBatch> batches = new
TreeMap<Integer, BatchManager.ManagedBatch>();
private ArrayList<List<?>> batchBuffer;
- private AtomicInteger referenceCount = new AtomicInteger(1);
+ private boolean removed;
+ private boolean forwardOnly;
//lob management
private Map<String, Streamable<?>> lobReferences; //references to
contained lobs
private boolean lobs = true;
- public TupleBuffer(StorageManager manager, String id, List<?> schema, String[]
types, int batchSize) {
+ public TupleBuffer(BatchManager manager, String id, List<?> schema, int batchSize)
{
this.manager = manager;
this.tupleSourceID = id;
this.schema = schema;
- this.types = types;
+ this.types = getTypeNames(schema);
this.batchSize = batchSize;
if (types != null) {
int i = 0;
@@ -215,33 +214,16 @@
void saveBatch(boolean finalBatch) throws MetaMatrixComponentException {
Assertion.assertTrue(!this.isRemoved());
- if (batchBuffer == null || batchBuffer.isEmpty()) {
+ if (batchBuffer == null || batchBuffer.isEmpty() || batchBuffer.size() < Math.max(1,
batchSize / 32)) {
return;
}
- List<?> rows = batchBuffer==null?Collections.emptyList():batchBuffer;
- TupleBatch writeBatch = new TupleBatch(rowCount - rows.size() + 1, rows);
+ TupleBatch writeBatch = new TupleBatch(rowCount - batchBuffer.size() + 1,
batchBuffer);
if (finalBatch) {
writeBatch.setTerminationFlag(true);
}
- ManagedBatch mbatch = new ManagedBatch(writeBatch);
- if (this.store == null) {
- this.store = this.manager.createFileStore(this.tupleSourceID);
- this.store.setCleanupReference(this);
- }
- AccessibleByteArrayOutputStream baos = null;
- try {
- baos = new AccessibleByteArrayOutputStream(1024);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- writeBatch.setDataTypes(types);
- writeBatch.writeExternal(oos);
- oos.flush();
- oos.close();
- } catch(IOException e) {
- throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStorageManager.batch_error"));
//$NON-NLS-1$
- }
- mbatch.setLength(baos.getCount());
- mbatch.setOffset(this.store.write(baos.getBuffer(), 0, baos.getCount()));
- this.batches.put(mbatch.getBeginRow(), mbatch);
+ writeBatch.setDataTypes(types);
+ BatchManager.ManagedBatch mbatch = manager.createManagedBatch(writeBatch);
+ this.batches.put(writeBatch.getBeginRow(), mbatch);
batchBuffer = null;
}
@@ -252,7 +234,23 @@
}
this.isFinal = true;
}
+
+ List<?> getRow(int row) throws MetaMatrixComponentException {
+ if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
+ return this.batchBuffer.get(row - rowCount + this.batchBuffer.size() - 1);
+ }
+ TupleBatch batch = getBatch(row);
+ return batch.getTuple(row);
+ }
+ /**
+ * Get the batch containing the given row.
+ * NOTE: the returned batch may be empty or may begin with a row other
+ * than the one specified.
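+ * @param row the row number to look up
+ * @return the batch covering the row, or an empty batch beginning after the last row if the row is beyond the current row count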
+ * @throws MetaMatrixComponentException
+ */
public TupleBatch getBatch(int row) throws MetaMatrixComponentException {
if (row > rowCount) {
TupleBatch batch = new TupleBatch(rowCount + 1, new List[] {});
@@ -262,58 +260,44 @@
return batch;
}
if (this.batchBuffer != null && row > rowCount - this.batchBuffer.size()) {
- return new TupleBatch(rowCount - this.batchBuffer.size() + 1, batchBuffer);
+ TupleBatch result = new TupleBatch(rowCount - this.batchBuffer.size() + 1,
batchBuffer);
+ if (forwardOnly) {
+ this.batchBuffer = null;
+ }
+ return result;
}
if (this.batchBuffer != null && !this.batchBuffer.isEmpty()) {
+ //this is just a sanity check to ensure we're not holding too many
+ //hard references to batches.
saveBatch(isFinal);
}
- Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(row);
+ Map.Entry<Integer, BatchManager.ManagedBatch> entry = batches.floorEntry(row);
Assertion.isNotNull(entry);
- ManagedBatch batch = entry.getValue();
- TupleBatch result = batch.getBatch();
- if (result != null) {
- return result;
+ BatchManager.ManagedBatch batch = entry.getValue();
+ TupleBatch result = batch.getBatch(!forwardOnly, types);
+ if (lobs && result.getDataTypes() == null) {
+ correctLobReferences(result.getAllTuples());
}
- try {
- byte[] bytes = new byte[batch.getLength()];
- this.store.readFully(batch.getOffset(), bytes, 0, bytes.length);
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- result = new TupleBatch();
- result.setDataTypes(types);
- result.readExternal(ois);
- } catch(IOException e) {
- throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
tupleSourceID)); //$NON-NLS-1$
- } catch (ClassNotFoundException e) {
- throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
tupleSourceID)); //$NON-NLS-1$
- }
- if (lobs) {
- correctLobReferences(result.getAllTuples());
+ result.setDataTypes(types);
+ if (forwardOnly) {
+ batches.remove(entry.getKey());
}
- batch.setBatchReference(result);
return result;
}
public void remove() {
- int count = this.referenceCount.getAndDecrement();
- if (count == 0) {
- if (this.store != null) {
- this.store.remove();
- this.store = null;
- }
+ if (!removed) {
+ this.manager.remove();
if (this.batchBuffer != null) {
this.batchBuffer = null;
}
+ for (BatchManager.ManagedBatch batch : this.batches.values()) {
+ batch.remove();
+ }
this.batches.clear();
}
}
- public boolean addReference() {
- int count = this.referenceCount.addAndGet(1);
- return count > 1;
- }
-
public int getRowCount() {
return rowCount;
}
@@ -383,6 +367,10 @@
}
}
+ public void setForwardOnly(boolean forwardOnly) {
+ this.forwardOnly = forwardOnly;
+ }
+
/**
* Create a new iterator for this buffer
* @return
@@ -393,11 +381,11 @@
@Override
public String toString() {
- return this.tupleSourceID.toString();
+ return this.tupleSourceID;
}
public boolean isRemoved() {
- return this.referenceCount.get() <= 0;
+ return removed;
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSource.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -60,5 +60,11 @@
*/
void closeSource()
throws MetaMatrixComponentException;
+
+ /**
+ * Returns an estimate of the number of rows that can be read without blocking.
+ * @return the estimated row count; 0 when no rows are known to be immediately available
+ */
+ int available();
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -22,9 +22,20 @@
package com.metamatrix.common.buffer.impl;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
-import java.util.ListIterator;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -32,10 +43,14 @@
import javax.xml.transform.Source;
import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.BatchManager;
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.FileStore;
import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleBuffer;
+import com.metamatrix.common.buffer.BatchManager.ManagedBatch;
+import com.metamatrix.common.buffer.FileStore.FileStoreOutputStream;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.common.types.InputStreamFactory;
@@ -49,34 +64,185 @@
import com.metamatrix.core.log.MessageLevel;
import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.util.LogConstants;
+import com.metamatrix.query.execution.QueryExecPlugin;
import com.metamatrix.query.processor.xml.XMLUtil;
-import com.metamatrix.query.sql.symbol.Expression;
/**
* <p>Default implementation of BufferManager.</p>
* Responsible for creating/tracking TupleBuffers and providing access to the
StorageManager
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
+
+ private static final int IO_BUFFER_SIZE = 1 << 14;
+
+ private final class ManagedBatchImpl implements ManagedBatch {
+ final private String id;
+ final private FileStore store;
+
+ private long offset = -1;
+ private boolean persistent;
+ private volatile TupleBatch pBatch;
+ private Reference<TupleBatch> batchReference;
+
+ public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws
MetaMatrixComponentException {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to
BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
+ this.id = id;
+ this.store = store;
+ this.pBatch = batch;
+ if (batch.getBeginRow() == 1) {
+ activeBatches.add(this);
+ } else {
+ this.persist(false);
+ }
+ persistBatchReferences();
+ }
+ @Override
+ public TupleBatch getBatch(boolean cache, String[] types) throws
MetaMatrixComponentException {
+ readAttempts.getAndIncrement();
+ synchronized (this) {
+ if (this.batchReference != null && this.pBatch == null) {
+ TupleBatch result = this.batchReference.get();
+ if (result != null) {
+ if (!cache) {
+ softCache.remove(this);
+ this.batchReference.clear();
+ }
+ return result;
+ }
+ }
+
+ TupleBatch batch = this.pBatch;
+ if (batch != null){
+ activeBatches.remove(this);
+ if (cache) {
+ activeBatches.add(this);
+ }
+ return batch;
+ }
+ }
+ persistBatchReferences();
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from
disk", readCount.incrementAndGet()); //$NON-NLS-1$
+ synchronized (this) {
+ //Resurrect from disk
+ try {
+ ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
+ TupleBatch batch = new TupleBatch();
+ batch.setDataTypes(types);
+ batch.readExternal(ois);
+ batch.setDataTypes(null);
+ if (cache) {
+ this.pBatch = batch;
+ activeBatches.add(this);
+ }
+ return batch;
+ } catch(IOException e) {
+ throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
+ } catch (ClassNotFoundException e) {
+ throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
+ }
+ }
+ }
+
+ public void persistBatchReferences() throws MetaMatrixComponentException {
+ persistOneBatch(softCache, reserveBatches * 2, false);
+ persistOneBatch(activeBatches, reserveBatches, true);
+ }
+
+ private void persistOneBatch(Set<ManagedBatchImpl> set, int requiredSize, boolean
createSoft) throws MetaMatrixComponentException {
+ ManagedBatchImpl mb = null;
+ synchronized (set) {
+ if (set.size() > requiredSize) {
+ Iterator<ManagedBatchImpl> iter = set.iterator();
+ mb = iter.next();
+ iter.remove();
+ }
+ }
+ try {
+ if (mb != null) {
+ mb.persist(createSoft);
+ }
+ } catch (MetaMatrixComponentException e) {
+ if (mb == this) {
+ throw e;
+ }
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ }
+ }
+
+ public synchronized void persist(boolean createSoft) throws
MetaMatrixComponentException {
+ try {
+ TupleBatch batch = pBatch;
+ if (batch != null) {
+ if (!persistent) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Writing batch to disk",
writeCount.incrementAndGet()); //$NON-NLS-1$
+ synchronized (store) {
+ offset = store.getLength();
+ FileStoreOutputStream fsos = store.createOutputStream(IO_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ batch.writeExternal(oos);
+ oos.flush();
+ oos.close();
+ fsos.flushBuffer();
+ }
+ }
+ if (createSoft) {
+ this.batchReference = new SoftReference<TupleBatch>(batch);
+ softCache.add(this);
+ } else {
+ this.batchReference = new WeakReference<TupleBatch>(batch);
+ }
+ }
+ } catch (IOException e) {
+ throw new MetaMatrixComponentException(e);
+ } finally {
+ persistent = true;
+ pBatch = null;
+ }
+ }
+
+ public void remove() {
+ activeBatches.remove(this);
+ softCache.remove(this);
+ pBatch = null;
+ if (batchReference != null) {
+ batchReference.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ManagedBatch " + id + " " + pBatch; //$NON-NLS-1$
//$NON-NLS-2$
+ }
+ }
+
// Configuration
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
+
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
+ private Set<ManagedBatchImpl> activeBatches = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatchImpl>());
+ private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatchImpl>());
+
private StorageManager diskMgr;
- private AtomicLong currentTuple = new AtomicLong(0);
-
+ private AtomicLong currentTuple = new AtomicLong();
+ private AtomicInteger batchAdded = new AtomicInteger();
+ private AtomicInteger readCount = new AtomicInteger();
+ private AtomicInteger writeCount = new AtomicInteger();
+ private AtomicInteger readAttempts = new AtomicInteger();
+
public int getMaxProcessingBatches() {
return maxProcessingBatches;
}
public void setMaxProcessingBatches(int maxProcessingBatches) {
- this.maxProcessingBatches = maxProcessingBatches;
+ this.maxProcessingBatches = Math.max(2, maxProcessingBatches);
}
/**
@@ -118,11 +284,33 @@
}
@Override
- public TupleBuffer createTupleBuffer(List elements, String groupName,
+ public TupleBuffer createTupleBuffer(final List elements, String groupName,
TupleSourceType tupleSourceType)
throws MetaMatrixComponentException {
- String newID = String.valueOf(this.currentTuple.getAndIncrement());
- TupleBuffer tupleBuffer = new TupleBuffer(this, newID, elements,
getTypeNames(elements), getProcessorBatchSize());
+ final String newID = String.valueOf(this.currentTuple.getAndIncrement());
+
+ BatchManager batchManager = new BatchManager() {
+ private FileStore store;
+
+ @Override
+ public ManagedBatch createManagedBatch(TupleBatch batch)
+ throws MetaMatrixComponentException {
+ if (this.store == null) {
+ this.store = createFileStore(newID);
+ this.store.setCleanupReference(this);
+ }
+ return new ManagedBatchImpl(newID, store, batch);
+ }
+
+ @Override
+ public void remove() {
+ if (this.store != null) {
+ this.store.remove();
+ this.store = null;
+ }
+ }
+ };
+ TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements,
getProcessorBatchSize());
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating
TupleBuffer:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$
//$NON-NLS-2$
}
@@ -139,7 +327,7 @@
@Override
public void initialize(Properties props) throws MetaMatrixComponentException {
- PropertiesUtils.setBeanProperties(this, props, "metamatrix.buffer");
//$NON-NLS-1$
+ PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer");
//$NON-NLS-1$
DataTypeManager.addSourceTransform(Source.class, new SourceTransform<Source,
XMLType>() {
@Override
public XMLType transform(Source value) {
@@ -158,24 +346,6 @@
});
}
- /**
- * Gets the data type names for each of the input expressions, in order.
- * @param expressions List of Expressions
- * @return
- * @since 4.2
- */
- private static String[] getTypeNames(List expressions) {
- if (expressions == null) {
- return null;
- }
- String[] types = new String[expressions.size()];
- for (ListIterator i = expressions.listIterator(); i.hasNext();) {
- Expression expr = (Expression)i.next();
- types[i.previousIndex()] = DataTypeManager.getDataTypeName(expr.getType());
- }
- return types;
- }
-
@Override
public void releaseBuffers(int count) {
lock.lock();
@@ -209,6 +379,10 @@
lock.unlock();
}
}
+
+ public void setMaxReserveBatches(int maxReserveBatches) {
+ this.maxReserveBatches = maxReserveBatches;
+ }
public void shutdown() {
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/FileStorageManager.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -33,7 +33,6 @@
import java.util.TreeMap;
import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.FileStore;
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.log.LogManager;
@@ -113,7 +112,7 @@
}
}
- public synchronized long writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
+ public void writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
Map.Entry<Long, FileInfo> entry = this.storageFiles.lastEntry();
boolean createNew = false;
FileInfo fileInfo = null;
@@ -139,7 +138,6 @@
fileAccess.setLength(pointer + length);
fileAccess.seek(pointer);
fileAccess.write(bytes, offset, length);
- return fileOffset + pointer;
} catch(IOException e) {
throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
} finally {
@@ -185,7 +183,7 @@
* @see com.metamatrix.common.buffer.BufferManager#MAX_FILE_SIZE
*/
public void initialize(Properties props) throws MetaMatrixComponentException {
- this.directory = props.getProperty(BufferManager.BUFFER_STORAGE_DIRECTORY);
+ PropertiesUtils.setBeanProperties(this, props, "org.teiid.buffer");
//$NON-NLS-1$
if(this.directory == null) {
throw new
MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.no_directory"));
//$NON-NLS-1$
}
@@ -199,14 +197,20 @@
} else if(! dirFile.mkdirs()) {
throw new
MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStoreageManager.error_creating",
dirFile.getAbsoluteFile())); //$NON-NLS-1$
}
-
- // Set up max number of open file descriptors
- maxOpenFiles = PropertiesUtils.getIntProperty(props,
BufferManager.MAX_OPEN_FILES, DEFAULT_MAX_OPEN_FILES);
-
- // Set the max file size
- maxFileSize = PropertiesUtils.getIntProperty(props, BufferManager.MAX_FILE_SIZE,
2048) * 1024L * 1024L; // Multiply by 1MB
}
+ public void setMaxFileSize(long maxFileSize) {
+ this.maxFileSize = maxFileSize * 1024L * 1024L;
+ }
+
+ public void setMaxOpenFiles(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ }
+
+ public void setStorageDirectory(String directory) {
+ this.directory = directory;
+ }
+
File createFile(String name, int fileNumber) throws MetaMatrixComponentException {
try {
File storageFile = File.createTempFile(FILE_PREFIX + name + "_" +
String.valueOf(fileNumber) + "_", null, this.dirFile); //$NON-NLS-1$
//$NON-NLS-2$
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryStorageManager.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -41,20 +41,16 @@
public FileStore createFileStore(String name) {
return new FileStore() {
private ByteBuffer buffer = ByteBuffer.allocate(2 << 15);
- private int end;
@Override
- public synchronized long writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
- if (end + length > buffer.capacity()) {
+ public void writeDirect(byte[] bytes, int offset, int length) throws
MetaMatrixComponentException {
+ if (getLength() + length > buffer.capacity()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
newBuffer.put(buffer);
buffer = newBuffer;
}
- buffer.position(end);
+ buffer.position((int)getLength());
buffer.put(bytes, offset, length);
- long result = end;
- end += length;
- return result;
}
@Override
@@ -65,12 +61,12 @@
@Override
public synchronized int readDirect(long fileOffset, byte[] b, int offset, int length)
throws MetaMatrixComponentException {
- if (fileOffset >= end) {
+ if (fileOffset >= getLength()) {
return -1;
}
int position = (int)fileOffset;
buffer.position(position);
- length = Math.min(length, end - position);
+ length = Math.min(length, (int)getLength() - position);
buffer.get(b, offset, length);
return length;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/BatchIterator.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -116,4 +116,12 @@
}
}
+ @Override
+ public int available() {
+ if (currentBatch != null) {
+ return currentBatch.getEndRow() - currentRow + 1;
+ }
+ return 0;
+ }
+
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/CollectionTupleSource.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -67,4 +67,12 @@
public void closeSource() {
}
+
+ @Override
+ public int available() {
+ if (tuples.hasNext()) {
+ return 1;
+ }
+ return 0;
+ }
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DuplicateFilter.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -106,6 +106,7 @@
if(collectionBuffer == null) {
collectionBuffer = mgr.createTupleBuffer(elements, groupName,
TupleSourceType.PROCESSOR);
+ collectionBuffer.setForwardOnly(true);
}
List row = new ArrayList(1);
@@ -126,6 +127,7 @@
// Sort
sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(),
elements, sortTypes, Mode.DUP_REMOVE, mgr, groupName);
TupleBuffer sorted = sortUtility.sort();
+ sorted.setForwardOnly(true);
try {
// Add all input to proxy
TupleSource sortedSource = sorted.createIndexedTupleSource();
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -268,6 +268,14 @@
public void closeSource() throws MetaMatrixComponentException {
}
+
+ @Override
+ public int available() {
+ if (sourceBatch != null) {
+ return sourceBatch.getEndRow() - sourceRow + 1;
+ }
+ return 0;
+ }
};
}
@@ -287,6 +295,7 @@
private void sortPhase() throws BlockedException, MetaMatrixComponentException,
MetaMatrixProcessingException {
this.sortBuffer = this.sortUtility.sort();
+ this.sortBuffer.setForwardOnly(true);
this.groupTupleSource = this.sortBuffer.createIndexedTupleSource();
this.phase = GROUP;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -42,21 +42,10 @@
*/
public class PartitionedSortJoin extends MergeJoinStrategy {
- /**
- * This is a compromise between the max size of the smaller side
- * and effective partitioning assuming that we only want to hold
- * two batches in memory during partitioning.
- *
- * TODO: apply partitioning recursively and/or have a better mechanism
- * for buffermanager reserve/release of memory
- * (would also help the sort utility)
- */
- public static final int MAX_PARTITIONS = 16;
-
private List[] endTuples;
private List<Boolean> overlap = new ArrayList<Boolean>();
private List<Integer> endRows = new ArrayList<Integer>();
- private List<TupleBuffer> partitionIds = new ArrayList<TupleBuffer>();
+ private List<TupleBuffer> partitions = new ArrayList<TupleBuffer>();
private int currentPartition;
private IndexedTupleSource currentSource;
private SourceState sortedSource;
@@ -73,13 +62,13 @@
@Override
public void close() {
super.close();
- for (TupleBuffer tupleSourceID : this.partitionIds) {
+ for (TupleBuffer tupleSourceID : this.partitions) {
tupleSourceID.remove();
}
this.endTuples = null;
this.overlap.clear();
this.endRows.clear();
- this.partitionIds.clear();
+ this.partitions.clear();
this.currentSource = null;
this.sortedSource = null;
this.partitionedSource = null;
@@ -128,7 +117,7 @@
protected void loadRight() throws MetaMatrixComponentException,
MetaMatrixProcessingException {
this.rightSource.getTupleBuffer();
- int maxRows = this.joinNode.getBatchSize() * MAX_PARTITIONS;
+ int maxRows = this.joinNode.getBatchSize() * getMaxProcessingBatches();
if (processingSortRight == SortOption.SORT
&& this.leftSource.getRowCount() < maxRows
&& this.leftSource.getRowCount() * 4 < this.rightSource.getRowCount())
{
@@ -161,13 +150,25 @@
}
}
}
+
+ /**
+ * Since the source to be partitioned is already loaded, there is no chance
+ * of a BlockedException during partitioning, so the maximum is doubled.
+ *
+ * TODO: partition at the same time as the load to determine size
+ *
+ * @return
+ */
+ private int getMaxProcessingBatches() {
+ return 2 * this.joinNode.getBufferManager().getMaxProcessingBatches();
+ }
private void partitionSource(boolean left) throws MetaMatrixComponentException,
MetaMatrixProcessingException {
if (partitioned) {
return;
}
- if (endTuples.length > MAX_PARTITIONS + 1) {
+ if (endTuples.length > getMaxProcessingBatches() + 1) {
if (left) {
this.processingSortLeft = SortOption.SORT;
} else {
@@ -176,13 +177,13 @@
return;
}
if (endTuples.length < 2) {
- partitionIds.add(this.partitionedSource.getTupleBuffer());
+ partitions.add(this.partitionedSource.getTupleBuffer());
} else {
- if (partitionIds.isEmpty()) {
+ if (partitions.isEmpty()) {
for (int i = 0; i < endTuples.length; i++) {
TupleBuffer tc = this.partitionedSource.createSourceTupleBuffer();
- tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()/4));
- this.partitionIds.add(tc);
+ tc.setBatchSize(Math.max(1, this.joinNode.getBatchSize()));
+ this.partitions.add(tc);
}
}
while (this.partitionedSource.getIterator().hasNext()) {
@@ -191,15 +192,18 @@
if (index < 0) {
index = -index - 1;
}
- if (index > this.partitionIds.size() -1) {
+ if (index > this.partitions.size() -1) {
continue;
}
while (index > 0 && this.overlap.get(index - 1)
&& compare(tuple, this.endTuples[index - 1],
this.partitionedSource.getExpressionIndexes(), this.sortedSource.getExpressionIndexes())
== 0) {
index--;
}
- this.partitionIds.get(index).addTuple(tuple);
+ this.partitions.get(index).addTuple(tuple);
}
+ for (TupleBuffer partition : this.partitions) {
+ partition.saveBatch();
+ }
this.partitionedSource.getIterator().setPosition(1);
}
partitioned = true;
@@ -214,12 +218,12 @@
if (endRows.isEmpty()) {
return null; //no rows on the sorted side
}
- while (currentPartition < partitionIds.size()) {
+ while (currentPartition < partitions.size()) {
if (currentSource == null) {
- if (!this.partitionIds.isEmpty()) {
- this.partitionIds.get(currentPartition).close();
+ if (!this.partitions.isEmpty()) {
+ this.partitions.get(currentPartition).close();
}
- currentSource = partitionIds.get(currentPartition).createIndexedTupleSource();
+ currentSource = partitions.get(currentPartition).createIndexedTupleSource();
}
int beginIndex = currentPartition>0?endRows.get(currentPartition - 1)+1:1;
@@ -251,7 +255,7 @@
}
}
if (matchEnd == batch.length - 1 && currentPartition < overlap.size()
&& overlap.get(currentPartition)) {
- this.partitionIds.get(currentPartition + 1).addTuple(partitionedTuple);
+ this.partitions.get(currentPartition + 1).addTuple(partitionedTuple);
}
}
while (matchBegin <= matchEnd) {
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -104,6 +104,8 @@
private TupleBatch outputPhase() throws BlockedException,
MetaMatrixComponentException, MetaMatrixProcessingException {
if (!this.output.isFinal()) {
this.phase = SORT;
+ } else {
+ this.output.setForwardOnly(true);
}
List<?> tuple = null;
try {
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -82,7 +82,7 @@
}
//constructor state
- private TupleSource sourceID;
+ private TupleSource source;
private Mode mode;
private BufferManager bufferManager;
private String groupName;
@@ -105,11 +105,11 @@
public SortUtility(TupleSource sourceID, List sortElements, List<Boolean>
sortTypes, Mode mode, BufferManager bufferMgr,
String groupName) {
- this.sourceID = sourceID;
+ this.source = sourceID;
this.mode = mode;
this.bufferManager = bufferMgr;
this.groupName = groupName;
- this.schema = this.sourceID.getSchema();
+ this.schema = this.source.getSchema();
int distinctIndex = sortElements != null? sortElements.size() - 1:0;
if (mode != Mode.SORT) {
if (sortElements == null) {
@@ -159,6 +159,7 @@
private TupleBuffer createTupleBuffer() throws MetaMatrixComponentException {
TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName,
TupleSourceType.PROCESSOR);
+ tb.setForwardOnly(true);
return tb;
}
@@ -178,16 +179,23 @@
try {
int maxRows = bufferManager.getMaxProcessingBatches() *
bufferManager.getProcessorBatchSize();
while(!doneReading) {
+ //attempt to reserve more working memory if there are additional rows
available before blocking
if (workingTuples.size() == maxRows) {
- int reserved = bufferManager.reserveBuffers(1, false);
+ if (source.available() < 1) {
+ break;
+ }
+ int reserved = bufferManager.reserveBuffers(1, false);
if (reserved == 0) {
break;
}
totalReservedBuffers += 1;
- maxRows += bufferManager.getProcessorBatchSize();
+ maxRows += bufferManager.getProcessorBatchSize();
}
try {
- List<?> tuple = sourceID.nextTuple();
+ if (totalReservedBuffers > 0 && source.available() == 0) {
+ break;
+ }
+ List<?> tuple = source.nextTuple();
if (tuple == null) {
doneReading = true;
@@ -293,6 +301,7 @@
// Close sorted source (all others have been removed)
if (doneReading) {
activeTupleBuffers.get(0).close();
+ activeTupleBuffers.get(0).setForwardOnly(false);
if (this.output != null) {
this.output.close();
}
@@ -302,6 +311,7 @@
Assertion.assertTrue(mode == Mode.DUP_REMOVE);
if (this.output == null) {
this.output = activeTupleBuffers.get(0);
+ this.output.setForwardOnly(false);
}
this.phase = INITIAL_SORT;
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -75,7 +75,7 @@
@Override
public InputStream getInputStream() throws IOException {
//TODO: adjust the buffer size, and/or develop a shared buffer strategy
- return new BufferedInputStream(lobBuffer.createInputStream());
+ return new BufferedInputStream(lobBuffer.createInputStream(0));
}
@Override
Modified:
trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/com/metamatrix/query/tempdata/TempTableStoreImpl.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -129,6 +129,11 @@
public void closeSource() throws MetaMatrixComponentException {
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
private BufferManager buffer;
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/datamgr/impl/ConnectorWorkItem.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -84,7 +84,7 @@
private ICommand translatedCommand;
private Class<?>[] schema;
private List<Integer> convertToRuntimeType;
- private List<Integer> convertToDesiredRuntimeType;
+ private boolean[] convertToDesiredRuntimeType;
/* End state information */
protected boolean lastBatch;
@@ -309,12 +309,12 @@
Command command = this.requestMsg.getCommand();
List<SingleElementSymbol> symbols =
this.requestMsg.getCommand().getProjectedSymbols();
this.schema = new Class[symbols.size()];
- this.convertToDesiredRuntimeType = new ArrayList<Integer>(symbols.size());
+ this.convertToDesiredRuntimeType = new boolean[symbols.size()];
this.convertToRuntimeType = new ArrayList<Integer>(symbols.size());
for (int i = 0; i < schema.length; i++) {
SingleElementSymbol symbol = symbols.get(i);
this.schema[i] = symbol.getType();
- this.convertToDesiredRuntimeType.add(i);
+ this.convertToDesiredRuntimeType[i] = true;
this.convertToRuntimeType.add(i);
}
@@ -469,20 +469,23 @@
}
}
//TODO: add a proper intermediate schema
- for (int i = convertToDesiredRuntimeType.size() - 1; i >= 0; i--) {
- int index = convertToDesiredRuntimeType.get(i);
- Object value = row.get(index);
- if (value != null) {
- Object result;
- try {
- result = DataTypeManager.transformValue(value, value.getClass(),
this.schema[index]);
- } catch (TransformationException e) {
- throw new ConnectorException(e);
+ for (int i = 0; i < row.size(); i++) {
+ if (convertToDesiredRuntimeType[i]) {
+ Object value = row.get(i);
+ if (value != null) {
+ Object result;
+ try {
+ result = DataTypeManager.transformValue(value, value.getClass(), this.schema[i]);
+ } catch (TransformationException e) {
+ throw new ConnectorException(e);
+ }
+ if (value == result) {
+ convertToDesiredRuntimeType[i] = false;
+ }
+ row.set(i, result);
}
- if (value == result) {
- convertToDesiredRuntimeType.remove(i);
- }
- row.set(index, result);
+ } else {
+ row.set(i, DataTypeManager.getCanonicalValue(row.get(i)));
}
}
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -225,4 +225,17 @@
return this.isTransactional;
}
+ @Override
+ public int available() {
+ if (index < currentBatchCount) {
+ return currentBatchCount - index;
+ }
+ synchronized (this) {
+ if (nextBatch != null) {
+ return nextBatch.length;
+ }
+ }
+ return 0;
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedStatementRequest.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -100,15 +100,15 @@
return;
}
List values = requestMsg.getParameterValues();
- List spParams = proc.getParameters();
+ List<SPParameter> spParams = proc.getParameters();
proc.clearParameters();
int inParameterCount = values.size();
if (this.requestMsg.isPreparedBatchUpdate() && values.size() > 0) {
inParameterCount = ((List)values.get(0)).size();
}
int index = 1;
- for (Iterator params = spParams.iterator(); params.hasNext();) {
- SPParameter param = (SPParameter) params.next();
+ for (Iterator<SPParameter> params = spParams.iterator(); params.hasNext();) {
+ SPParameter param = params.next();
if (param.getParameterType() == SPParameter.RETURN_VALUE) {
continue;
}
@@ -147,13 +147,13 @@
if (!this.addedLimit) { //TODO: this is a little problematic
prepPlan.setCommand(this.userCommand);
// Defect 13751: Clone the plan in its current state (i.e. before processing) so that it can be used for later queries
- prepPlan.setPlan((ProcessorPlan)processPlan.clone());
+ prepPlan.setPlan(processPlan.clone());
prepPlan.setAnalysisRecord(analysisRecord);
this.prepPlanCache.putPreparedPlan(id,
this.context.isSessionFunctionEvaluated(), prepPlan);
}
} else {
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Query exist in
cache: ", sqlQuery }); //$NON-NLS-1$
- processPlan = (ProcessorPlan)cachedPlan.clone();
+ processPlan = cachedPlan.clone();
//already in cache. obtain the values from cache
analysisRecord = prepPlan.getAnalysisRecord();
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -22,6 +22,7 @@
package org.teiid.dqp.internal.process;
+import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -107,6 +108,7 @@
private AnalysisRecord analysisRecord;
private TransactionContext transactionContext;
protected TupleBuffer resultsBuffer;
+ private TupleBatch savedBatch;
private Collection schemas; // These are schemas associated with XML results
private boolean returnsUpdateCount;
@@ -346,6 +348,9 @@
}
});
resultsBuffer = collector.getTupleBuffer();
+ if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+ resultsBuffer.setForwardOnly(true);
+ }
analysisRecord = request.analysisRecord;
schemas = request.schemas;
transactionContext = request.transactionContext;
@@ -383,11 +388,18 @@
LogManager.logDetail(LogConstants.CTX_DQP, "[RequestWorkItem.sendResultsIfNeeded]
requestID: " + requestID + " resultsID: " + this.resultsBuffer + "
done: " + doneProducingBatches ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
- if (batch == null || batch.getBeginRow() > this.begin) {
- batch = resultsBuffer.getBatch(begin);
+ if (batch == null || !(batch.getBeginRow() <= this.begin &&
batch.getEndRow() >= this.begin)) {
+ if (savedBatch != null && savedBatch.getBeginRow() <= this.begin
&& savedBatch.getEndRow() >= this.begin) {
+ batch = savedBatch;
+ } else {
+ batch = resultsBuffer.getBatch(begin);
+ }
//TODO: support fetching more than 1 batch
int count = this.end - this.begin + 1;
if (batch.getRowCount() > count) {
+ if (requestMsg.getCursorType() == ResultSet.TYPE_FORWARD_ONLY) {
+ savedBatch = batch;
+ }
int beginRow = Math.min(this.begin, batch.getEndRow() - count + 1);
int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
int firstOffset = beginRow - batch.getBeginRow();
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/FakeTupleSource.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -120,4 +120,9 @@
this.exceptionOnClose = exceptionOnClose;
}
+ @Override
+ public int available() {
+ return 0;
+ }
+
}
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -492,8 +492,8 @@
private BufferManager createCustomBufferMgr(int batchSize) throws
MetaMatrixComponentException {
BufferManagerImpl bufferMgr = new BufferManagerImpl();
Properties props = new Properties();
- props.setProperty(BufferManager.PROCESSOR_BATCH_SIZE,
String.valueOf(batchSize));
- props.setProperty(BufferManager.CONNECTOR_BATCH_SIZE,
String.valueOf(batchSize));
+ bufferMgr.setConnectorBatchSize(batchSize);
+ bufferMgr.setProcessorBatchSize(batchSize);
bufferMgr.initialize(props);
// Add unmanaged memory storage manager
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -37,24 +37,23 @@
public class NodeTestUtil {
static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize, int
connectorBatchSize) {
-
+ BufferManagerImpl bufferManager = new BufferManagerImpl();
+ bufferManager.setProcessorBatchSize(procBatchSize);
+ bufferManager.setConnectorBatchSize(connectorBatchSize);
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" +
procBatchSize); //$NON-NLS-1$
- bmProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE, "" +
connectorBatchSize); //$NON-NLS-1$
- return createBufferManager(bmProps);
+ return createBufferManager(bufferManager, bmProps);
}
static BufferManager getTestBufferManager(long bytesAvailable, int procBatchSize) {
-
+ BufferManagerImpl bufferManager = new BufferManagerImpl();
+ bufferManager.setProcessorBatchSize(procBatchSize);
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE, "" +
procBatchSize); //$NON-NLS-1$
- return createBufferManager(bmProps);
+ return createBufferManager(bufferManager, bmProps);
}
- static BufferManager createBufferManager(Properties bmProps) {
- BufferManagerImpl bufferManager = new BufferManagerImpl();
+ static BufferManager createBufferManager(BufferManagerImpl bufferManager, Properties
bmProps) {
try {
bufferManager.initialize(bmProps);
} catch (MetaMatrixComponentException e) {
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestBatchedUpdateNode.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -242,6 +242,11 @@
}
return null;
}
+
+ @Override
+ public int available() {
+ return 0;
+ }
}
}
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestProjectIntoNode.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -217,5 +217,9 @@
? null
: Arrays.asList(new Object[] {new Integer(currentRow),
Integer.toString(currentRow)});
}
+ @Override
+ public int available() {
+ return 0;
+ }
}
}
Modified:
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
---
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2010-01-22
22:49:05 UTC (rev 1776)
+++
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -34,7 +34,6 @@
import com.metamatrix.common.application.exception.ApplicationInitializationException;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
import com.metamatrix.common.buffer.impl.FileStorageManager;
import com.metamatrix.common.buffer.impl.MemoryStorageManager;
@@ -92,22 +91,21 @@
String connectorBatchSize = configurationSvc.getConnectorBatchSize();
// Set up buffer configuration properties
- Properties bufferProps = new Properties();
- bufferProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY,
bufferDir.getCanonicalPath());
- bufferProps.setProperty(BufferManager.PROCESSOR_BATCH_SIZE,
processorBatchSize);
- bufferProps.setProperty(BufferManager.CONNECTOR_BATCH_SIZE,
connectorBatchSize);
+ Properties bufferProps = configurationSvc.getSystemProperties();
// Construct and initialize the buffer manager
this.bufferMgr = new BufferManagerImpl();
+ this.bufferMgr.setConnectorBatchSize(Integer.valueOf(connectorBatchSize));
+ this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
+
this.bufferMgr.initialize(bufferProps);
// If necessary, add disk storage manager
if(useDisk) {
// Get the properties for FileStorageManager and create.
- Properties fsmProps = new Properties();
- fsmProps.setProperty(BufferManager.BUFFER_STORAGE_DIRECTORY,
bufferDir.getCanonicalPath());
- StorageManager fsm = new FileStorageManager();
- fsm.initialize(fsmProps);
+ FileStorageManager fsm = new FileStorageManager();
+ fsm.setStorageDirectory(bufferDir.getCanonicalPath());
+ fsm.initialize(bufferProps);
this.bufferMgr.setStorageManager(fsm);
// start the file storage manager in clean state
Modified: trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22
22:49:05 UTC (rev 1776)
+++ trunk/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-25
03:26:22 UTC (rev 1777)
@@ -200,7 +200,6 @@
dqp.begin();
fail("exception expected"); //$NON-NLS-1$
} catch (XATransactionException e) {
- e.printStackTrace();
assertEquals("Component not found: com.metamatrix.dqp.client.ClientSideDQP",
e.getMessage()); //$NON-NLS-1$
}
}