Author: shawkins
Date: 2011-09-29 22:00:28 -0400 (Thu, 29 Sep 2011)
New Revision: 3508
Removed:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
Modified:
trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 converting storage back over to the old file algorithm (1 filestore per
table/tuplebuffer) with an updated compaction algorithm
Modified: trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2011-09-29
16:46:38 UTC (rev 3507)
+++ trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -50,6 +50,45 @@
*/
public class UnitTestUtil {
+ public static final class LogFormatter extends Formatter {
+ @Override
+ public String format(LogRecord record) {
+ final StringBuilder result = new StringBuilder();
+ result.append(new Timestamp(record.getMillis()));
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getLoggerName());
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getLevel());
+ result.append(" "); //$NON-NLS-1$
+ result.append(Thread.currentThread().getName());
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getMessage());
+ result.append('\n');
+ if (record.getThrown() != null) {
+ record.getThrown().printStackTrace(new PrintWriter(new Writer() {
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void flush() throws IOException {
+
+ }
+
+ @Override
+ public void write(char[] cbuf, int off, int len)
+ throws IOException {
+ result.append(new String(cbuf, off, len));
+ }
+ }));
+ result.append('\n');
+ }
+ return result.toString();
+ }
+ }
+
public static final String PATH_SEPARATOR = "/"; //$NON-NLS-1$
private static final String DEFAULT_TESTDATA_PATH = "src/test/resources";
//$NON-NLS-1$
@@ -419,45 +458,7 @@
} else {
logger.setUseParentHandlers(false);
ConsoleHandler ch = new ConsoleHandler();
- ch.setFormatter(new Formatter() {
-
- @Override
- public String format(LogRecord record) {
- final StringBuilder result = new StringBuilder();
- result.append(new Timestamp(record.getMillis()));
- result.append(" "); //$NON-NLS-1$
- result.append(record.getLoggerName());
- result.append(" "); //$NON-NLS-1$
- result.append(record.getLevel());
- result.append(" "); //$NON-NLS-1$
- result.append(record.getThreadID());
- result.append(" "); //$NON-NLS-1$
- result.append(record.getMessage());
- result.append('\n');
- if (record.getThrown() != null) {
- record.getThrown().printStackTrace(new PrintWriter(new Writer() {
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void flush() throws IOException {
-
- }
-
- @Override
- public void write(char[] cbuf, int off, int len)
- throws IOException {
- result.append(new String(cbuf, off, len));
- }
- }));
- result.append('\n');
- }
- return result.toString();
- }
- });
+ ch.setFormatter(new LogFormatter());
ch.setLevel(Level.FINEST);
logger.addHandler(ch);
}
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
===================================================================
---
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml 2011-09-30
02:00:28 UTC (rev 3508)
@@ -98,7 +98,8 @@
/deployers
/teiid.deployer
/lib
- /teiid-examples]]></programlisting>
+ /teiid-examples
+ /tmp/teiid]]></programlisting>
</example>
<section>
<title>/deploy/teiid/teiid-jboss-beans.xml</title>
@@ -163,6 +164,14 @@
<title>teiid-docs</title>
<para>This directory contains the PDF documents related Teiid and Teiid
development. </para>
</section>
+
+ <section>
+ <title>tmp/teiid</title>
+ <para>This directory contains temporary files created by Teiid. These are
mostly created by the buffer manager.
+ These files are not needed across a VM restart. Heavy usage of large data sets
will create lots of temporary files.
+ In heavy usage scenarios, consider pointing the buffer directory at a partition
that is routinely defragmented.
+ </para>
+ </section>
</section>
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-09-29 16:46:38 UTC
(rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-09-30 02:00:28 UTC
(rev 3508)
@@ -31,9 +31,11 @@
*/
public interface Cache extends StorageManager {
void createCacheGroup(Long gid); //called prior to adding an entry
+ //TODO: this should use a callback on the buffermanager to remove memory entries
+ //without materializing all group keys
Collection<Long> removeCacheGroup(Long gid);
void addToCacheGroup(Long gid, Long oid);
CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
- void add(CacheEntry entry, Serializer<?> s);
+ void add(CacheEntry entry, Serializer<?> s) throws Exception;
void remove(Long gid, Long id);
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-29 16:46:38
UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-30 02:00:28
UTC (rev 3508)
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -224,28 +223,6 @@
};
}
- public ByteBuffer getBuffer(long start, int length, boolean allocate) throws
IOException {
- byte[] b = new byte[length];
- if (!allocate) {
- readFully(start, b, 0, length);
- }
- return ByteBuffer.wrap(b);
- }
-
- public void updateFromBuffer(ByteBuffer bb, long start) throws IOException {
- byte[] b = null;
- int offset = 0;
- bb.rewind();
- if (bb.hasArray()) {
- b = bb.array();
- offset = bb.arrayOffset();
- } else {
- b = new byte[bb.limit()];
- bb.get(b);
- }
- write(start, b, offset, bb.limit());
- }
-
public InputStream createInputStream(final long start) {
return createInputStream(start, -1);
}
Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,191 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
- package org.teiid.common.buffer.impl;
-
-import java.util.BitSet;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Extends a {@link BitSet} by adding a cumulative total and a
- * first level index to speed queries against large bitsets.
- */
-public class BlockBitSetTree {
-
- private static final int LOG_BITS_PER_BLOCK = FileStoreCache.LOG_BLOCK_SIZE + 3;
- private static final int MAX_TOP_VALUE = 1 << LOG_BITS_PER_BLOCK;
- private int maxIndex;
- private int bitsSet;
- private int[] topVals;
- private BlockManager blockManager;
- private int blockCount = 0;
-
- public BlockBitSetTree(int maxIndex, BlockManager blockManager) {
- this.maxIndex = maxIndex;
- this.blockManager = blockManager;
- this.topVals = new int[(maxIndex >> (FileStoreCache.LOG_BLOCK_SIZE + 3)) + 1];
- }
-
- public int getMaxIndex() {
- return maxIndex;
- }
-
- /**
- * Set the given bit at the index.
- * @param bitIndex
- * @param value
- */
- public synchronized void set(int bitIndex, boolean value) {
- getOrSet(bitIndex, value, true);
- }
-
- public synchronized boolean get(int bitIndex) {
- return getOrSet(bitIndex, false, false);
- }
-
- private boolean getOrSet(int bitIndex, boolean value, boolean update) {
- if (bitIndex > maxIndex) {
- throw new ArrayIndexOutOfBoundsException(bitIndex);
- }
- int blockIndex = bitIndex>>LOG_BITS_PER_BLOCK;
- BlockInfo bb = null;
- if (blockIndex >= blockCount) {
- if (!update) {
- return false;
- }
- for (; blockCount < blockIndex+1; blockCount++) {
- bb = blockManager.allocateBlock(blockCount);
- bb.buf.position(0);
- int longsPerBlock = FileStoreCache.BLOCK_SIZE >> 6;
- for (int j = 0; j < longsPerBlock; j++) {
- bb.buf.putLong(0);
- }
- }
- } else {
- bb = blockManager.getBlock(blockIndex);
- }
- int relativeIndex = bitIndex&(MAX_TOP_VALUE-1);
- int longByteIndex = (relativeIndex>>6)<<3;
- long word = bb.buf.getLong(longByteIndex);
- long mask = 1L << bitIndex;
- boolean currentValue = ((word & mask) != 0);
- if (!update) {
- return currentValue;
- }
- if (currentValue == value) {
- return currentValue;
- }
- if (value) {
- word |= mask;
- } else {
- word &= ~mask;
- }
- bb.buf.putLong(longByteIndex, word);
- blockManager.updateBlock(bb);
- int topIndex = bitIndex >> LOG_BITS_PER_BLOCK;
- int increment = value?1:-1;
- bitsSet+=increment;
- topVals[topIndex]+=increment;
- return currentValue;
- }
-
- public synchronized int getBitsSet() {
- return bitsSet;
- }
-
- public synchronized int nextClearBit(int fromIndex) {
- int start = fromIndex >> LOG_BITS_PER_BLOCK;
- for (int i = start; i < topVals.length; i++) {
- if (topVals[i] == MAX_TOP_VALUE) {
- continue;
- }
- if (topVals[i] == 0) {
- if (i == start) {
- return fromIndex;
- }
- return i * MAX_TOP_VALUE;
- }
- int relativeIndex = 0;
- if (i == start) {
- relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
- }
- BlockInfo bb = blockManager.getBlock(i);
-
- int longByteIndex = (relativeIndex>>6)<<3;
-
- long word = ~bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
-
- while (true) {
- if (word != 0) {
- return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
- }
- longByteIndex+=8;
- if (longByteIndex > FileStoreCache.BLOCK_MASK) {
- break;
- }
- word = ~bb.buf.getLong(longByteIndex);
- }
- }
- return -1;
- }
-
- public synchronized int nextSetBit(int fromIndex) {
- if (bitsSet == 0) {
- return -1;
- }
- int start = fromIndex >> LOG_BITS_PER_BLOCK;
- for (int i = start; i < topVals.length; i++) {
- if (topVals[i] == 0) {
- continue;
- }
- if (topVals[i] == MAX_TOP_VALUE) {
- if (i == start) {
- return fromIndex;
- }
- return i * MAX_TOP_VALUE;
- }
- int relativeIndex = 0;
- if (i == start) {
- relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
- }
- BlockInfo bb = blockManager.getBlock(i);
-
- int longByteIndex = (relativeIndex>>6)<<3;
-
- long word = bb.buf.getLong(longByteIndex) & (-1l << relativeIndex);
-
- while (true) {
- if (word != 0) {
- return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
- }
- longByteIndex+=8;
- if (longByteIndex > FileStoreCache.BLOCK_MASK) {
- break;
- }
- word = bb.buf.getLong(longByteIndex);
- }
- }
- return -1;
- }
-
-}
Deleted:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,238 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Represents the logical structure of a cache group / directory
- *
- * Implemented by a closed hash table using a single step linear probe with delayed
removal.
- * Uses power of 2 hashing.
- *
- * Provides an extremely simple hash structure that rivals {@link HashMap} performance
and
- * is directly mapped to {@link ByteBuffer}s to avoid serialization overhead.
- *
- * Does not expect keys or values to be negative.
- */
-public class BlockClosedLongIntHashTable {
-
- private enum Mode {
- GET,
- UPDATE,
- REMOVE
- }
-
- private static final int BYTES_PER_ROW = 12; //8+4
- private static final int BLOCK_SIZE = FileStoreCache.BLOCK_DATA_BYTES/BYTES_PER_ROW;
- private static final float LOAD_FACTOR = .7f;
- private static final int MIN_SIZE = 1 << (31 -
Integer.numberOfLeadingZeros(BLOCK_SIZE)); //should fit in a single block
-
- static final int EMPTY = -1;
- private static final int REMOVED = -2;
-
- protected int size;
- protected int capacityMask = EMPTY;
- protected BlockManager blockManager;
-
- public BlockClosedLongIntHashTable(BlockManager blockManager) {
- this.blockManager = blockManager;
- }
-
- private void init(int capacity) {
- int currentBlockCount = blockCount(capacityMask + 1);
- int desiredBlockCount = blockCount(capacity);
- if (capacity > capacityMask) {
- for (int i = currentBlockCount; i < desiredBlockCount; i++) {
- BlockInfo bb = blockManager.allocateBlock(i);
- empty(bb.buf, BLOCK_SIZE);
- blockManager.updateBlock(bb);
- }
- }
- capacityMask = capacity - 1;
- }
-
- private void empty(ByteBuffer bb, int toIndex) {
- bb.position(0);
- for (int j = 0; j < toIndex; j++) {
- bb.putLong(j * BYTES_PER_ROW, EMPTY);
- }
- }
-
- private static int blockCount(int size) {
- int currentBlockCount = (size) / BLOCK_SIZE;
- if (size%BLOCK_SIZE > 0) {
- currentBlockCount++;
- }
- return currentBlockCount;
- }
-
- public synchronized int put(long key, int value) {
- int result = getOrUpdate(key, value, Mode.UPDATE);
- if ((result == EMPTY || result == REMOVED) && ++size >
LOAD_FACTOR*capacityMask) {
- int newCapacity = (capacityMask+1)<<1;
- int oldLength = capacityMask + 1;
- init(newCapacity);
- rehash(oldLength, newCapacity);
- }
- return result;
- }
-
- public synchronized int get(long key) {
- return getOrUpdate(key, EMPTY, Mode.GET);
- }
-
- public synchronized int remove(long key) {
- int result = getOrUpdate(key, EMPTY, Mode.REMOVE);
- if (result != EMPTY && --size*LOAD_FACTOR < capacityMask>>3 &&
(capacityMask+1)>>1 >= MIN_SIZE) {
- //reduce the size of the table by half
- int oldLength = capacityMask + 1;
- capacityMask >>= 1;
- rehash(oldLength, oldLength>>1);
- int oldBlocks = blockCount(oldLength);
- int newBlocks = blockCount(capacityMask +1);
- for (int i = oldBlocks-1; i >= newBlocks; i--) {
- blockManager.freeBlock(i);
- }
- }
- return result;
- }
-
- private void rehash(int oldLength, int newLength) {
- BlockInfo lastBlockInfo = null;
- ByteBuffer lastBlock = null;
- for (int i = 0; i < oldLength; i++) {
- int relativeIndex = i%BLOCK_SIZE;
- if (lastBlock == null || relativeIndex == 0) {
- int lastIndex = i/BLOCK_SIZE;
- lastBlockInfo = blockManager.getBlock(lastIndex);
- lastBlock = lastBlockInfo.buf;
- if (i < newLength) {
- byte[] buf = new byte[FileStoreCache.BLOCK_DATA_BYTES];
- lastBlock.position(0);
- lastBlock.get(buf);
- ByteBuffer copyBlock = ByteBuffer.wrap(buf);
- empty(lastBlock, Math.min(BLOCK_SIZE, oldLength - lastIndex*BLOCK_SIZE));
- blockManager.updateBlock(lastBlockInfo);
- lastBlock = copyBlock;
- }
- }
- lastBlock.position(relativeIndex*BYTES_PER_ROW);
- long oldKey = lastBlock.getLong();
- int oldValue = lastBlock.getInt();
- if (oldKey != REMOVED && oldKey != EMPTY) {
- getOrUpdate(oldKey, oldValue, Mode.UPDATE);
- }
- }
- }
-
- @SuppressWarnings("null")
- protected int getOrUpdate(long key, int value, Mode mode) {
- if (capacityMask == EMPTY) {
- if (mode == Mode.GET || mode == Mode.REMOVE) {
- return EMPTY;
- }
- init(MIN_SIZE);
- }
- int i = hashIndex(key);
- BlockInfo lastBlockInfo = null;
- long old = EMPTY;
- int position = 0;
- while (true) {
- int relativeIndex = i%BLOCK_SIZE;
- if (lastBlockInfo == null || relativeIndex == 0) {
- int index = i/BLOCK_SIZE;
- lastBlockInfo = blockManager.getBlock(index);
- }
- position = relativeIndex*BYTES_PER_ROW;
- old = lastBlockInfo.buf.getLong(position);
- if (old == EMPTY || old == key || (mode == Mode.UPDATE && old == REMOVED)) {
- break;
- }
- i = (i + 1) & capacityMask;
- }
- int result = EMPTY;
- if (old != EMPTY && old != REMOVED) {
- result = lastBlockInfo.buf.getInt(position + 8);
- }
- switch (mode) {
- case GET:
- return result;
- case UPDATE:
- lastBlockInfo.buf.putLong(position, key);
- lastBlockInfo.buf.putInt(position + 8, value);
- blockManager.updateBlock(lastBlockInfo);
- return result;
- case REMOVE:
- if (old == EMPTY || old == REMOVED) {
- return EMPTY;
- }
- lastBlockInfo.buf.putLong(position, REMOVED);
- blockManager.updateBlock(lastBlockInfo);
- return result;
- default:
- throw new AssertionError();
- }
- }
-
- private int hashIndex(long key) {
- //start with the usual long hash
- int primaryHash = (int)(key ^ (key >>> 32));
- //allow the lower bits to spread the entries
- primaryHash += primaryHash <<= 2;
- primaryHash += primaryHash <<= 3;
- return primaryHash & capacityMask;
- }
-
- public synchronized int size() {
- return size;
- }
-
- public synchronized Map<Long, Integer> remove() {
- Map<Long, Integer> result = new HashMap<Long, Integer>();
- BlockInfo lastBlockInfo = null;
- int blockIndex = 0;
- for (int i = capacityMask; i >= 0; i--) {
- int relativeIndex = i%BLOCK_SIZE;
- if (lastBlockInfo == null || relativeIndex == BLOCK_SIZE - 1) {
- if (lastBlockInfo != null) {
- blockManager.freeBlock(blockIndex);
- }
- blockIndex = i/BLOCK_SIZE;
- lastBlockInfo = blockManager.getBlock(blockIndex);
- }
- lastBlockInfo.buf.position(relativeIndex*BYTES_PER_ROW);
- long key = lastBlockInfo.buf.getLong();
- if (key != EMPTY && key != REMOVED) {
- result.put(key, lastBlockInfo.buf.getInt());
- }
- }
- blockManager.free();
- return result;
- }
-
-}
Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-09-29
16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,51 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-/**
- * Represents an INode
- *
- * Returned BlockInfo may be shared. If shared there and no guarantees about position
and mark.
- * in particular system/index blocks can be used by multiple threads relative methods
should be
- * avoided, but may be used for exclusive write operations.
- * Otherwise the position will be 0.
- *
- * Due to buffermanager locking, non-index data blocks can be assumed to be thread-safe.
- */
-public interface BlockManager {
-
- int getInode();
-
- BlockInfo allocateBlock(int index);
-
- BlockInfo getBlock(int index);
-
- void updateBlock(BlockInfo block);
-
- void freeBlock(int index);
-
- void free();
-
-}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -199,6 +199,7 @@
cache.remove(this.id, batch);
}
ce.setSerializer(this.ref);
+ ce.setPersistent(true);
if (retain) {
addMemoryEntry(ce);
}
@@ -556,7 +557,11 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to
storage, total writes: ", count); //$NON-NLS-1$
}
- cache.add(ce, s);
+ try {
+ cache.add(ce, s);
+ } catch (Throwable e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ ce.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ }
ce.setPersistent(true);
}
if (s.useSoftCache()) {
@@ -668,6 +673,7 @@
cleanSoftReferences();
Collection<Long> vals = cache.removeCacheGroup(id);
for (Long val : vals) {
+ //TODO: we will unnecessarily call remove on the cache, but that should be low cost
fastGet(val, prefersMemory, false);
}
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -29,6 +29,7 @@
public abstract class ExtensibleBufferedOutputStream extends OutputStream {
protected ByteBuffer buf;
+ protected int bytesWritten;
public ExtensibleBufferedOutputStream() {
}
@@ -63,18 +64,22 @@
public void flush() throws IOException {
if (buf != null && buf.position() > 0) {
- flushDirect();
+ bytesWritten += flushDirect();
}
buf = null;
}
protected abstract ByteBuffer newBuffer();
- protected abstract void flushDirect() throws IOException;
+ protected abstract int flushDirect() throws IOException;
@Override
public void close() throws IOException {
flush();
}
+ public int getBytesWritten() {
+ return bytesWritten;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-09-29
16:46:38 UTC (rev 3507)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -22,17 +22,23 @@
package org.teiid.common.buffer.impl;
+import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
@@ -40,688 +46,292 @@
import org.teiid.common.buffer.Serializer;
import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
-import org.teiid.core.TeiidRuntimeException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
/**
- * Implements storage against a {@link FileStore} abstraction using a filesystem
paradigm.
- * The filesystem uses a 31bit address space on top of 2^14 byte blocks.
+ * A minimally blocking Cache using {@link FileStore}s.
*
- * Therefore there is 2^31*2^14 = 2^45 or 32 terabytes max of addressable space.
+ * Storage files with significant unused space are compacted after reaching a size
threshold.
+ * Compacting the empty space may be costly as it is currently implemented by blocking
all
+ * read/write operations against the group.
*
- * Some amount of the space is taken up by system information (inodes and use flags).
- * This is held in a separate file.
+ * Since empty space is concentrated at the beginning of the store, a better approach could
+ * be to use smaller file segments and move batches off of the beginning.
*
- * The 64 byte inode format is:
- * 14 32 bit direct block pointers
- * 1 32 bit block indirect pointer
- * 1 32 bit block doubly indirect pointer
- *
- * The data block format is:
- * data bytes | long gid | long oid | three byte int block num
- *
- * The gid/oid are stored with the block so that defrag/compaction can be performed
- * with minimal blocking/lookups.
- *
- * This means that the maximum number of blocks available to a "file" is
- * 14 + (2^14-18)/4 + ((2^14-18)/4)^2 ~= 2^24
- *
- * Thus the max serialized object size is: 2^24*(2^14-18) ~= 256GB.
- *
- * The root directory "physicalMapping" is held in memory for performance,
- * but could itself be switched to using a block map. It will grow in
- * proportion to the number of tables/tuplebuffers in use.
- *
- * TODO: defragment
- * TODO: lobs could also be supported in this structure.
+ * There is unfortunately a significant memory footprint per group.
*/
-public class FileStoreCache implements Cache, StorageManager {
+public class FileStoreCache implements Cache {
- //TODO allow the block size to be configurable
- static final int LOG_BLOCK_SIZE = 14;
- static final int ADDRESS_BITS = 31;
- static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
- static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
- static final int BLOCK_MASK = BLOCK_SIZE - 1;
- static final int BLOCK_OVERHEAD = 8+8+3;
- static final int BLOCK_DATA_BYTES = BLOCK_SIZE - BLOCK_OVERHEAD;
- static final int BYTES_PER_BLOCK_ADDRESS = 4;
- static final int ADDRESSES_PER_BLOCK = BLOCK_DATA_BYTES/BYTES_PER_BLOCK_ADDRESS;
- static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
- static final int DIRECT_POINTERS = 14;
- static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
- static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK *
ADDRESSES_PER_BLOCK;
- static final int EMPTY_ADDRESS = -1;
-
- private final class BlockOutputStream extends
- ExtensibleBufferedOutputStream {
- private final BlockManager blockManager;
- int blockNum = -1;
- BlockInfo bi;
-
- private BlockOutputStream(BlockManager blockManager) {
- this.blockManager = blockManager;
- }
-
- @Override
- protected ByteBuffer newBuffer() {
- bi = blockManager.allocateBlock(++blockNum);
- return bi.buf;
- }
-
- @Override
- protected void flushDirect() throws IOException {
- blockManager.updateBlock(bi);
- bi = null;
- }
- }
-
- private final class BitSetBlockManager implements BlockManager {
- final int offset;
+ private static class CacheGroup {
+ private static final int MAX_FREE_SPACE = 1 << 11;
+ FileStore store;
+ long tail;
+ long unusedSpace = 0;
+ ReadWriteLock lock = new ReentrantReadWriteLock();
+ Map<Long, long[]> physicalMapping = Collections.synchronizedMap(new
HashMap<Long, long[]>());
+ List<Long> freed = Collections.synchronizedList(new LinkedList<Long>());
- BitSetBlockManager(int offset) {
- this.offset = offset;
+ CacheGroup(FileStore store) {
+ this.store = store;
}
- @Override
- public void updateBlock(BlockInfo info) {
- updatePhysicalBlock(info);
- }
-
- @Override
- public BlockInfo getBlock(int index) {
- return getPhysicalBlock(index + offset, true, false);
- }
-
- @Override
- public BlockInfo allocateBlock(int index) {
- return getPhysicalBlock(index + offset, true, true);
- }
-
- @Override
- public int getInode() {
- throw new AssertionError();
- }
-
- @Override
- public void freeBlock(int index) {
- throw new AssertionError();
- }
-
- @Override
- public void free() {
- throw new AssertionError();
- }
-
- }
-
- private enum Mode {
- GET,
- UPDATE,
- ALLOCATE
- }
-
- static final class BlockInfo {
- final boolean system;
- final int inodeOffset;
- final ByteBuffer buf;
- final int physicalAddress;
- boolean dirty;
-
- BlockInfo(boolean system, ByteBuffer ib, int index, int inodeOffset) {
- this.system = system;
- this.buf = ib;
- this.physicalAddress = index;
- this.inodeOffset = inodeOffset;
- }
- }
-
- private final class InodeBlockManager implements BlockManager {
- private final int inode;
- private final long gid;
- private final long oid;
- private int lastBlock = -1;
- boolean trackLast;
-
- InodeBlockManager(long gid, long oid, int inode) {
- if (inode == EMPTY_ADDRESS) {
- synchronized (inodesInuse) {
- inode = inodesInuse.nextClearBit(0);
- if (inode == -1) {
- throw new TeiidRuntimeException("no inodes available"); //$NON-NLS-1$
+ void freeBatch(Long batch) throws IOException {
+ long[] info = physicalMapping.remove(batch);
+ if (info != null) {
+ if (info[0] + info[1] == tail) {
+ tail -= info[1];
+ if (store.getLength() - tail > IO_BUFFER_SIZE << 5) {
+ store.setLength(tail);
}
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "Allocating inode", inode,
"to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
- }
- inodesInuse.set(inode, true);
+ } else {
+ unusedSpace += info[1];
}
- int inodeBlock = inode/inodesPerBlock;
- int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
- BlockInfo bb = getInodeSubBlock(inodeBlock, inodePosition);
- bb.buf.putInt(EMPTY_ADDRESS);
- updatePhysicalBlock(bb);
}
- this.inode = inode;
- this.gid = gid;
- this.oid = oid;
}
- @Override
- public int getInode() {
- return inode;
- }
-
- @Override
- public void updateBlock(BlockInfo info) {
- updatePhysicalBlock(info);
- }
-
- @Override
- public BlockInfo getBlock(int index) {
- int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
- BlockInfo bb = getPhysicalBlock(dataBlock, false, false);
- bb.buf.position(0);
- bb.buf.limit(BLOCK_DATA_BYTES);
- return bb;
- }
-
- private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
- if (index >= MAX_DOUBLE_INDIRECT) {
- throw new TeiidRuntimeException("Max block number exceeded");
//$NON-NLS-1$
+ private long getOffset(Long gid, long compactionThreshold) throws IOException {
+ long currentLength = store.getLength();
+ if (currentLength <= compactionThreshold || unusedSpace * 4 <= currentLength *
3) {
+ return tail;
}
- int dataBlock = 0;
- int position = 0;
- int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
- BlockInfo info = getInodeSubBlock(inode/inodesPerBlock, inodePosition);
- if (index >= MAX_INDIRECT) {
- position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
- BlockInfo next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT,
MAX_DOUBLE_INDIRECT+1, value, mode);
- if (next != info) {
- info = next;
- //should have traversed to the secondary
- int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
- position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
- info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on",
gid); //$NON-NLS-1$
+ }
+ byte[] buffer = new byte[IO_BUFFER_SIZE];
+ TreeSet<long[]> bySize = new TreeSet<long[]>(new
Comparator<long[]>() {
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ int signum = Long.signum(o1[1] - o2[1]);
+ if (signum == 0) {
+ //take the upper address first
+ return Long.signum(o2[0] - o1[0]);
}
- next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT +
indirectAddressBlock * ADDRESSES_PER_BLOCK, MAX_DOUBLE_INDIRECT + 2 +
indirectAddressBlock, value, mode);
- if (next != info) {
- info = next;
- position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
- info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
- }
+ return signum;
+ }
+ });
+ TreeSet<long[]> byAddress = new TreeSet<long[]>(new
Comparator<long[]>() {
+
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ return Long.signum(o1[0] - o2[0]);
+ }
+ });
+ synchronized (physicalMapping) {
+ for (long[] value : physicalMapping.values()) {
+ if (value == null) {
+ continue;
}
+ bySize.add(value);
+ byAddress.add(value);
}
- } else if (index >= DIRECT_POINTERS) {
- //indirect
- position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
- BlockInfo next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS,
MAX_DOUBLE_INDIRECT, value, mode);
- if (next != info) {
- info = next;
- position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
- info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ long lastEndAddress = 0;
+ long usedSpace = tail - unusedSpace;
+ while (!byAddress.isEmpty()) {
+ long[] info = byAddress.pollFirst();
+ bySize.remove(info);
+
+ long currentOffset = info[0];
+ long space = currentOffset - lastEndAddress;
+ boolean movedLast = false;
+ while (space > 0 && !bySize.isEmpty()) {
+ long[] last = byAddress.last();
+ if (last[1] > space) {
+ break;
}
+ movedLast = true;
+ byAddress.pollLast();
+ bySize.remove(last);
+ move(last, lastEndAddress, buffer);
+ space -= last[1];
+ lastEndAddress += last[1];
}
- } else {
- position = BYTES_PER_BLOCK_ADDRESS*index;
- if (mode == Mode.ALLOCATE) {
- info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ if (movedLast && !byAddress.isEmpty()) {
+ long[] last = byAddress.last();
+ long currentLastEndAddress = last[0] + last[1];
+ if (currentLastEndAddress < currentLength>>1) {
+ lastEndAddress = currentLastEndAddress;
+ break;
+ }
}
- }
- if (mode == Mode.ALLOCATE) {
- dataBlock = nextBlock();
- info.buf.putInt(position, dataBlock);
- updatePhysicalBlock(info);
- } else {
- dataBlock = info.buf.getInt(position);
- if (mode == Mode.UPDATE) {
- info.buf.putInt(position, value);
- updatePhysicalBlock(info);
+ while (space > 0 && !bySize.isEmpty()) {
+ long[] smallest = bySize.first();
+ if (smallest[1] > space) {
+ break;
+ }
+ bySize.pollFirst();
+ byAddress.remove(smallest);
+ move(smallest, lastEndAddress, buffer);
+ space -= smallest[1];
+ lastEndAddress += smallest[1];
}
- }
- return dataBlock;
- }
-
- private BlockInfo updateIndirectBlockInfo(BlockInfo info, int index, int position, int
cutOff, int blockId, int value, Mode mode) {
- int sib_index = info.buf.getInt(position);
- boolean newBlock = false;
- if (index == cutOff) {
- if (mode == Mode.ALLOCATE) {
- sib_index = nextBlock();
- info.buf.putInt(position, sib_index);
- updatePhysicalBlock(info);
- newBlock = true;
- } else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
- freeDataBlock(sib_index);
- return info;
+
+ if (space > MAX_FREE_SPACE) {
+ move(info, lastEndAddress, buffer);
}
+ lastEndAddress = info[0] + info[1];
}
- info = getPhysicalBlock(sib_index, false, false);
- if (newBlock) {
- putBlockId(blockId, info.buf);
+ store.setLength(lastEndAddress);
+ tail = lastEndAddress;
+ unusedSpace = lastEndAddress - usedSpace;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", gid,
"pre-size", currentLength, "post-size", lastEndAddress); //$NON-NLS-1$
//$NON-NLS-2$ //$NON-NLS-3$
}
- return info;
+ return tail;
}
-
- private int nextBlock() {
- int next = -1;
- synchronized (blocksInuse) {
- next = blocksInuse.nextClearBit(lastBlock + 1);
- if (next == -1) {
- throw new TeiidRuntimeException("no freespace available"); //$NON-NLS-1$
- }
- blocksInuse.set(next, true);
- }
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "Allocating block", next,
"to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
- }
- if (trackLast) {
- lastBlock = next;
- }
- return next;
- }
-
- @Override
- public void freeBlock(int index) {
- int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
- freeDataBlock(dataBlock);
- }
-
- private void freeDataBlock(int dataBlock) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
- }
- blockCache.remove(dataBlock);
- blocksInuse.set(dataBlock, false);
- }
- BlockInfo getInodeSubBlock(int inodeBlock, int inodePosition) {
- BlockInfo bi = getPhysicalBlock(inodeBlock + blocksInUseBlocks + inodesInUseBlocks,
true, false);
- ByteBuffer bb = bi.buf.duplicate();
- bb.position(inodePosition);
- bb.limit(inodePosition + INODE_BYTES);
- bb = bb.slice();
- return new BlockInfo(true, bb, inodeBlock + blocksInUseBlocks + inodesInUseBlocks,
inodePosition);
- }
-
- @Override
- public void free() {
- int inodeBlock = inode/inodesPerBlock;
- int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
- BlockInfo bi = getInodeSubBlock(inodeBlock, inodePosition);
- ByteBuffer ib = bi.buf;
- int indirectIndexBlock = ib.getInt(ib.position() +
BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
- int doublyIndirectIndexBlock = ib.getInt(ib.position() +
BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
- boolean freedAll = freeBlock(ib, DIRECT_POINTERS, true);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "freeing inode", inode,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ private void move(long[] toMove, long newOffset, byte[] buffer) throws IOException {
+ long oldOffset = toMove[0];
+ toMove[0] = newOffset;
+ int size = (int)toMove[1];
+ while (size > 0) {
+ int toWrite = Math.min(IO_BUFFER_SIZE, size);
+ store.readFully(oldOffset, buffer, 0, toWrite);
+ store.write(newOffset, buffer, 0, toWrite);
+ size -= toWrite;
+ oldOffset += toWrite;
+ newOffset += toWrite;
}
- inodesInuse.set(inode, false);
- if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
- return;
- }
- freedAll = freeIndirectBlock(indirectIndexBlock);
- if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
- return;
- }
- BlockInfo bb = getPhysicalBlock(doublyIndirectIndexBlock, false, false);
- freeBlock(bb.buf, ADDRESSES_PER_BLOCK, false);
- freeDataBlock(doublyIndirectIndexBlock);
}
-
- private boolean freeIndirectBlock(int indirectIndexBlock) {
- BlockInfo bb = getPhysicalBlock(indirectIndexBlock, false, false);
- bb.buf.position(0);
- boolean freedAll = freeBlock(bb.buf, ADDRESSES_PER_BLOCK, true);
- freeDataBlock(indirectIndexBlock);
- return freedAll;
- }
-
- private boolean freeBlock(ByteBuffer ib, int numPointers, boolean primary) {
- for (int i = 0; i < numPointers; i++) {
- int dataBlock = ib.getInt();
- if (dataBlock == EMPTY_ADDRESS) {
- return false;
- }
- if (primary) {
- freeDataBlock(dataBlock);
- } else {
- freeIndirectBlock(dataBlock);
- }
- }
- return true;
- }
-
- @Override
- public BlockInfo allocateBlock(int blockNum) {
- int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
- BlockInfo bb = getPhysicalBlock(dataBlock, false, true);
- putBlockId(blockNum, bb.buf);
- bb.buf.position(0);
- bb.buf.limit(BLOCK_DATA_BYTES);
- return bb;
- }
-
- private void putBlockId(int blockNum, ByteBuffer bb) {
- bb.position(BLOCK_DATA_BYTES);
- bb.putLong(gid);
- bb.putLong(oid);
- bb.put((byte)(blockNum >> 16));
- bb.putShort((short)blockNum);
- }
-
}
+ private static final int COMPACTION_THRESHOLD = 1 << 24; //start checking at 16
megs
+ private static final int IO_BUFFER_SIZE = 1<<13;
+ int compactionThreshold = COMPACTION_THRESHOLD;
+ private ConcurrentHashMap<Long, CacheGroup> cacheGroups = new
ConcurrentHashMap<Long, CacheGroup>();
private StorageManager storageManager;
- private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
- private int inodes;
- private int blocks;
- private int inodesPerBlock;
- private int inodesInUseBlocks;
- private int blocksInUseBlocks;
- private FileStore store;
- private FileStore systemStore;
- private BlockBitSetTree blocksInuse;
- private BlockBitSetTree inodesInuse;
-
- //root directory
- ConcurrentHashMap<Long, BlockClosedLongIntHashTable> physicalMapping = new
ConcurrentHashMap<Long, BlockClosedLongIntHashTable>();
- //block caching
- int blockCacheSize = 128; //2 MB
- ReentrantLock blockLock = new ReentrantLock();
- LinkedHashMap<Integer, BlockInfo> blockCache = new LinkedHashMap<Integer,
BlockInfo>() {
- private static final long serialVersionUID = -4240291744435552008L;
- BlockInfo eldestEntry = null;
+ @Override
+ public void add(CacheEntry entry, Serializer s) throws Exception {
+ final CacheGroup group = cacheGroups.get(s.getId());
+ if (group == null) {
+ return;
+ }
- protected boolean removeEldestEntry(Map.Entry<Integer,BlockInfo> eldest) {
- if (size() > blockCacheSize) {
- BlockInfo bi = eldest.getValue();
- if (bi.dirty) {
- eldestEntry = bi;
+ group.lock.writeLock().lock();
+ try {
+ synchronized (group.freed) {
+ while (!group.freed.isEmpty()) {
+ group.freeBatch(group.freed.remove(0));
}
- return true;
}
- return false;
- }
-
- public BlockInfo put(Integer key, BlockInfo value) {
- blockLock.lock();
- value.dirty = true;
- try {
- return super.put(key, value);
- } finally {
- BlockInfo toUpdate = eldestEntry;
- eldestEntry = null;
- blockLock.unlock();
- if (toUpdate != null) {
- updatePhysicalBlockDirect(toUpdate);
+ final ByteBuffer buffer = ByteBuffer.allocate(IO_BUFFER_SIZE);
+ final long offset = group.getOffset(s.getId(), compactionThreshold);
+ ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream() {
+ @Override
+ protected ByteBuffer newBuffer() {
+ buffer.rewind();
+ return buffer;
}
- }
- }
-
- public BlockInfo get(Object key) {
- blockLock.lock();
- try {
- return super.get(key);
- } finally {
- blockLock.unlock();
- }
- }
-
- public BlockInfo remove(Object key) {
- blockLock.lock();
- try {
- return super.remove(key);
- } finally {
- blockLock.unlock();
- }
- }
- };
-
- BlockInfo getPhysicalBlock(int block, boolean system, boolean allocate) {
- if (block < 0) {
- throw new AssertionError("invalid block address " + block); //$NON-NLS-1$
- }
- try {
- int key = block;
- if (system) {
- key |= SYSTEM_MASK;
- }
- BlockInfo result = blockCache.get(key);
- assert result == null || !allocate;
- if (result == null) {
- ByteBuffer bb = null;
- if (system) {
- bb = systemStore.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
- } else {
- bb = store.getBuffer(block<<LOG_BLOCK_SIZE, BLOCK_SIZE, allocate);
+
+ @Override
+ protected int flushDirect() throws IOException {
+ group.store.write(offset + bytesWritten, buffer.array(), 0, buf.position());
+ return buf.position();
}
- result = new BlockInfo(system, bb, block, -1);
- blockLock.lock();
- try {
- BlockInfo existing = blockCache.get(key);
- if (existing != null) {
- return existing;
- }
- blockCache.put(key, result);
- } finally {
- blockLock.unlock();
- }
- return result;
+ };
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(entry.getSizeEstimate());
+ s.serialize(entry.getObject(), oos);
+ oos.close();
+ long size = fsos.getBytesWritten();
+ long[] info = new long[] {offset, size};
+ group.physicalMapping.put(entry.getId(), info);
+ group.tail = Math.max(group.tail, offset + size);
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, s.getId(), entry.getId(),
"batch written starting at:", offset); //$NON-NLS-1$
}
- return result;
- } catch (IOException e) {
- throw new TeiidRuntimeException(e);
+ } finally {
+ group.lock.writeLock().unlock();
}
}
-
- void updatePhysicalBlock(BlockInfo bi) {
- int key = bi.physicalAddress;
- if (bi.system) {
- key |= SYSTEM_MASK;
- }
- if (bi.inodeOffset >= 0) {
- blockLock.lock();
- try {
- BlockInfo actual = blockCache.get(key);
- if (actual == null) {
- //we're not in the cache, so just update storage
- updatePhysicalBlockDirect(bi);
- } else {
- //TODO: check to see we're sharing the same buffer
- for (int i = 0; i < INODE_BYTES; i++) {
- actual.buf.put(bi.inodeOffset + i, bi.buf.get(i));
- }
- }
- } finally {
- blockLock.unlock();
- }
- return;
- }
- blockCache.put(key, bi);
- }
- private void updatePhysicalBlockDirect(BlockInfo bi) {
- try {
- bi.buf.rewind();
- if (!bi.system) {
- store.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);
- } else {
- systemStore.updateFromBuffer(bi.buf, bi.physicalAddress<<LOG_BLOCK_SIZE);
- }
- } catch (IOException e) {
- throw new TeiidRuntimeException(e);
- }
- }
-
- InodeBlockManager getBlockManager(long gid, long oid, int inode) {
- return new InodeBlockManager(gid, oid, inode);
- }
-
- @SuppressWarnings("unchecked")
@Override
- public void add(CacheEntry entry, Serializer s) {
- boolean success = false;
- InodeBlockManager blockManager = null;
- try {
- BlockClosedLongIntHashTable map = physicalMapping.get(s.getId());
- if (map == null) {
- return;
- }
- blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
- blockManager.trackLast = true;
- ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- oos.writeInt(entry.getSizeEstimate());
- s.serialize(entry.getObject(), oos);
- oos.close();
- map.put(entry.getId(), blockManager.getInode());
- success = true;
- } catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ entry.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
- } finally {
- if (!success && blockManager != null) {
- blockManager.free();
- }
- }
+ public void createCacheGroup(Long gid) {
+ cacheGroups.put(gid, new
CacheGroup(storageManager.createFileStore(String.valueOf(gid))));
}
@Override
- public CacheEntry get(Long oid, Serializer<?> serializer) throws
TeiidComponentException {
+ public CacheEntry get(Long id, Serializer<?> serializer)
+ throws TeiidComponentException {
+ CacheGroup group = cacheGroups.get(serializer.getId());
+ if (group == null) {
+ return null;
+ }
try {
- BlockClosedLongIntHashTable map = physicalMapping.get(serializer.getId());
- if (map == null) {
+ group.lock.readLock().lock();
+ long[] info = group.physicalMapping.get(id);
+ if (info == null) {
return null;
}
- final int inode = map.get(oid);
- if (inode == EMPTY_ADDRESS) {
- return null;
- }
- final BlockManager manager = getBlockManager(serializer.getId(), oid, inode);
- ObjectInputStream ois = new ObjectInputStream(new InputStream() {
-
- int blockIndex;
- BlockInfo buf;
-
- @Override
- public int read() throws IOException {
- ensureBytes();
- return buf.buf.get() & 0xff;
- }
-
- private void ensureBytes() {
- if (buf == null || buf.buf.remaining() == 0) {
- if (buf != null) {
- buf = null;
- }
- buf = manager.getBlock(blockIndex++);
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len)
- throws IOException {
- ensureBytes();
- len = Math.min(len, buf.buf.remaining());
- buf.buf.get(b, off, len);
- return len;
- }
- });
- CacheEntry ce = new CacheEntry(oid);
+ ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
+ CacheEntry ce = new CacheEntry(id);
ce.setSizeEstimate(ois.readInt());
ce.setObject(serializer.deserialize(ois));
- ce.setPersistent(true);
return ce;
} catch(IOException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
} catch (ClassNotFoundException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
+ } finally {
+ group.lock.readLock().unlock();
}
}
-
- @Override
- public FileStore createFileStore(String name) {
- return storageManager.createFileStore(name);
- }
-
- @Override
- public void initialize() throws TeiidComponentException {
- storageManager.initialize();
- int logSpace = Math.min(45, 63 - Long.numberOfLeadingZeros(this.maxBufferSpace));
-
- blocks = 1 << Math.min(Math.max(12, logSpace -LOG_BLOCK_SIZE +1), ADDRESS_BITS);
//blocks per segment
- inodes = blocks>>1;
- inodesPerBlock = BLOCK_SIZE/INODE_BYTES;
-
- inodesInUseBlocks = computeInuseBlocks(inodes);
- blocksInUseBlocks = computeInuseBlocks(blocks);
-
- inodesInuse = new BlockBitSetTree(inodes - 1, new BitSetBlockManager(0));
- blocksInuse = new BlockBitSetTree(blocks - 1, new
BitSetBlockManager(inodesInUseBlocks));
-
- store = storageManager.createFileStore("data_store"); //$NON-NLS-1$
- systemStore = storageManager.createFileStore("system_store"); //$NON-NLS-1$
- }
-
- static int computeInuseBlocks(int number) {
- int blockCount = (number>>LOG_BLOCK_SIZE) + ((number&BLOCK_MASK)>0?1:0);
- return (blockCount>>3) + ((blockCount&7)>0?1:0);
- }
-
@Override
- public void addToCacheGroup(Long gid, Long oid) {
- BlockClosedLongIntHashTable map = physicalMapping.get(gid);
- if (map == null) {
+ public void remove(Long gid, Long id) {
+ CacheGroup group = cacheGroups.get(gid);
+ if (group == null) {
return;
}
- map.put(oid, EMPTY_ADDRESS);
+ if (group.lock.writeLock().tryLock()) {
+ try {
+ try {
+ group.freeBatch(id);
+ } catch (IOException e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error removing
batch"); //$NON-NLS-1$
+ }
+ } finally {
+ group.lock.writeLock().unlock();
+ }
+ } else {
+ group.freed.add(id);
+ }
}
@Override
- public void createCacheGroup(Long gid) {
- BlockClosedLongIntHashTable map = new BlockClosedLongIntHashTable(getBlockManager(gid,
-1, EMPTY_ADDRESS));
- physicalMapping.put(gid, map);
- }
-
- @Override
- public void remove(Long gid, Long id) {
- BlockClosedLongIntHashTable map = physicalMapping.get(gid);
- if (map == null) {
+ public void addToCacheGroup(Long gid, Long oid) {
+ CacheGroup group = cacheGroups.get(gid);
+ if (group == null) {
return;
}
- int inode = map.remove(id);
- free(gid, id, inode);
+ group.physicalMapping.put(oid, null);
}
@Override
public Collection<Long> removeCacheGroup(Long gid) {
- BlockClosedLongIntHashTable map = physicalMapping.remove(gid);
- if (map == null) {
- return Collections.emptySet();
+ CacheGroup group = cacheGroups.remove(gid);
+ if (group == null) {
+ return Collections.emptyList();
}
- Map<Long, Integer> values = map.remove();
- for (Map.Entry<Long, Integer> entry : values.entrySet()) {
- if (entry.getValue() != null) {
- free(gid, entry.getKey(), entry.getValue());
- }
+ group.store.remove();
+ synchronized (group.physicalMapping) {
+ return new ArrayList<Long>(group.physicalMapping.keySet());
}
- return values.keySet();
}
-
- void free(Long gid, Long oid, int inode) {
- if (inode == EMPTY_ADDRESS) {
- return;
- }
- BlockManager bm = getBlockManager(gid, oid, inode);
- bm.free();
+
+ @Override
+ public FileStore createFileStore(String name) {
+ return storageManager.createFileStore(name);
}
+
+ @Override
+ public void initialize() throws TeiidComponentException {
+ this.storageManager.initialize();
+ }
public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
@@ -731,16 +341,8 @@
return storageManager;
}
- public void setMaxBufferSpace(long maxBufferSpace) {
- this.maxBufferSpace = maxBufferSpace;
+ public void setCompactionThreshold(int compactionThreshold) {
+ this.compactionThreshold = compactionThreshold;
}
-
- public int getInodesInUse() {
- return this.inodesInuse.getBitsSet();
- }
-
- public int getDataBlocksInUse() {
- return this.blocksInuse.getBitsSet();
- }
-
+
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -22,7 +22,6 @@
package org.teiid.common.buffer.impl;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -87,20 +86,6 @@
public synchronized long getLength() {
return buffer.limit();
}
-
- @Override
- public synchronized ByteBuffer getBuffer(long start, int length, boolean allocate) {
- int position = (int)start;
- buffer.limit(position + length);
- buffer.position(position);
- return buffer.slice();
- }
-
- @Override
- public void updateFromBuffer(ByteBuffer bb, long start)
- throws IOException {
- //do nothing we are sharing the bytes
- }
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -23,7 +23,6 @@
package org.teiid.common.buffer.impl;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -79,7 +78,7 @@
FileStore store = null;
if (!write) {
synchronized (this) {
- if (fileOffset + length > len) {
+ if (fileOffset > len) {
throw new IOException("Invalid file position " + fileOffset + "
length " + length); //$NON-NLS-1$ //$NON-NLS-2$
}
store = storageFiles.get((int)(fileOffset/maxFileSize));
@@ -118,28 +117,6 @@
len = length;
}
- @Override
- public ByteBuffer getBuffer(long start, int length, boolean allocate) throws
IOException {
- FileStore store = null;
- synchronized (this) {
- ensureLength(start + length);
- store = storageFiles.get((int)(start/maxFileSize));
- }
- long fileBegin = start%maxFileSize;
- return store.getBuffer(fileBegin, length, allocate);
- }
-
- @Override
- public void updateFromBuffer(ByteBuffer bb, long start)
- throws IOException {
- FileStore store = null;
- synchronized (this) {
- store = storageFiles.get((int)(start/maxFileSize));
- }
- long fileBegin = start%maxFileSize;
- store.updateFromBuffer(bb, fileBegin);
- }
-
@Override
public synchronized void setLength(long length) throws IOException {
if (length > len) {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -86,7 +86,6 @@
SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
FileStoreCache fsc = new FileStoreCache();
- fsc.setMaxBufferSpace(Runtime.getRuntime().maxMemory()/4);
fsc.setStorageManager(ssm);
fsc.initialize();
bufferManager.setCache(fsc);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-29 16:46:38
UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-30 02:00:28
UTC (rev 3508)
@@ -30,6 +30,7 @@
import org.junit.Test;
import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.common.buffer.impl.FileStoreCache;
import org.teiid.core.TeiidComponentException;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -109,6 +110,7 @@
BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
bm.setProcessorBatchSize(32);
bm.setMaxReserveKB(0);//force all to disk
+ ((FileStoreCache)bm.getCache()).setCompactionThreshold(0);
bm.initialize();
ElementSymbol e1 = new ElementSymbol("x");
Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,66 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
- package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.BitSet;
-import java.util.Random;
-
-import org.junit.Test;
-
-public class TestBlockBitSetTree {
-
- @Test public void testBitsSet() {
- BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new
TestBlockClosedLongIntHashTable.DummyBlockManager());
- bst.set(1, true);
- bst.set(100, true);
- bst.set(10000, true);
- bst.set(1000000, true);
- assertEquals(4, bst.getBitsSet());
- bst.set(1, false);
- assertEquals(3, bst.getBitsSet());
- assertFalse(bst.get(1));
- }
-
- @Test public void testNextClearSet() {
- BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new
TestBlockClosedLongIntHashTable.DummyBlockManager());
- BitSet bst1 = new BitSet();
- Random r = new Random(1);
- for (int i = 0; i < 1000; i++) {
- int rand = r.nextInt() & bst.getMaxIndex();
- bst.set(rand, true);
- bst1.set(rand, true);
- assertTrue(bst.get(rand));
- assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
- assertEquals(String.valueOf(i), bst1.nextSetBit(rand), bst.nextSetBit(rand));
- }
-
- for (int i = 0; i < 10000; i++) {
- int rand = r.nextInt() & bst.getMaxIndex();
- assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
- assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
- }
- }
-
-}
Deleted:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,92 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Test;
-import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
-
-public class TestBlockClosedLongIntHashTable {
-
- public static final class DummyBlockManager implements BlockManager {
- List<BlockInfo> blocks = new ArrayList<BlockInfo>();
-
- @Override
- public int getInode() {
- return 0;
- }
-
- @Override
- public void updateBlock(BlockInfo block) {
-
- }
-
- @Override
- public void free() {
- blocks.clear();
- }
-
- @Override
- public BlockInfo getBlock(int index) {
- BlockInfo block = blocks.get(index);
- block.buf.rewind();
- return block;
- }
-
- @Override
- public void freeBlock(int index) {
- blocks.remove(index);
- }
-
- @Override
- public BlockInfo allocateBlock(int index) {
- assertEquals(index, blocks.size());
- ByteBuffer result = ByteBuffer.wrap(new byte[FileStoreCache.BLOCK_SIZE]);
- blocks.add(new BlockInfo(false, result, index, -1));
- return blocks.get(blocks.size() - 1);
- }
- }
-
- @Test public void testAgainstHashMap() {
- BlockClosedLongIntHashTable table = new BlockClosedLongIntHashTable(new
DummyBlockManager());
- HashMap<Long, Integer> table1 = new HashMap<Long, Integer>(16);
- for (long i = 1; i < 200000; i++) {
- table.put(i, (int)i);
- table1.put(i, (int)i);
- }
- Random r = new Random(0);
- for (int i = 1; i < 2000000; i++) {
- long toRemove = r.nextInt(i);
- boolean removed = table.remove(toRemove) != BlockClosedLongIntHashTable.EMPTY;
- assertEquals(table1.remove(toRemove) != null, removed);
- }
- }
-
-}
Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java 2011-09-29
16:46:38 UTC (rev 3507)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java 2011-09-30
02:00:28 UTC (rev 3508)
@@ -1,119 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.junit.Test;
-import org.teiid.common.buffer.CacheEntry;
-import org.teiid.common.buffer.Serializer;
-import org.teiid.core.util.UnitTestUtil;
-
-public class TestFileStoreCache {
-
- private final class SimpleSerializer implements Serializer<Integer> {
- @Override
- public Integer deserialize(ObjectInputStream ois)
- throws IOException, ClassNotFoundException {
- Integer result = ois.readInt();
- for (int i = 0; i < result; i++) {
- assertEquals(i, ois.readInt());
- }
- return result;
- }
-
- @Override
- public Long getId() {
- return 1l;
- }
-
- @Override
- public void serialize(Integer obj, ObjectOutputStream oos)
- throws IOException {
- oos.writeInt(obj);
- for (int i = 0; i < obj; i++) {
- oos.writeInt(i);
- }
- }
-
- @Override
- public boolean useSoftCache() {
- return false;
- }
- }
-
- @Test public void testAddGetMultiBlock() throws Exception {
- FileStoreCache fsc = new FileStoreCache();
- fsc.setMaxBufferSpace(1 << 28);
- SplittableStorageManager ssm = new SplittableStorageManager(new MemoryStorageManager());
- ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
- fsc.setStorageManager(ssm);
- fsc.initialize();
-
- UnitTestUtil.enableTraceLogging("org.teiid"); //$NON-NLS-1$
-
- CacheEntry ce = new CacheEntry(2l);
- Serializer<Integer> s = new SimpleSerializer();
- fsc.createCacheGroup(s.getId());
- Integer cacheObject = Integer.valueOf(2);
- ce.setObject(cacheObject);
- fsc.add(ce, s);
-
- ce = fsc.get(2l, s);
- assertEquals(cacheObject, ce.getObject());
-
- //test something that exceeds the direct inode data blocks
- ce = new CacheEntry(3l);
- cacheObject = Integer.valueOf(80000);
- ce.setObject(cacheObject);
- fsc.add(ce, s);
-
- ce = fsc.get(3l, s);
- assertEquals(cacheObject, ce.getObject());
-
- fsc.removeCacheGroup(1l);
-
- assertEquals(0, fsc.getDataBlocksInUse());
- assertEquals(0, fsc.getInodesInUse());
-
- //test something that exceeds the indirect inode data blocks
- ce = new CacheEntry(3l);
- fsc.createCacheGroup(s.getId());
- cacheObject = Integer.valueOf(5000000);
- ce.setObject(cacheObject);
- fsc.add(ce, s);
-
- ce = fsc.get(3l, s);
- assertEquals(cacheObject, ce.getObject());
-
- fsc.removeCacheGroup(1l);
-
- assertEquals(0, fsc.getDataBlocksInUse());
- assertEquals(0, fsc.getInodesInUse());
- }
-
-}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-30 02:00:28 UTC (rev 3508)
@@ -378,17 +378,17 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
ResultsMessage rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(5, rm.getResults().length);
+ assertEquals(5, rm.getResultsList().size());
message = core.processCursorRequest(reqMsg.getExecutionId(), 6, 5);
rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(5, rm.getResults().length);
+ assertEquals(5, rm.getResultsList().size());
message = core.processCursorRequest(reqMsg.getExecutionId(), 11, 5);
rm = message.get(500000, TimeUnit.MILLISECONDS);
assertNull(rm.getException());
- assertEquals(5, rm.getResults().length);
+ assertEquals(5, rm.getResultsList().size());
}
@Test public void testSourceConcurrency() throws Exception {
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-29 16:46:38 UTC (rev 3507)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-30 02:00:28 UTC (rev 3508)
@@ -108,7 +108,6 @@
ssm.setMaxFileSize(maxFileSize);
FileStoreCache fsc = new FileStoreCache();
fsc.setStorageManager(ssm);
- fsc.setMaxBufferSpace(maxBufferSpace*MB);
fsc.initialize();
this.bufferMgr.setCache(fsc);
} else {