[teiid-commits] teiid SVN: r3465 - in trunk: engine/src/main/java/org/teiid/common/buffer and 14 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Mon Sep 12 14:23:58 EDT 2011
Author: shawkins
Date: 2011-09-12 14:23:57 -0400 (Mon, 12 Sep 2011)
New Revision: 3465
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
Modified:
trunk/api/src/main/java/org/teiid/CommandContext.java
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750 improvement to batch sizing
Modified: trunk/api/src/main/java/org/teiid/CommandContext.java
===================================================================
--- trunk/api/src/main/java/org/teiid/CommandContext.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/api/src/main/java/org/teiid/CommandContext.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -87,7 +87,8 @@
/**
* Get the processor batch size set on the BufferManager
- * @return
+ * @return - the nominal batch size target. actual batch sizes will vary based
+ * upon the column types
*/
int getProcessorBatchSize();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -74,6 +74,12 @@
* Get the batch size to use during query processing.
* @return Batch size (# of rows)
*/
+ int getProcessorBatchSize(List<? extends Expression> schema);
+
+ /**
+ * Get the nominal batch size target
+ * @return
+ */
int getProcessorBatchSize();
/**
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -215,7 +215,7 @@
protected abstract void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException;
- public void remove() {
+ public synchronized void remove() {
if (!this.removed) {
this.removed = true;
this.removeDirect();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -102,9 +102,9 @@
this.stree = stree;
this.id = counter.getAndIncrement();
stree.pages.put(this.id, this);
- this.values = new TupleBatch(0, new ArrayList(stree.pageSize/4));
+ this.values = new TupleBatch(0, new ArrayList(stree.getPageSize(leaf)/4));
if (!leaf) {
- children = new ArrayList<SPage>(stree.pageSize/4);
+ children = new ArrayList<SPage>(stree.getPageSize(false)/4);
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -62,7 +62,8 @@
protected BatchManager keyManager;
protected BatchManager leafManager;
protected ListNestedSortComparator comparator;
- protected int pageSize;
+ private int pageSize;
+ protected int leafSize;
protected int keyLength;
protected String[] types;
protected String[] keytypes;
@@ -76,6 +77,7 @@
BatchManager leafManager,
final ListNestedSortComparator comparator,
int pageSize,
+ int leafSize,
int keyLength,
String[] types) {
randomSeed = seedGenerator.nextInt() | 0x00000100; // ensure nonzero
@@ -90,6 +92,7 @@
mask <<= 1;
mask++;
}
+ this.leafSize = leafSize;
this.keyLength = keyLength;
this.types = types;
this.keytypes = Arrays.copyOf(types, keyLength);
@@ -276,10 +279,10 @@
} else {
level = randomLevel();
}
- } else if (!places.isEmpty() && places.getLast().values.getTuples().size() == pageSize) {
+ } else if (!places.isEmpty() && places.getLast().values.getTuples().size() == getPageSize(true)) {
int row = rowCount.get();
- while (row != 0 && row%pageSize == 0) {
- row = (row - pageSize + 1)/pageSize;
+ while (row != 0 && row%getPageSize(true) == 0) {
+ row = (row - getPageSize(true) + 1)/getPageSize(true);
level++;
}
}
@@ -312,9 +315,9 @@
return 0;
}
int logSize = 1;
- while (sizeHint > this.pageSize) {
+ while (sizeHint > this.getPageSize(logSize==0)) {
logSize++;
- sizeHint/=this.pageSize;
+ sizeHint/=this.getPageSize(logSize==0);
}
return logSize;
}
@@ -329,8 +332,8 @@
SPage insert(List k, SearchResult result, SearchResult parent, Object value, boolean ordered) throws TeiidComponentException {
SPage page = result.page;
int index = -result.index - 1;
- if (result.values.getTuples().size() == pageSize) {
- boolean leaf = !(value instanceof SPage);
+ boolean leaf = !(value instanceof SPage);
+ if (result.values.getTuples().size() == getPageSize(leaf)) {
SPage nextPage = new SPage(this, leaf);
TupleBatch nextValues = nextPage.getValues();
nextPage.next = page.next;
@@ -342,17 +345,17 @@
boolean inNext = false;
if (!ordered) {
//split the values
- nextValues.getTuples().addAll(result.values.getTuples().subList(pageSize/2, pageSize));
- result.values.getTuples().subList(pageSize/2, pageSize).clear();
+ nextValues.getTuples().addAll(result.values.getTuples().subList(getPageSize(leaf)/2, getPageSize(leaf)));
+ result.values.getTuples().subList(getPageSize(leaf)/2, getPageSize(leaf)).clear();
if (!leaf) {
- nextPage.children.addAll(page.children.subList(pageSize/2, pageSize));
- page.children.subList(pageSize/2, pageSize).clear();
+ nextPage.children.addAll(page.children.subList(getPageSize(leaf)/2, getPageSize(false)));
+ page.children.subList(getPageSize(false)/2, getPageSize(false)).clear();
}
- if (index <= pageSize/2) {
+ if (index <= getPageSize(leaf)/2) {
setValue(index, k, value, result.values, page);
} else {
inNext = true;
- setValue(index - pageSize/2, k, value, nextValues, nextPage);
+ setValue(index - getPageSize(leaf)/2, k, value, nextValues, nextPage);
}
page.setValues(result.values);
if (parent != null) {
@@ -396,7 +399,9 @@
continue;
}
searchResult.values.getTuples().remove(searchResult.index);
+ boolean leaf = true;
if (searchResult.page.children != null) {
+ leaf = false;
searchResult.page.children.remove(searchResult.index);
}
int size = searchResult.values.getTuples().size();
@@ -423,18 +428,18 @@
header[0] = new SPage(this, true);
}
continue;
- } else if (size < pageSize/2) {
+ } else if (size < getPageSize(leaf)/2) {
//check for merge
if (searchResult.page.next != null) {
TupleBatch nextValues = searchResult.page.next.getValues();
- if (nextValues.getTuples().size() < pageSize/4) {
+ if (nextValues.getTuples().size() < getPageSize(leaf)/4) {
SPage.merge(places, nextValues, searchResult.page, searchResult.values);
continue;
}
}
if (searchResult.page.prev != null) {
TupleBatch prevValues = searchResult.page.prev.getValues();
- if (prevValues.getTuples().size() < pageSize/4) {
+ if (prevValues.getTuples().size() < getPageSize(leaf)/4) {
SPage.merge(places, searchResult.values, searchResult.page.prev, prevValues);
continue;
}
@@ -537,4 +542,11 @@
}
}
+ public int getPageSize(boolean leaf) {
+ if (leaf) {
+ return leafSize;
+ }
+ return pageSize;
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -83,12 +83,12 @@
*
* TODO: allow for cached stores to use lru - (result set/mat view)
* TODO: account for row/content based sizing (difficult given value sharing)
- * TODO: account for memory based lobs (it would be nice if the approximate buffer size matched at 100kB)
* TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
* - this is not necessary for already persistent batches, since we hold a weak reference
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
+ private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
private static final int IO_BUFFER_SIZE = 1 << 14;
private static final int COMPACTION_THRESHOLD = 1 << 25; //start checking at 32 megs
@@ -233,7 +233,6 @@
private boolean softCache;
private volatile TupleBatch activeBatch;
private volatile Reference<TupleBatch> batchReference;
- private int beginRow;
private WeakReference<BatchManagerImpl> managerRef;
private Long id;
private LobManager lobManager;
@@ -243,7 +242,6 @@
this.softCache = softCache;
id = batchAdded.incrementAndGet();
this.activeBatch = batch;
- this.beginRow = batch.getBeginRow();
this.managerRef = ref;
BatchManagerImpl batchManager = ref.get();
if (batchManager.lobIndexes != null) {
@@ -355,9 +353,9 @@
Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
batch = new TupleBatch();
+ batch.setRowOffset(ois.readInt());
batch.setDataTypes(types);
batch.readExternal(ois);
- batch.setRowOffset(this.beginRow);
batch.setDataTypes(null);
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
@@ -404,6 +402,7 @@
offset = batchManager.getOffset();
OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(batch.getBeginRow());
batch.writeExternal(oos);
oos.close();
long size = batchManager.store.getLength() - offset;
@@ -475,6 +474,7 @@
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
+ private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -535,6 +535,10 @@
public void setConnectorBatchSize(int connectorBatchSize) {
this.connectorBatchSize = connectorBatchSize;
}
+
+ public void setTargetBytesPerRow(int targetBytesPerRow) {
+ this.targetBytesPerRow = targetBytesPerRow;
+ }
public void setProcessorBatchSize(int processorBatchSize) {
this.processorBatchSize = processorBatchSize;
@@ -560,7 +564,7 @@
final String newID = String.valueOf(this.tsId.getAndIncrement());
int[] lobIndexes = LobManager.getLobIndexes(elements);
BatchManager batchManager = new BatchManagerImpl(newID, lobIndexes);
- TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, lobIndexes, getProcessorBatchSize());
+ TupleBuffer tupleBuffer = new TupleBuffer(batchManager, newID, elements, lobIndexes, getProcessorBatchSize(elements));
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(tupleBuffer.getTypes()), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -595,7 +599,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$
}
- return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(), keyLength, TupleBuffer.getTypeNames(elements));
+ return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements), getProcessorBatchSize(elements.subList(0, keyLength)), keyLength, TupleBuffer.getTypeNames(elements));
}
@Override
@@ -732,7 +736,11 @@
}
@Override
- public int getSchemaSize(List<? extends Expression> elements) {
+ public int getProcessorBatchSize(List<? extends Expression> schema) {
+ return getSizeEstimates(schema)[0];
+ }
+
+ private int[] getSizeEstimates(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
//we make a assumption that the average column size under 64bits is approximately 128bytes
@@ -741,10 +749,40 @@
Class<?> type = element.getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
}
+ //assume 64-bit
total += 8*elements.size() + 36; // column list / row overhead
- total *= processorBatchSize;
- return Math.max(1, total / 1024);
+
+ //nominal targetBytesPerRow but can scale up or down
+
+ int totalCopy = total;
+ boolean less = totalCopy < targetBytesPerRow;
+ int rowCount = processorBatchSize;
+
+ for (int i = 0; i < 2; i++) {
+ if (less) {
+ totalCopy <<= 2;
+ } else {
+ totalCopy >>= 2;
+ }
+ if (less && totalCopy > targetBytesPerRow
+ || !less && totalCopy < targetBytesPerRow) {
+ break;
+ }
+ if (less) {
+ rowCount <<= 1;
+ } else {
+ rowCount >>= 1;
+ }
+ }
+ rowCount = Math.max(1, rowCount);
+ total *= rowCount;
+ return new int[]{rowCount, Math.max(1, total / 1024)};
}
+
+ @Override
+ public int getSchemaSize(List<? extends Expression> elements) {
+ return getSizeEstimates(elements)[1];
+ }
public void shutdown() {
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -29,13 +29,11 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.util.Assertion;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
@@ -47,7 +45,6 @@
public class FileStorageManager implements StorageManager {
public static final int DEFAULT_MAX_OPEN_FILES = 64;
- public static final long DEFAULT_MAX_FILESIZE = 2L * 1024L;
public static final long DEFAULT_MAX_BUFFERSPACE = 50L * 1024L * 1024L * 1024L;
private static final String FILE_PREFIX = "b_"; //$NON-NLS-1$
@@ -97,7 +94,7 @@
public class DiskStore extends FileStore {
private String name;
- private TreeMap<Long, FileInfo> storageFiles = new TreeMap<Long, FileInfo>();
+ private FileInfo fileInfo;
public DiskStore(String name) {
this.name = name;
@@ -106,20 +103,15 @@
/**
* Concurrent reads are possible, but only after writing is complete.
*/
- public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
- Map.Entry<Long, FileInfo> entry = storageFiles.floorEntry(fileOffset);
- Assertion.isNotNull(entry);
- FileInfo fileInfo = entry.getValue();
- synchronized (fileInfo) {
- try {
- RandomAccessFile fileAccess = fileInfo.open();
- fileAccess.seek(fileOffset - entry.getKey());
- return fileAccess.read(b, offSet, length);
- } catch (IOException e) {
- throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
- } finally {
- fileInfo.close();
- }
+ public synchronized int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
+ try {
+ RandomAccessFile fileAccess = fileInfo.open();
+ fileAccess.seek(fileOffset);
+ return fileAccess.read(b, offSet, length);
+ } catch (IOException e) {
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+ } finally {
+ fileInfo.close();
}
}
@@ -132,26 +124,14 @@
usedBufferSpace.addAndGet(-length);
throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
}
- Map.Entry<Long, FileInfo> entry = this.storageFiles.lastEntry();
- boolean createNew = false;
- FileInfo fileInfo = null;
long fileOffset = 0;
- if (entry == null) {
- createNew = true;
- } else {
- fileInfo = entry.getValue();
- fileOffset = entry.getKey();
- createNew = entry.getValue().file.length() + length > getMaxFileSize();
- }
- if (createNew) {
- FileInfo newFileInfo = new FileInfo(createFile(name, storageFiles.size()));
+ if (fileInfo == null) {
+ fileInfo = new FileInfo(createFile(name));
if (fileInfo != null) {
fileOffset += fileInfo.file.length();
}
- storageFiles.put(fileOffset, newFileInfo);
- fileInfo = newFileInfo;
}
- synchronized (fileInfo) {
+ synchronized (this) {
try {
RandomAccessFile fileAccess = fileInfo.open();
long pointer = fileAccess.length();
@@ -168,16 +148,13 @@
public synchronized void removeDirect() {
usedBufferSpace.addAndGet(-len);
- for (FileInfo info : storageFiles.values()) {
- info.delete();
- }
+ fileInfo.delete();
}
}
// Initialization
private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
- private long maxFileSize = DEFAULT_MAX_FILESIZE * 1024L * 1024L; // 2GB
private String directory;
private File dirFile;
@@ -216,14 +193,6 @@
}
}
- public void setMaxFileSize(long maxFileSize) {
- this.maxFileSize = maxFileSize * 1024L * 1024L;
- }
-
- void setMaxFileSizeDirect(long maxFileSize) {
- this.maxFileSize = maxFileSize;
- }
-
public void setMaxOpenFiles(int maxOpenFiles) {
this.maxOpenFiles = maxOpenFiles;
}
@@ -232,15 +201,15 @@
this.directory = directory;
}
- File createFile(String name, int fileNumber) throws TeiidComponentException {
+ File createFile(String name) throws TeiidComponentException {
try {
- File storageFile = File.createTempFile(FILE_PREFIX + name + "_" + String.valueOf(fileNumber) + "_", null, this.dirFile); //$NON-NLS-1$ //$NON-NLS-2$
+ File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null, this.dirFile); //$NON-NLS-1$
if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(org.teiid.logging.LogConstants.CTX_BUFFER_MGR, "Created temporary storage area file " + storageFile.getAbsoluteFile()); //$NON-NLS-1$
}
return storageFile;
} catch(IOException e) {
- throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_creating", name + "_" + fileNumber)); //$NON-NLS-1$ //$NON-NLS-2$
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_creating", name)); //$NON-NLS-1$
}
}
@@ -248,10 +217,6 @@
return new DiskStore(name);
}
- public long getMaxFileSize() {
- return maxFileSize;
- }
-
public String getDirectory() {
return directory;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -25,6 +25,8 @@
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.types.DataTypeManager;
@@ -40,6 +42,27 @@
public final class SizeUtility {
public static final int REFERENCE_SIZE = 8;
+ private static Map<Class<?>, int[]> SIZE_ESTIMATES = new HashMap<Class<?>, int[]>(128);
+
+ static {
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, 256});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DATE, new int[] {20, 28});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIME, new int[] {20, 28});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIMESTAMP, new int[] {20, 28});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.LONG, new int[] {12, 16});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DOUBLE, new int[] {12, 16});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.INTEGER, new int[] {6, 12});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.FLOAT, new int[] {6, 12});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.CHAR, new int[] {4, 10});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.SHORT, new int[] {4, 10});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.OBJECT, new int[] {1024, 1024});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.NULL, new int[] {0, 0});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BYTE, new int[] {1, 1});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BOOLEAN, new int[] {1, 1});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BIG_INTEGER, new int[] {75, 100});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.BIG_DECIMAL, new int[] {150, 200});
+ }
+
private long bigIntegerEstimate;
private long bigDecimalEstimate;
@@ -85,35 +108,12 @@
static int getSize(boolean isValueCacheEnabled,
Class<?> type) {
- if (type == DataTypeManager.DefaultDataClasses.STRING) {
- return isValueCacheEnabled?100:256; //assumes an "average" string length of approximately 100 chars
- } else if (type == DataTypeManager.DefaultDataClasses.DATE
- || type == DataTypeManager.DefaultDataClasses.TIME
- || type == DataTypeManager.DefaultDataClasses.TIMESTAMP) {
- return isValueCacheEnabled?20:28;
- } else if (type == DataTypeManager.DefaultDataClasses.LONG
- || type == DataTypeManager.DefaultDataClasses.DOUBLE) {
- return isValueCacheEnabled?12:16;
- } else if (type == DataTypeManager.DefaultDataClasses.INTEGER
- || type == DataTypeManager.DefaultDataClasses.FLOAT) {
- return isValueCacheEnabled?6:12;
- } else if (type == DataTypeManager.DefaultDataClasses.CHAR
- || type == DataTypeManager.DefaultDataClasses.SHORT) {
- return isValueCacheEnabled?4:10;
- } else if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
- return 1024;
- } else if (type == DataTypeManager.DefaultDataClasses.NULL) {
- return 0; //it's free
- } else if (type == DataTypeManager.DefaultDataClasses.BYTE
- || type == DataTypeManager.DefaultDataClasses.BOOLEAN) {
- return 1; //should always be value cached, but there's a small chance it's not
- } else if (type == DataTypeManager.DefaultDataClasses.BIG_INTEGER){
- return isValueCacheEnabled?75:100;
- } else if (type == DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
- return isValueCacheEnabled?150:200;
- }
- return 512; //this is is misleading for lobs
- //most references are not actually removed from memory
+ int[] vals = SIZE_ESTIMATES.get(type);
+ if (vals == null) {
+ return 512; //this is misleading for lobs
+ //most references are not actually removed from memory
+ }
+ return vals[isValueCacheEnabled?0:1];
}
/**
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidComponentException;
+
+public class SplittableStorageManager implements StorageManager {
+
+ public static final long DEFAULT_MAX_FILESIZE = 2L * 1024L;
+ private long maxFileSize = DEFAULT_MAX_FILESIZE * 1024L * 1024L; // 2GB
+ private StorageManager storageManager;
+
+ public SplittableStorageManager(StorageManager storageManager) {
+ this.storageManager = storageManager;
+ }
+
+ @Override
+ public FileStore createFileStore(String name) {
+ return new SplittableFileStore(name);
+ }
+
+ @Override
+ public void initialize() throws TeiidComponentException {
+ storageManager.initialize();
+ }
+
+ public class SplittableFileStore extends FileStore {
+ private String name;
+ private ConcurrentSkipListMap<Long, FileStore> storageFiles = new ConcurrentSkipListMap<Long, FileStore>();
+
+ public SplittableFileStore(String name) {
+ this.name = name;
+ }
+
+ public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
+ Map.Entry<Long, FileStore> entry = storageFiles.floorEntry(fileOffset);
+ FileStore fileInfo = entry.getValue();
+ return fileInfo.read(fileOffset - entry.getKey(), b, offSet, length);
+ }
+
+ public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
+ Map.Entry<Long, FileStore> entry = this.storageFiles.lastEntry();
+ boolean createNew = false;
+ FileStore fileInfo = null;
+ long fileOffset = 0;
+ if (entry == null) {
+ createNew = true;
+ } else {
+ fileInfo = entry.getValue();
+ fileOffset = entry.getKey();
+ createNew = entry.getValue().getLength() + length > getMaxFileSize();
+ }
+ if (createNew) {
+ FileStore newFileInfo = storageManager.createFileStore(name + "_" + storageFiles.size()); //$NON-NLS-1$
+ if (fileInfo != null) {
+ fileOffset += fileInfo.getLength();
+ }
+ storageFiles.put(fileOffset, newFileInfo);
+ fileInfo = newFileInfo;
+ }
+ fileInfo.write(bytes, offset, length);
+ }
+
+ public void removeDirect() {
+ for (FileStore info : storageFiles.values()) {
+ info.remove();
+ }
+ }
+
+ }
+
+ public long getMaxFileSize() {
+ return maxFileSize;
+ }
+
+ public void setMaxFileSize(long maxFileSize) {
+ this.maxFileSize = maxFileSize * 1024L * 1024L;
+ }
+
+ void setMaxFileSizeDirect(long maxFileSize) {
+ this.maxFileSize = maxFileSize;
+ }
+
+ public StorageManager getStorageManager() {
+ return storageManager;
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/Request.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -233,7 +233,6 @@
props,
this.requestMsg.getShowPlan() != ShowPlan.OFF);
this.context.setProcessorBatchSize(bufferManager.getProcessorBatchSize());
- this.context.setConnectorBatchSize(bufferManager.getConnectorBatchSize());
this.context.setGlobalTableStore(this.globalTables);
if (multiSourceModels != null) {
MultiSourcePlanToProcessConverter modifier = new MultiSourcePlanToProcessConverter(
Modified: trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -163,7 +163,7 @@
*/
public void initialize(CommandContext context, ProcessorDataManager dataMgr, BufferManager bufferMgr) {
this.bufferMgr = bufferMgr;
- this.batchSize = bufferMgr.getProcessorBatchSize();
+ this.batchSize = bufferMgr.getProcessorBatchSize(getOutputElements());
setContext(context.clone());
this.dataMgr = new ProcessorDataManager() {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/NullNode.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -22,6 +22,9 @@
package org.teiid.query.processor.relational;
+import java.util.Collections;
+import java.util.List;
+
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.TeiidComponentException;
@@ -38,11 +41,12 @@
this.terminateBatches();
return pullBatch();
}
+
+ @Override
+ public List getOutputElements() {
+ return Collections.emptyList();
+ }
- protected void getNodeString(StringBuffer str) {
- super.getNodeString(str);
- }
-
public Object clone(){
NullNode clonedNode = new NullNode(super.getID());
super.copy(this, clonedNode);
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/RelationalNode.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -114,7 +114,11 @@
this.getProcessingState().nodeStatistics = new RelationalNodeStatistics();
}
- this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize();
+ if (getOutputElements() != null) {
+ this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize(getOutputElements());
+ } else {
+ this.getProcessingState().batchSize = bufferManager.getProcessorBatchSize();
+ }
}
public CommandContext getContext() {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -94,6 +94,7 @@
private String groupName;
private List<? extends Expression> schema;
private int schemaSize;
+ private int batchSize;
private ListNestedSortComparator comparator;
private TupleBuffer output;
@@ -164,6 +165,7 @@
this.groupName = groupName;
this.schema = schema;
this.schemaSize = bufferManager.getSchemaSize(this.schema);
+ this.batchSize = bufferManager.getProcessorBatchSize(this.schema);
this.comparator = new ListNestedSortComparator(cols, sortTypes);
int distinctIndex = cols.length - 1;
this.comparator.setDistinctIndex(distinctIndex);
@@ -226,7 +228,7 @@
int totalReservedBuffers = 0;
try {
- int maxRows = this.bufferManager.getProcessorBatchSize();
+ int maxRows = this.batchSize;
while(!doneReading) {
//attempt to reserve more working memory if there are additional rows available before blocking
if (workingTuples.size() >= maxRows) {
@@ -236,7 +238,7 @@
break;
}
totalReservedBuffers += reserved;
- maxRows += bufferManager.getProcessorBatchSize();
+ maxRows += this.batchSize;
}
try {
List<?> tuple = source.nextTuple();
@@ -249,7 +251,7 @@
this.collected++;
}
} catch(BlockedException e) {
- if (workingTuples.size() >= bufferManager.getProcessorBatchSize()) {
+ if (workingTuples.size() >= this.batchSize) {
break;
}
if (mode != Mode.DUP_REMOVE
Modified: trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/processor/xml/RelationalPlanExecutor.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -144,11 +144,8 @@
if (this.resultInfo.isAutoStaged() && tempTable != null) {
AlterTempTable att = new AlterTempTable(tempTable);
- int size = (Integer)this.currentRow.get(0);
- if (size > this.bufferMgr.getProcessorBatchSize() * 2) {
- //TODO: if the parent is small, then this is not necessary
- att.setIndexColumns(this.resultInfo.getFkColumns());
- }
+ //TODO: if the parent is small, then this is not necessary
+ att.setIndexColumns(this.resultInfo.getFkColumns());
this.dataManager.registerRequest(this.internalProcessor.getContext(), att, TempMetadataAdapter.TEMP_MODEL.getName(), null, 0, -1);
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -213,7 +213,7 @@
TempTable tt = contextStore.getTempTable(att.getTempTable().toUpperCase());
Assertion.isNotNull(tt, "Table doesn't exist"); //$NON-NLS-1$
tt.setUpdatable(false);
- if (att.getIndexColumns() != null) {
+ if (att.getIndexColumns() != null && tt.getRowCount() > 2*tt.getTree().getPageSize(true)) {
tt.addIndex(att.getIndexColumns(), false);
}
return CollectionTupleSource.createUpdateCountTupleSource(0);
Modified: trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/main/java/org/teiid/query/util/CommandContext.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -78,8 +79,6 @@
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
- private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
-
private String userName;
private Serializable commandPayload;
@@ -312,20 +311,15 @@
return this.globalState.collectNodeStatistics;
}
- public int getConnectorBatchSize() {
- return this.globalState.connectorBatchSize;
- }
-
- public void setConnectorBatchSize(int connectorBatchSize) {
- this.globalState.connectorBatchSize = connectorBatchSize;
- }
-
-
+ @Override
public int getProcessorBatchSize() {
return this.globalState.processorBatchSize;
}
-
+ public int getProcessorBatchSize(List<Expression> schema) {
+ return this.globalState.bufferManager.getProcessorBatchSize(schema);
+ }
+
public void setProcessorBatchSize(int processorBatchSize) {
this.globalState.processorBatchSize = processorBatchSize;
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -79,7 +79,7 @@
@Test public void testOrderedInsert() throws TeiidComponentException {
BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
- bm.setProcessorBatchSize(16);
+ bm.setProcessorBatchSize(4);
ElementSymbol e1 = new ElementSymbol("x");
e1.setType(Integer.class);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -27,9 +27,7 @@
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
import java.util.Arrays;
-import java.util.Map;
import java.util.Random;
import org.junit.Test;
@@ -41,12 +39,9 @@
@SuppressWarnings("nls")
public class TestFileStorageManager {
- public FileStorageManager getStorageManager(Integer maxFileSize, Integer openFiles, String dir) throws TeiidComponentException {
+ public FileStorageManager getStorageManager(Integer openFiles, String dir) throws TeiidComponentException {
FileStorageManager sm = new FileStorageManager();
sm.setStorageDirectory(UnitTestUtil.getTestScratchPath() + (dir != null ? File.separator + dir : "")); //$NON-NLS-1$
- if (maxFileSize != null) {
- sm.setMaxFileSizeDirect(maxFileSize);
- }
if (openFiles != null) {
sm.setMaxOpenFiles(openFiles);
}
@@ -55,7 +50,7 @@
}
@Test public void testWrite() throws Exception {
- FileStorageManager sm = getStorageManager(null, null, null);
+ FileStorageManager sm = getStorageManager(null, null);
String tsID = "0"; //$NON-NLS-1$
FileStore store = sm.createFileStore(tsID);
writeBytes(store);
@@ -64,27 +59,8 @@
assertEquals(0, sm.getUsedBufferSpace());
}
- @Test public void testCreatesSpillFiles() throws Exception {
- FileStorageManager sm = getStorageManager(1024, null, null); // 1KB
- String tsID = "0"; //$NON-NLS-1$
- // Add one batch
- FileStore store = sm.createFileStore(tsID);
- writeBytes(store);
-
- Map<File, RandomAccessFile> cache = sm.getFileCache();
- assertEquals(1, cache.size());
-
- writeBytes(store);
-
- assertEquals(2, cache.size());
-
- store.remove();
-
- assertEquals(0, cache.size());
- }
-
@Test(expected=TeiidComponentException.class) public void testMaxSpace() throws Exception {
- FileStorageManager sm = getStorageManager(null, null, null);
+ FileStorageManager sm = getStorageManager(null, null);
sm.setMaxBufferSpace(1);
String tsID = "0"; //$NON-NLS-1$
// Add one batch
@@ -93,7 +69,7 @@
}
@Test public void testFlush() throws Exception {
- FileStorageManager sm = getStorageManager(null, null, null);
+ FileStorageManager sm = getStorageManager(null, null);
FileStore store = sm.createFileStore("0");
FileStoreOutputStream fsos = store.createOutputStream(2);
fsos.write(new byte[3]);
@@ -104,7 +80,7 @@
static Random r = new Random();
- private void writeBytes(FileStore store)
+ static void writeBytes(FileStore store)
throws TeiidComponentException {
byte[] bytes = new byte[2048];
r.nextBytes(bytes);
@@ -114,9 +90,8 @@
assertTrue(Arrays.equals(bytes, bytesRead));
}
-
@Test public void testWritingMultipleFiles() throws Exception {
- FileStorageManager sm = getStorageManager(1024, null, null);
+ FileStorageManager sm = getStorageManager(null, null);
String tsID = "0"; //$NON-NLS-1$
// Add one batch
FileStore store = sm.createFileStore(tsID);
Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java (rev 0)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.teiid.common.buffer.FileStore;
+
+public class TestSplittableStorageManager {
+
+ @Test public void testCreatesSpillFiles() throws Exception {
+ MemoryStorageManager msm = new MemoryStorageManager();
+ SplittableStorageManager ssm = new SplittableStorageManager(msm);
+ ssm.setMaxFileSizeDirect(2048);
+ String tsID = "0"; //$NON-NLS-1$
+ // Add one batch
+ FileStore store = ssm.createFileStore(tsID);
+ TestFileStorageManager.writeBytes(store);
+
+ assertEquals(1, msm.getCreated());
+
+ TestFileStorageManager.writeBytes(store);
+
+ assertEquals(2, msm.getCreated());
+
+ store.remove();
+
+ assertEquals(2, msm.getRemoved());
+ }
+
+}
Property changes on: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -302,8 +302,8 @@
}
@Test public void testBufferLimit() throws Exception {
- //the sql should return 100 rows
- String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B"; //$NON-NLS-1$
+ //the sql should return 400 rows
+ String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B, (select intkey from BQT1.SmallA limit 4) as C"; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
String sessionid = "1"; //$NON-NLS-1$
@@ -315,27 +315,29 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(2, rm.getResults().length);
+
+ int rowsPerBatch = 8;
+ assertEquals(rowsPerBatch, rm.getResults().length);
RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
- message = core.processCursorRequest(reqMsg.getExecutionId(), 3, 2);
+ message = core.processCursorRequest(reqMsg.getExecutionId(), 9, rowsPerBatch);
rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(2, rm.getResults().length);
+ assertEquals(rowsPerBatch, rm.getResults().length);
//ensure that we are idle
for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
Thread.sleep(100);
}
assertEquals(ThreadState.IDLE, item.getThreadState());
- assertTrue(item.resultsBuffer.getManagedRowCount() <= 46);
+ assertTrue(item.resultsBuffer.getManagedRowCount() <= rowsPerBatch*23);
//pull the rest of the results
for (int j = 0; j < 48; j++) {
item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
- message = core.processCursorRequest(reqMsg.getExecutionId(), j * 2 + 5, 2);
+ message = core.processCursorRequest(reqMsg.getExecutionId(), (j + 2) * rowsPerBatch + 1, rowsPerBatch);
rm = message.get(5000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(2, rm.getResults().length);
+ assertEquals(rowsPerBatch, rm.getResults().length);
}
}
@@ -353,7 +355,7 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(2, rm.getResults().length);
+ assertEquals(8, rm.getResults().length);
RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
assertEquals(100, item.resultsBuffer.getRowCount());
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestProcessor.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -365,7 +365,6 @@
props.setProperty("soap_port", "12345"); //$NON-NLS-1$ //$NON-NLS-2$
CommandContext context = new CommandContext("0", "test", "user", null, "myvdb", 1, props, DEBUG); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
context.setProcessorBatchSize(BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE);
- context.setConnectorBatchSize(BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE);
context.setBufferManager(BufferManagerFactory.getStandaloneBufferManager());
context.setPreparedPlanCache(new SessionAwareCache<PreparedPlan>());
return context;
@@ -1080,7 +1079,6 @@
CommandContext context = createCommandContext();
context.setProcessorBatchSize(2);
- context.setConnectorBatchSize(2);
context.setMetadata(RealMetadataFactory.example1Cached());
// Plan query
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -33,9 +33,9 @@
import org.junit.Before;
import org.junit.Test;
import org.teiid.common.buffer.BlockedException;
-import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleBatch;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
@@ -232,7 +232,8 @@
}
public void helpTestJoinDirect(List[] expectedResults, int batchSize, int processingBytes) throws TeiidComponentException, TeiidProcessingException {
- BufferManager mgr = BufferManagerFactory.getTestBufferManager(processingBytes, batchSize);
+ BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(processingBytes, batchSize);
+ mgr.setTargetBytesPerRow(100);
CommandContext context = new CommandContext("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
join.addChild(leftNode);
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestProjectIntoNode.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -72,10 +72,9 @@
ElementSymbol elementSymbol_2 = new ElementSymbol("myGroup.myElement2"); //$NON-NLS-1$
elementSymbol_1.setType(Integer.class);
elementSymbol_2.setType(String.class);
- ArrayList elements = new ArrayList();
- elements.add(elementSymbol_1);
- elements.add(elementSymbol_2);
+ List<ElementSymbol> elements = Arrays.asList(elementSymbol_1, elementSymbol_2);
node.setIntoElements(elements);
+ child.setElements(elements);
node.setMode(mode);
node.setModelName("myModel"); //$NON-NLS-1$
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -34,6 +34,7 @@
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.common.buffer.impl.FileStorageManager;
import org.teiid.common.buffer.impl.MemoryStorageManager;
+import org.teiid.common.buffer.impl.SplittableStorageManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.FileUtils;
@@ -61,7 +62,7 @@
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int maxOpenFiles = FileStorageManager.DEFAULT_MAX_OPEN_FILES;
- private long maxFileSize = FileStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
+ private long maxFileSize = SplittableStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
@@ -100,11 +101,12 @@
// Get the properties for FileStorageManager and create.
fsm = new FileStorageManager();
fsm.setStorageDirectory(bufferDir.getCanonicalPath());
- fsm.setMaxFileSize(maxFileSize);
fsm.setMaxOpenFiles(maxOpenFiles);
fsm.setMaxBufferSpace(maxBufferSpace*MB);
- fsm.initialize();
- this.bufferMgr.setStorageManager(fsm);
+ SplittableStorageManager ssm = new SplittableStorageManager(fsm);
+ ssm.setMaxFileSize(maxFileSize);
+ ssm.initialize();
+ this.bufferMgr.setStorageManager(ssm);
} else {
this.bufferMgr.setStorageManager(new MemoryStorageManager());
}
Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-09-12 14:56:06 UTC (rev 3464)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-09-12 18:23:57 UTC (rev 3465)
@@ -31,6 +31,7 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.common.buffer.impl.FileStorageManager;
+import org.teiid.common.buffer.impl.SplittableStorageManager;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.query.sql.symbol.Constant;
@@ -53,7 +54,8 @@
assertTrue(svc.isUseDisk());
BufferManagerImpl mgr = (BufferManagerImpl) svc.getBufferManager();
- assertTrue(((FileStorageManager)mgr.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
+ SplittableStorageManager ssm = (SplittableStorageManager)mgr.getStorageManager();
+ assertTrue(((FileStorageManager)ssm.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
}
@Test public void testCheckMemPropertyGotSet2() throws Exception {
@@ -91,7 +93,8 @@
svc.start();
BufferManager mgr = svc.getBufferManager();
- assertEquals(13141, mgr.getSchemaSize(schema));
+ assertEquals(6570, mgr.getSchemaSize(schema));
+ assertEquals(256, mgr.getProcessorBatchSize(schema));
}
}
More information about the teiid-commits mailing list