Author: shawkins
Date: 2011-10-20 11:37:18 -0400 (Thu, 20 Oct 2011)
New Revision: 3573
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 fixing a bug with positional filestore writes and fixing size indexing.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -34,12 +34,10 @@
int blockIndex;
ByteBuffer buf;
boolean done;
- private final boolean threadSafe;
- BlockInputStream(BlockManager manager, int blockCount, boolean threadSafe) {
+ BlockInputStream(BlockManager manager, int blockCount) {
this.manager = manager;
this.maxBlock = blockCount;
- this.threadSafe = threadSafe;
}
@Override
@@ -58,9 +56,6 @@
return;
}
buf = manager.getBlock(blockIndex++);
- if (threadSafe) {
- buf = buf.duplicate();
- }
}
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -405,12 +405,9 @@
List<BlockStore> stores = new ArrayList<BlockStore>();
int size = BLOCK_SIZE;
do {
- if ((size>>1) >= maxStorageObjectSize) {
- size>>=1; //adjust the last block size if needed
- }
- stores.add(new BlockStore(this.storageManager, size, 15,
BufferManagerImpl.CONCURRENCY_LEVEL>>2));
- size <<=2;
- } while (size>>2 < maxStorageObjectSize);
+ stores.add(new BlockStore(this.storageManager, size, 30,
BufferManagerImpl.CONCURRENCY_LEVEL));
+ size <<=1;
+ } while ((size>>1) < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
@@ -503,7 +500,6 @@
}
} catch (Throwable e) {
if (e == PhysicalInfo.sizeChanged) {
- //System.out.println("size changed " + info.inode + " " +
info.block + " " + info);
//entries are mutable after adding, the original should be removed shortly so just
ignore
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId()
+" changed size since first persistence, keeping the original."); //$NON-NLS-1$
//$NON-NLS-2$
} else {
@@ -587,7 +583,7 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at
inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
- is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
@@ -720,7 +716,7 @@
try {
if (demote && block == EMPTY_ADDRESS) {
storageWrites.getAndIncrement();
- BlockInputStream is = new BlockInputStream(bm, memoryBlockCount, false); //we know
this can always be single threaded
+ BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
BlockStore blockStore = sizeBasedStores[sizeIndex];
block = getAndSetNextClearBit(blockStore);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
@@ -930,9 +926,9 @@
return; //no changes
}
this.memoryBlockCount = newMemoryBlockCount;
- while (newMemoryBlockCount >= 1) {
+ while (newMemoryBlockCount > 1) {
this.sizeIndex++;
- newMemoryBlockCount>>=2;
+ newMemoryBlockCount = (newMemoryBlockCount>>1) +
((newMemoryBlockCount&0x01)==0?0:1);
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -323,7 +323,7 @@
}
}
- static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable
+ static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable since it is
roughly the same as max active plans
private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new
ReferenceQueue<CacheEntry>();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -180,7 +180,7 @@
}
/**
- * Set to try to always allocate against the first available block in a segment.
+ * Set to always allocate against the first available block in a segment.
* @param compact
*/
public void setCompact(boolean compact) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -133,11 +133,10 @@
if (bytesUsed > 0) {
long used = usedBufferSpace.addAndGet(bytesUsed);
if (used > maxBufferSpace) {
- usedBufferSpace.addAndGet(-bytesUsed);
//TODO: trigger a compaction before this is thrown
throw new
IOException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted",
maxBufferSpace)); //$NON-NLS-1$
}
- fileAccess.setLength(bytesUsed);
+ fileAccess.setLength(newLength);
bytesUsed = 0;
}
fileAccess.seek(fileOffset);
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -188,4 +188,26 @@
return fsc;
}
+ @Test public void testSizeIndex() throws Exception {
+ PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1<<13);
+ assertEquals(0, info.sizeIndex);
+
+ info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1 + (1<<13));
+ assertEquals(1, info.sizeIndex);
+
+ info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(2 + (1<<15));
+ assertEquals(3, info.sizeIndex);
+ }
+
+ @Test(expected=Exception.class) public void testSizeChanged() throws Exception {
+ PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1<<13);
+ assertEquals(0, info.sizeIndex);
+
+ info.setSize(1 + (1<<13));
+ }
+
}
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-20
14:41:28 UTC (rev 3572)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-20
15:37:18 UTC (rev 3573)
@@ -59,6 +59,25 @@
store.remove();
assertEquals(0, sm.getUsedBufferSpace());
}
+
+ @Test public void testPositionalWrite() throws Exception {
+ FileStorageManager sm = getStorageManager(null, null);
+ String tsID = "0"; //$NON-NLS-1$
+ FileStore store = sm.createFileStore(tsID);
+ byte[] expectedBytes = writeBytes(store, 2048);
+ assertEquals(4096, sm.getUsedBufferSpace());
+
+ writeBytes(store, 4096);
+ assertEquals(6144, sm.getUsedBufferSpace());
+
+ byte[] bytesRead = new byte[2048];
+ store.readFully(2048, bytesRead, 0, bytesRead.length);
+
+ assertArrayEquals(expectedBytes, bytesRead);
+
+ store.remove();
+ assertEquals(0, sm.getUsedBufferSpace());
+ }
@Test(expected=IOException.class) public void testMaxSpace() throws Exception {
FileStorageManager sm = getStorageManager(null, null);
@@ -81,15 +100,19 @@
static Random r = new Random();
- static void writeBytes(FileStore store)
+ static void writeBytes(FileStore store) throws IOException {
+ writeBytes(store, store.getLength());
+ }
+
+ static byte[] writeBytes(FileStore store, long start)
throws IOException {
byte[] bytes = new byte[2048];
r.nextBytes(bytes);
- long start = store.getLength();
- store.write(bytes, 0, bytes.length);
+ store.write(start, bytes, 0, bytes.length);
byte[] bytesRead = new byte[2048];
store.readFully(start, bytesRead, 0, bytesRead.length);
assertTrue(Arrays.equals(bytes, bytesRead));
+ return bytes;
}
@Test public void testWritingMultipleFiles() throws Exception {