Author: shawkins
Date: 2011-10-13 21:46:31 -0400 (Thu, 13 Oct 2011)
New Revision: 3550
Removed:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
Modified:
trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java
trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750 wiring in the new buffer logic
Modified: trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/common-core/src/main/java/org/teiid/core/util/ExecutorUtils.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -25,6 +25,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -38,9 +39,13 @@
* @return
*/
public static ExecutorService newFixedThreadPool(int nThreads, String name) {
+ return newFixedThreadPool(nThreads, Integer.MAX_VALUE, name);
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads, int maxQueue, String
name) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(nThreads, nThreads,
60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new
NamedThreadFactory(name));
+ maxQueue==0?new
SynchronousQueue<Runnable>():new LinkedBlockingQueue<Runnable>(maxQueue), new
NamedThreadFactory(name));
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -25,8 +25,8 @@
public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
private Long id;
- protected long lastAccess;
- protected double orderingValue;
+ protected float lastAccess;
+ protected float orderingValue;
public BaseCacheEntry(Long id) {
this.id = id;
@@ -46,19 +46,19 @@
return getId().toString();
}
- public long getLastAccess() {
+ public float getLastAccess() {
return lastAccess;
}
- public void setLastAccess(long lastAccess) {
+ public void setLastAccess(float lastAccess) {
this.lastAccess = lastAccess;
}
- public double getOrderingValue() {
+ public float getOrderingValue() {
return orderingValue;
}
- public void setOrderingValue(double orderingValue) {
+ public void setOrderingValue(float orderingValue) {
this.orderingValue = orderingValue;
}
@@ -66,7 +66,7 @@
public int compareTo(BaseCacheEntry o) {
int result = (int) Math.signum(orderingValue - o.orderingValue);
if (result == 0) {
- result = Long.signum(lastAccess - o.lastAccess);
+ result = (int)Math.signum(lastAccess - o.lastAccess);
if (result == 0) {
return Long.signum(id - o.id);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -25,17 +25,18 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
+/**
+ * TODO: support freeing of datablocks as we go
+ */
final class BlockInputStream extends InputStream {
private final BlockManager manager;
private final int maxBlock;
int blockIndex;
ByteBuffer buf;
- boolean free;
boolean done;
- BlockInputStream(BlockManager manager, int blockCount, boolean free) {
+ BlockInputStream(BlockManager manager, int blockCount) {
this.manager = manager;
- this.free = free;
this.maxBlock = blockCount;
}
@@ -52,15 +53,9 @@
if (buf == null || buf.remaining() == 0) {
if (maxBlock == blockIndex) {
done = true;
- if (blockIndex > 1 && free) {
- manager.freeBlock(blockIndex - 1, false);
- }
return;
}
buf = manager.getBlock(blockIndex++);
- if (blockIndex > 2 && free) {
- manager.freeBlock(blockIndex - 2, false);
- }
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -41,7 +41,7 @@
*/
ByteBuffer getBlock(int index);
- int freeBlock(int index, boolean steal);
+ void freeBlock(int index);
int free(boolean steal);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -25,16 +25,20 @@
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.StorageManager;
+/**
+ * Represents a FileStore that holds blocks of a fixed size.
+ */
class BlockStore {
final long blockSize;
final ConcurrentBitSet blocksInUse;
final FileStore[] stores;
- public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog) {
+ public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog, int
concurrencyLevel) {
this.blockSize = blockSize;
int blockCount = 1 << blockCountLog;
- this.blocksInUse = new ConcurrentBitSet(blockCount,
BufferManagerImpl.CONCURRENCY_LEVEL/2);
- this.stores = new FileStore[BufferManagerImpl.CONCURRENCY_LEVEL/2];
+ this.blocksInUse = new ConcurrentBitSet(blockCount, concurrencyLevel);
+ this.blocksInUse.setCompact(true);
+ this.stores = new FileStore[concurrencyLevel];
for (int i = 0; i < stores.length; i++) {
this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) +
'_' + i);
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -35,7 +35,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,6 +51,7 @@
import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ExecutorUtils;
import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -86,9 +90,14 @@
* proportion to the number of tables/tuplebuffers in use.
*
* TODO: compact tail storage blocks. there may be dangling blocks causing us to consume
disk space.
+ * we should at least reclaim tail space if the end block is removed. for now we are
just relying
+ * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of
the
+ * files.
*/
public class BufferFrontedFileStoreCache implements Cache, StorageManager {
+ public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
+
static final int ADDRESS_BITS = 31;
static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
static final int BYTES_PER_BLOCK_ADDRESS = 4;
@@ -212,7 +221,7 @@
if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
memoryEvictionLock.readLock().unlock();
readLocked = false;
- next = evictFromMemoryBuffer();
+ next = evictFromMemoryBuffer(true);
}
} finally {
if (readLocked) {
@@ -226,12 +235,9 @@
}
@Override
- public int freeBlock(int index, boolean steal) {
+ public void freeBlock(int index) {
int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
- if (!steal) {
- freeDataBlock(dataBlock);
- }
- return dataBlock;
+ freeDataBlock(dataBlock);
}
private void freeDataBlock(int dataBlock) {
@@ -248,6 +254,7 @@
if (this.inode == -1) {
throw new AssertionError("Out of inodes"); //$NON-NLS-1$
}
+ inodesCreated.getAndIncrement();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode",
this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -268,11 +275,12 @@
int dataBlockToSteal = bb.getInt(0);
int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
int doublyIndirectIndexBlock =
bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
- boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS,
true);
+ boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb,
DIRECT_POINTERS-(steal?1:0), true);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
inodesInuse.clear(inode);
+ inodesRemoved.getAndIncrement();
if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
return steal?dataBlockToSteal:EMPTY_ADDRESS;
}
@@ -315,6 +323,8 @@
return blockByteBuffer.getByteBuffer(dataBlock);
}
}
+ AtomicInteger inodesRemoved = new AtomicInteger();
+ AtomicInteger inodesCreated = new AtomicInteger();
private static class PhysicalInfo extends BaseCacheEntry {
int inode = EMPTY_ADDRESS;
@@ -336,32 +346,16 @@
}
}
- double crfLamda = .0001;
-
- StorageManager storageManager;
- int maxStorageObjectSize = 1 << 23; //8MB
- private long memoryBufferSpace = 1 << 27;
+ private StorageManager storageManager;
+ private int maxStorageObjectSize = DEFAuLT_MAX_OBJECT_SIZE;
+ private long memoryBufferSpace = 1 << 26; //64MB
private boolean direct;
- int maxMemoryBlocks;
+ private int maxMemoryBlocks;
private AtomicLong readAttempts = new AtomicLong();
- PartiallyOrderedCache<Long, PhysicalInfo> memoryBufferEntries = new
PartiallyOrderedCache<Long, PhysicalInfo>(16, .75f,
BufferManagerImpl.CONCURRENCY_LEVEL) {
-
- @Override
- protected void recordAccess(Long key, PhysicalInfo value, boolean initial) {
- long lastAccess = value.getLastAccess();
- value.setLastAccess(readAttempts.get());
- if (initial && lastAccess == 0) {
- return;
- }
- double orderingValue = value.getOrderingValue();
- orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
orderingValue);
- value.setOrderingValue(orderingValue);
- }
-
- };
+ private OrderedCache<Long, PhysicalInfo> memoryBufferEntries = new
OrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL,
readAttempts);
private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
- ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock();
+ private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
private int blocks;
private ConcurrentBitSet blocksInuse;
@@ -374,18 +368,41 @@
private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping =
new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f,
BufferManagerImpl.CONCURRENCY_LEVEL);
private BlockStore[] sizeBasedStores;
+ private AtomicBoolean cleanerRunning = new AtomicBoolean();
+ private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, 0,
"FileStore Worker"); //$NON-NLS-1$
+ private final Runnable cleaningTask = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ while (lowBlocks()) {
+ if (evictFromMemoryBuffer(false) == EMPTY_ADDRESS) {
+ break;
+ }
+ }
+ } finally {
+ cleanerRunning.set(false);
+ }
+ }
+ };
+ private int cleaningThreshold;
+
+ private AtomicLong storageWrites = new AtomicLong();
+ private AtomicLong storageReads = new AtomicLong();
+
@Override
public void initialize() throws TeiidComponentException {
storageManager.initialize();
+ memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
blocks = (int) Math.min(Integer.MAX_VALUE, (memoryBufferSpace>>LOG_BLOCK_SIZE));
inodesInuse = new ConcurrentBitSet(blocks+1, BufferManagerImpl.CONCURRENCY_LEVEL);
blocksInuse = new ConcurrentBitSet(blocks, BufferManagerImpl.CONCURRENCY_LEVEL);
this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
//ensure that we'll run out of blocks first
this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
- memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
memoryWritePermits = new Semaphore(Math.max(1,
(int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT,
maxStorageObjectSize>>LOG_BLOCK_SIZE);
+ cleaningThreshold = Math.min(maxMemoryBlocks<<3, blocks>>1); //try to
maintain enough freespace to hold 8 max objects
//account for index pointer block overhead
if (maxMemoryBlocks > DIRECT_POINTERS) {
maxMemoryBlocks--;
@@ -397,20 +414,18 @@
List<BlockStore> stores = new ArrayList<BlockStore>();
int size = BLOCK_SIZE;
do {
- stores.add(new BlockStore(this.storageManager, size, 30));
+ if ((size>>1) >= maxStorageObjectSize) {
+ size>>=1; //adjust the last block size if needed
+ }
+ stores.add(new BlockStore(this.storageManager, size, 30,
BufferManagerImpl.CONCURRENCY_LEVEL>>2));
size <<=2;
} while (size>>2 < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
- double computeNextOrderingValue(long currentTime,
- long lastAccess, double orderingValue) {
- orderingValue =
- //Frequency component
- orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
- //recency component
- + Math.pow(currentTime, crfLamda);
- return orderingValue;
+ boolean lowBlocks() {
+ int bitsSet = blocksInuse.getBitsSet();
+ return bitsSet > 0 && (blocks - bitsSet < cleaningThreshold) &&
memoryBufferEntries.firstEntry() != null;
}
InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -441,12 +456,17 @@
} else {
newEntry = false;
synchronized (info) {
- if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(readAttempts.get(),
info)) {
+ //we assume that serialization would be faster than a disk read
+ if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
success = true;
return;
}
}
}
+ if (!cleanerRunning.get() && lowBlocks() &&
cleanerRunning.compareAndSet(false, true)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer
cleaner"); //$NON-NLS-1$
+ asynchPool.execute(cleaningTask);
+ }
memoryWritePermits.acquire();
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
@@ -461,8 +481,12 @@
if (newEntry) {
info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(),
fsos.getBytesWritten());
map.put(entry.getId(), info);
- memoryBufferEntries.put(entry.getId(), info);
+ } else {
+ synchronized (info) {
+ info.inode = blockManager.getInode();
+ }
}
+ memoryBufferEntries.put(entry.getId(), info);
success = true;
}
}
@@ -477,7 +501,7 @@
}
}
}
-
+
@Override
public CacheEntry get(Long oid, Serializer<?> serializer) throws
TeiidComponentException {
long currentTime = readAttempts.incrementAndGet();
@@ -499,28 +523,34 @@
LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode",
info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
- is = new BlockInputStream(manager, info.memoryBlockCount, false);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
+ storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block",
info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
}
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
FileStore fs =
blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
long blockOffset =
(info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
- is = fs.createInputStream(blockOffset);
+ is = fs.createInputStream(blockOffset,
info.memoryBlockCount<<LOG_BLOCK_SIZE);
if (shouldPlaceInMemoryBuffer(currentTime, info) &&
this.memoryWritePermits.tryAcquire()) {
BlockManager manager = null;
+ boolean success = false;
try {
manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
ObjectConverterUtil.write(os, is, -1);
+ info.inode = manager.getInode();
memoryBufferEntries.put(info.getId(), info);
- is = new BlockInputStream(manager, info.memoryBlockCount, false);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
+ success = true;
} finally {
this.memoryWritePermits.release();
+ if (!success && manager != null) {
+ manager.free(false);
+ info.inode = EMPTY_ADDRESS;
+ }
}
- } else {
- this.toString();
}
} else {
return null;
@@ -540,11 +570,18 @@
}
}
+ /**
+ * Determine if an object should be in the memory buffer.
+ * Adds are indicated by a current time of 0.
+ * @param currentTime
+ * @param info
+ * @return
+ */
private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
- return lowest == null
- || (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) >
(info.memoryBlockCount<<3)
- || lowest.getKey().getOrderingValue() < computeNextOrderingValue(currentTime,
info.getLastAccess(), info.getOrderingValue());
+ return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) >
(info.memoryBlockCount<<2)
+ || (lowest != null && lowest.getKey().block != EMPTY_ADDRESS
+ && lowest.getKey().getOrderingValue() <
(currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime,
info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
}
@Override
@@ -577,7 +614,7 @@
return;
}
PhysicalInfo info = map.remove(id);
- free(id, info, false);
+ free(id, info, false, false);
}
@Override
@@ -588,51 +625,54 @@
}
synchronized (map) {
for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
- free(entry.getKey(), entry.getValue(), false);
+ free(entry.getKey(), entry.getValue(), false, false);
}
return map.keySet();
}
}
- int free(Long oid, PhysicalInfo info, boolean demote) {
- memoryBufferEntries.remove(oid);
+ int free(Long oid, PhysicalInfo info, boolean demote, boolean stealDataBlock) {
if (info == null) {
return EMPTY_ADDRESS;
}
synchronized (info) {
memoryBufferEntries.remove(oid);
- if (info.inode == EMPTY_ADDRESS) {
- return EMPTY_ADDRESS;
- }
- BlockManager bm = getBlockManager(info.gid, oid, info.inode);
- info.inode = EMPTY_ADDRESS;
if (demote) {
+ if (info.inode == EMPTY_ADDRESS) {
+ return EMPTY_ADDRESS;
+ }
+ BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+ info.inode = EMPTY_ADDRESS;
if (info.block == EMPTY_ADDRESS) {
- BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount, true);
+ storageWrites.getAndIncrement();
+ BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount);
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+ info.block = getAndSetNextClearBit(blockStore);
FileStore fs =
blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
- info.block = getAndSetNextClearBit(blockStore);
long blockOffset =
(info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
byte[] b = new byte[BLOCK_SIZE];
int read = 0;
- boolean errored = false;
- while ((read = is.read(b, 0, b.length)) != -1) {
- if (!errored) {
- try {
- fs.write(blockOffset, b, 0, read);
- blockOffset+=read;
- } catch (Throwable e) {
- //just continue to free
- errored = true;
- LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to
storage " + oid); //$NON-NLS-1$
- }
+ try {
+ while ((read = is.read(b, 0, b.length)) != -1) {
+ fs.write(blockOffset, b, 0, read);
+ blockOffset+=read;
}
+ } catch (Throwable e) {
+ //shouldn't happen, but we'll invalidate this write and continue
+ blockStore.blocksInUse.clear(info.block);
+ info.block = EMPTY_ADDRESS;
+ //just continue to free
+ LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to
storage " + oid); //$NON-NLS-1$
}
- return is.free(true);
+ return is.free(stealDataBlock);
}
- return bm.free(true);
+ return bm.free(stealDataBlock);
}
- bm.free(false);
+ if (info.inode != EMPTY_ADDRESS) {
+ BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+ info.inode = EMPTY_ADDRESS;
+ bm.free(false);
+ }
if (info.block != EMPTY_ADDRESS) {
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
blockStore.blocksInUse.clear(info.block);
@@ -650,39 +690,34 @@
return result;
}
- /**
- * Stop the world eviction. Hopefully this should rarely happen.
- * @return the stole dataBlock
- */
- int evictFromMemoryBuffer() {
+ int evictFromMemoryBuffer(boolean steal) {
memoryEvictionLock.writeLock().lock();
int next = -1;
- boolean writeLocked = true;
try {
for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
AutoCleanupUtil.doCleanup();
+ //scan the eviction queue looking for a victim
Iterator<Map.Entry<PhysicalInfo, Long>> iter =
memoryBufferEntries.getEvictionQueue().entrySet().iterator();
- while ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS &&
iter.hasNext()) {
+ while (((!steal && lowBlocks()) || (steal && (next =
blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
Map.Entry<PhysicalInfo, Long> entry = iter.next();
PhysicalInfo info = entry.getKey();
synchronized (info) {
if (info.inode == EMPTY_ADDRESS) {
continue;
}
- memoryEvictionLock.writeLock().unlock();
- writeLocked = false;
- next = free(entry.getValue(), info, true);
+ next = free(entry.getValue(), info, true, steal);
+ if (!steal) {
+ next = 0;
+ }
}
break;
}
}
- if (next == -1) {
+ if (steal && next == -1) {
throw new AssertionError("Could not free space for pending write");
//$NON-NLS-1$
}
} finally {
- if (writeLocked) {
- memoryEvictionLock.writeLock().unlock();
- }
+ memoryEvictionLock.writeLock().unlock();
}
return next;
}
@@ -711,4 +746,12 @@
this.maxStorageObjectSize = maxStorageBlockSize;
}
+ public long getStorageReads() {
+ return storageReads.get();
+ }
+
+ public long getStorageWrites() {
+ return storageWrites.get();
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -291,30 +291,11 @@
private AtomicInteger activeBatchKB = new AtomicInteger();
- //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher
-> LRU
- //TODO: adaptively adjust this value. more hits should move closer to lru
- private final double crfLamda = .001;
+ private AtomicLong readAttempts = new AtomicLong();
//implements a LRFU cache using a customized crf function. we store the value
with
//the cache entry to make a better decision about reuse of the batch
//TODO: consider the size estimate in the weighting function
- private PartiallyOrderedCache<Long, CacheEntry> memoryEntries = new
PartiallyOrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL) {
-
- @Override
- protected void recordAccess(Long key, CacheEntry value, boolean initial) {
- long lastAccess = value.getLastAccess();
- value.setLastAccess(readAttempts.get());
- if (initial && lastAccess == 0) {
- return;
- }
- double orderingValue = value.getOrderingValue();
- orderingValue =
- //Frequency component
- orderingValue*Math.pow(1-crfLamda, value.getLastAccess() - lastAccess)
- //recency component
- + Math.pow(value.getLastAccess(), crfLamda);
- value.setOrderingValue(orderingValue);
- }
- };
+ private OrderedCache<Long, CacheEntry> memoryEntries = new
OrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL, readAttempts);
//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
@@ -341,7 +322,6 @@
private AtomicLong batchAdded = new AtomicLong();
private AtomicLong readCount = new AtomicLong();
private AtomicLong writeCount = new AtomicLong();
- private AtomicLong readAttempts = new AtomicLong();
private AtomicLong referenceHit = new AtomicLong();
public long getBatchesAdded() {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -25,22 +25,29 @@
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;
+import org.teiid.core.util.Assertion;
+
/**
* A segmented {@link BitSet} that supports greater concurrency
* and faster finding of clear bits.
*/
public class ConcurrentBitSet {
+ private static final int ADDRESS_BITS_PER_TOP_VALUE = 18;
+ private static final int MAX_TOP_VALUE = 1 << ADDRESS_BITS_PER_TOP_VALUE;
+
private static class Segment {
int offset;
int maxBits;
int startSearch;
int bitsSet;
- final private BitSet bitSet;
+ int[] topVals;
+ final BitSet bitSet;
public Segment(int bitCount) {
bitSet = new BitSet();
maxBits = bitCount;
+ this.topVals = new int[Math.max(1, maxBits >> ADDRESS_BITS_PER_TOP_VALUE)];
}
}
@@ -49,16 +56,14 @@
private AtomicInteger counter = new AtomicInteger();
private AtomicInteger bitsSet = new AtomicInteger();
private Segment[] segments;
+ private boolean compact;
public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
- if (maxBits < concurrencyLevel) {
- concurrencyLevel = 1;
- while (maxBits > 2*concurrencyLevel) {
- concurrencyLevel <<=1;
- }
+ Assertion.assertTrue(maxBits > 0);
+ while ((bitsPerSegment = maxBits/concurrencyLevel) < concurrencyLevel) {
+ concurrencyLevel >>= 1;
}
segments = new Segment[concurrencyLevel];
- bitsPerSegment = maxBits/concurrencyLevel;
int modBits = maxBits%concurrencyLevel;
if (modBits > 0) {
bitsPerSegment++;
@@ -76,13 +81,17 @@
public void clear(int bitIndex) {
checkIndex(bitIndex);
Segment s = segments[bitIndex/bitsPerSegment];
- bitIndex = bitIndex%bitsPerSegment;
+ int segmentBitIndex = bitIndex%bitsPerSegment;
synchronized (s) {
- if (!s.bitSet.get(bitIndex)) {
+ if (!s.bitSet.get(segmentBitIndex)) {
throw new AssertionError(bitIndex + " not set"); //$NON-NLS-1$
}
- s.bitSet.clear(bitIndex);
+ if (compact) {
+ s.startSearch = Math.min(s.startSearch, segmentBitIndex);
+ }
+ s.bitSet.clear(segmentBitIndex);
s.bitsSet--;
+ s.topVals[segmentBitIndex>>ADDRESS_BITS_PER_TOP_VALUE]--;
}
bitsSet.decrementAndGet();
}
@@ -100,14 +109,35 @@
if (s.bitsSet == s.maxBits) {
continue;
}
- nextBit = s.bitSet.nextClearBit(s.startSearch);
- if (nextBit >= s.maxBits - 1) {
- s.startSearch = 0;
- nextBit = s.bitSet.nextClearBit(s.startSearch);
- if (nextBit >= s.maxBits) {
- throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+ int indexSearchStart = s.startSearch >> ADDRESS_BITS_PER_TOP_VALUE;
+ for (int j = indexSearchStart; j < s.topVals.length; j++) {
+ if (s.topVals[j] == MAX_TOP_VALUE) {
+ continue;
}
+ if (s.topVals[j] == 0) {
+ if (j == start) {
+ nextBit = s.startSearch;
+ break;
+ }
+ nextBit = j * MAX_TOP_VALUE;
+ break;
+ }
+ int index = j * MAX_TOP_VALUE;
+ if (j == indexSearchStart) {
+ index = s.startSearch;
+ }
+ nextBit = s.bitSet.nextClearBit(index);
+ if (s.startSearch > 0 && nextBit >= s.maxBits - 1) {
+ s.startSearch = 0;
+ //fallback full scan
+ nextBit = s.bitSet.nextClearBit(s.startSearch);
+ }
+ break;
}
+ if (nextBit >= s.maxBits) {
+ throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+ }
+ s.topVals[nextBit>>ADDRESS_BITS_PER_TOP_VALUE]++;
s.bitsSet++;
s.bitSet.set(nextBit);
s.startSearch = nextBit + 1;
@@ -142,4 +172,12 @@
return bitsPerSegment;
}
+ /**
+ * Set to try to always allocate against the first available block in a segment.
+ * @param compact
+ */
+ public void setCompact(boolean compact) {
+ this.compact = compact;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.common.buffer.FileStore;
@@ -50,6 +51,7 @@
private long maxBufferSpace = DEFAULT_MAX_BUFFERSPACE;
private AtomicLong usedBufferSpace = new AtomicLong();
+ private AtomicInteger fileCounter = new AtomicInteger();
private class FileInfo {
private File file;
@@ -175,6 +177,9 @@
private int maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
private String directory;
private File dirFile;
+ //use subdirectories to hold the files since we may create a relatively unbounded
amount of lob files and
+ //fs performance will typically degrade if a single directory is too large
+ private File[] subDirectories = new File[256];
// State
private Map<File, RandomAccessFile> fileCache = Collections.synchronizedMap(new
LinkedHashMap<File, RandomAccessFile>() {
@@ -201,13 +206,20 @@
}
dirFile = new File(this.directory);
- if(dirFile.exists()) {
- if(! dirFile.isDirectory()) {
- throw new
TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.not_a_directory",
dirFile.getAbsoluteFile())); //$NON-NLS-1$
-
+ makeDir(dirFile);
+ for (int i = 0; i < subDirectories.length; i++) {
+ subDirectories[i] = new File(this.directory, "b" +i); //$NON-NLS-1$
+ makeDir(subDirectories[i]);
+ }
+ }
+
+ private static void makeDir(File file) throws TeiidComponentException {
+ if(file.exists()) {
+ if(! file.isDirectory()) {
+ throw new
TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.not_a_directory",
file.getAbsoluteFile())); //$NON-NLS-1$
}
- } else if(! dirFile.mkdirs()) {
- throw new
TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.error_creating",
dirFile.getAbsoluteFile())); //$NON-NLS-1$
+ } else if(! file.mkdirs()) {
+ throw new
TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.error_creating",
file.getAbsoluteFile())); //$NON-NLS-1$
}
}
@@ -220,7 +232,8 @@
}
File createFile(String name) throws IOException {
- File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null,
this.dirFile); //$NON-NLS-1$
+ //spray the files into separate directories in a round robin fashion.
+ File storageFile = File.createTempFile(FILE_PREFIX + name + "_", null,
this.subDirectories[fileCounter.getAndIncrement()&(this.subDirectories.length-1)]);
//$NON-NLS-1$
if
(LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(org.teiid.logging.LogConstants.CTX_BUFFER_MGR,
"Created temporary storage area file " + storageFile.getAbsoluteFile());
//$NON-NLS-1$
}
Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -1,353 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.teiid.common.buffer.Cache;
-import org.teiid.common.buffer.CacheEntry;
-import org.teiid.common.buffer.FileStore;
-import org.teiid.common.buffer.Serializer;
-import org.teiid.common.buffer.StorageManager;
-import org.teiid.core.TeiidComponentException;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.logging.MessageLevel;
-import org.teiid.query.QueryPlugin;
-
-/**
- * A minimally blocking Cache using {@link FileStore}s.
- *
- * Storage files with significant unused space are compacted after reaching a size
threshold.
- * Compacting the empty space may be costly as it is currently implemented by blocking
all
- * read/write operations against the group.
- *
- * Since empty is concentrated at the beginning of the store a better approach could
- * be to users smaller file segments and move batches off of the beginning.
- *
- * There is unfortunately a significant memory footprint per group.
- */
-public class FileStoreCache implements Cache {
-
- private static class CacheGroup {
- private static final int RECLAIM_TAIL_SIZE = IO_BUFFER_SIZE << 5;
- private static final int MAX_FREE_SPACE = 1 << 11;
- FileStore store;
- long tail;
- long unusedSpace = 0;
- ReadWriteLock lock = new ReentrantReadWriteLock();
- Map<Long, long[]> physicalMapping = Collections.synchronizedMap(new
HashMap<Long, long[]>());
- List<Long> freed = Collections.synchronizedList(new LinkedList<Long>());
-
- CacheGroup(FileStore store) {
- this.store = store;
- }
-
- void freeBatch(Long batch) throws IOException {
- long[] info = physicalMapping.remove(batch);
- if (info != null) {
- if (info[0] + info[1] == tail) {
- tail -= info[1];
- if (store.getLength() - tail > RECLAIM_TAIL_SIZE) {
- store.setLength(tail);
- }
- } else {
- unusedSpace += info[1];
- }
- }
- }
-
- private long getOffset(Long gid, long compactionThreshold) throws IOException {
- long currentLength = store.getLength();
- if (currentLength <= compactionThreshold || unusedSpace * 4 <= currentLength *
3) {
- return tail;
- }
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on",
gid); //$NON-NLS-1$
- }
- byte[] buffer = new byte[IO_BUFFER_SIZE];
- TreeSet<long[]> bySize = new TreeSet<long[]>(new
Comparator<long[]>() {
- @Override
- public int compare(long[] o1, long[] o2) {
- int signum = Long.signum(o1[1] - o2[1]);
- if (signum == 0) {
- //take the upper address first
- return Long.signum(o2[0] - o1[0]);
- }
- return signum;
- }
- });
- TreeSet<long[]> byAddress = new TreeSet<long[]>(new
Comparator<long[]>() {
-
- @Override
- public int compare(long[] o1, long[] o2) {
- return Long.signum(o1[0] - o2[0]);
- }
- });
- synchronized (physicalMapping) {
- for (long[] value : physicalMapping.values()) {
- if (value == null) {
- continue;
- }
- bySize.add(value);
- byAddress.add(value);
- }
- }
- long lastEndAddress = 0;
- long usedSpace = tail - unusedSpace;
- while (!byAddress.isEmpty()) {
- long[] info = byAddress.pollFirst();
- bySize.remove(info);
-
- long currentOffset = info[0];
- long space = currentOffset - lastEndAddress;
- boolean movedLast = false;
- while (space > 0 && !bySize.isEmpty()) {
- long[] last = byAddress.last();
- if (last[1] > space) {
- break;
- }
- movedLast = true;
- byAddress.pollLast();
- bySize.remove(last);
- move(last, lastEndAddress, buffer);
- space -= last[1];
- lastEndAddress += last[1];
- }
- if (movedLast && !byAddress.isEmpty()) {
- long[] last = byAddress.last();
- long currentLastEndAddress = last[0] + last[1];
- if (currentLastEndAddress < currentLength>>1) {
- lastEndAddress = currentLastEndAddress;
- break;
- }
- }
- while (space > 0 && !bySize.isEmpty()) {
- long[] smallest = bySize.first();
- if (smallest[1] > space) {
- break;
- }
- bySize.pollFirst();
- byAddress.remove(smallest);
- move(smallest, lastEndAddress, buffer);
- space -= smallest[1];
- lastEndAddress += smallest[1];
- }
-
- if (space > MAX_FREE_SPACE) {
- move(info, lastEndAddress, buffer);
- }
- lastEndAddress = info[0] + info[1];
- }
- store.setLength(lastEndAddress);
- tail = lastEndAddress;
- unusedSpace = lastEndAddress - usedSpace;
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", gid,
"pre-size", currentLength, "post-size", lastEndAddress); //$NON-NLS-1$
//$NON-NLS-2$ //$NON-NLS-3$
- }
- return tail;
- }
-
- private void move(long[] toMove, long newOffset, byte[] buffer) throws IOException {
- long oldOffset = toMove[0];
- toMove[0] = newOffset;
- int size = (int)toMove[1];
- while (size > 0) {
- int toWrite = Math.min(IO_BUFFER_SIZE, size);
- store.readFully(oldOffset, buffer, 0, toWrite);
- store.write(newOffset, buffer, 0, toWrite);
- size -= toWrite;
- oldOffset += toWrite;
- newOffset += toWrite;
- }
- }
- }
-
- private static final int COMPACTION_THRESHOLD = 1 << 24; //start checking at 16
megs
- private static final int IO_BUFFER_SIZE = 1<<13;
- int compactionThreshold = COMPACTION_THRESHOLD;
- private ConcurrentHashMap<Long, CacheGroup> cacheGroups = new
ConcurrentHashMap<Long, CacheGroup>();
- private StorageManager storageManager;
-
- @Override
- public void add(CacheEntry entry, Serializer s) throws Exception {
- final CacheGroup group = cacheGroups.get(s.getId());
- if (group == null) {
- return;
- }
-
- group.lock.writeLock().lock();
- try {
- synchronized (group.freed) {
- while (!group.freed.isEmpty()) {
- group.freeBatch(group.freed.remove(0));
- }
- }
- final ByteBuffer buffer = ByteBuffer.allocate(IO_BUFFER_SIZE);
- final long offset = group.getOffset(s.getId(), compactionThreshold);
- ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream() {
- @Override
- protected ByteBuffer newBuffer() {
- buffer.rewind();
- return buffer;
- }
-
- @Override
- protected int flushDirect(int i) throws IOException {
- group.store.write(offset + bytesWritten, buffer.array(), 0, i);
- return i;
- }
- };
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- oos.writeInt(entry.getSizeEstimate());
- oos.writeLong(entry.getLastAccess());
- oos.writeDouble(entry.getOrderingValue());
- s.serialize(entry.getObject(), oos);
- oos.close();
- long size = fsos.getBytesWritten();
- long[] info = new long[] {offset, size};
- group.physicalMapping.put(entry.getId(), info);
- group.tail = Math.max(group.tail, offset + size);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, s.getId(), entry.getId(),
"batch written starting at:", offset); //$NON-NLS-1$
- }
- } finally {
- group.lock.writeLock().unlock();
- }
- }
-
- @Override
- public void createCacheGroup(Long gid) {
- cacheGroups.put(gid, new
CacheGroup(storageManager.createFileStore(String.valueOf(gid))));
- }
-
- @Override
- public CacheEntry get(Long id, Serializer<?> serializer)
- throws TeiidComponentException {
- CacheGroup group = cacheGroups.get(serializer.getId());
- if (group == null) {
- return null;
- }
- try {
- group.lock.readLock().lock();
- long[] info = group.physicalMapping.get(id);
- if (info == null) {
- return null;
- }
- ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
- CacheEntry ce = new CacheEntry(id);
- ce.setSizeEstimate(ois.readInt());
- ce.setLastAccess(ois.readLong());
- ce.setOrderingValue(ois.readDouble());
- ce.setObject(serializer.deserialize(ois));
- return ce;
- } catch(IOException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
- } catch (ClassNotFoundException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
- } finally {
- group.lock.readLock().unlock();
- }
- }
-
- @Override
- public void remove(Long gid, Long id) {
- CacheGroup group = cacheGroups.get(gid);
- if (group == null) {
- return;
- }
- if (group.lock.writeLock().tryLock()) {
- try {
- try {
- group.freeBatch(id);
- } catch (IOException e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error removing
batch"); //$NON-NLS-1$
- }
- } finally {
- group.lock.writeLock().unlock();
- }
- } else {
- group.freed.add(id);
- }
- }
-
- @Override
- public void addToCacheGroup(Long gid, Long oid) {
- CacheGroup group = cacheGroups.get(gid);
- if (group == null) {
- return;
- }
- group.physicalMapping.put(oid, null);
- }
-
- @Override
- public Collection<Long> removeCacheGroup(Long gid) {
- CacheGroup group = cacheGroups.remove(gid);
- if (group == null) {
- return Collections.emptyList();
- }
- group.store.remove();
- synchronized (group.physicalMapping) {
- return new ArrayList<Long>(group.physicalMapping.keySet());
- }
- }
-
- @Override
- public FileStore createFileStore(String name) {
- return storageManager.createFileStore(name);
- }
-
- @Override
- public void initialize() throws TeiidComponentException {
- this.storageManager.initialize();
- }
-
- public void setStorageManager(StorageManager storageManager) {
- this.storageManager = storageManager;
- }
-
- public StorageManager getStorageManager() {
- return storageManager;
- }
-
- public void setCompactionThreshold(int compactionThreshold) {
- this.compactionThreshold = compactionThreshold;
- }
-
-}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -26,12 +26,35 @@
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
-public abstract class OrderedCache<K, V> {
+import org.teiid.common.buffer.BaseCacheEntry;
+
+/**
+ * A Concurrent LRFU cache. Has assumptions that match buffermanager usage.
+ * Null values are not allowed.
+ * @param <K>
+ * @param <V>
+ */
+public class OrderedCache<K, V extends BaseCacheEntry> {
- protected Map<K, V> map = new ConcurrentHashMap<K, V>();
- protected NavigableMap<V, K> expirationQueue = new ConcurrentSkipListMap<V,
K>();
- protected Map<K, V> limbo = new ConcurrentHashMap<K, V>();
+ protected Map<K, V> map;
+ //TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
+ //the level limits the effective map size to ~ 2^16
+ //above which it performs comparably under load to a synchronized LinkedHashMap
+ //just with more CPU overhead vs. wait time.
+ protected NavigableMap<V, K> evictionQueue = new ConcurrentSkipListMap<V,
K>();
+ protected Map<K, V> limbo;
+ protected AtomicLong clock;
+ //combined recency/frequency lambda value between 0 and 1: lower values approach LFU, higher values approach LRU
+ //TODO: adaptively adjust this value. more hits should move closer to lru
+ protected float crfLamda = .0002f;
+
+ public OrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel,
AtomicLong clock) {
+ map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
+ limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
+ this.clock = clock;
+ }
public V get(K key) {
V result = map.get(key);
@@ -40,9 +63,9 @@
}
if (result != null) {
synchronized (result) {
- expirationQueue.remove(result);
- recordAccess(key, result, false);
- expirationQueue.put(result, key);
+ evictionQueue.remove(result);
+ recordAccess(result, false);
+ evictionQueue.put(result, key);
}
}
return result;
@@ -52,7 +75,7 @@
V result = map.remove(key);
if (result != null) {
synchronized (result) {
- expirationQueue.remove(result);
+ evictionQueue.remove(result);
}
}
return result;
@@ -62,18 +85,18 @@
V result = map.put(key, value);
if (result != null) {
synchronized (result) {
- expirationQueue.remove(result);
+ evictionQueue.remove(result);
}
}
synchronized (value) {
- recordAccess(key, value, result == null);
- expirationQueue.put(value, key);
+ recordAccess(value, result == null);
+ evictionQueue.put(value, key);
}
return result;
}
public V evict() {
- Map.Entry<V, K> entry = expirationQueue.pollFirstEntry();
+ Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
if (entry == null) {
return null;
}
@@ -89,6 +112,42 @@
return map.size();
}
- protected abstract void recordAccess(K key, V value, boolean initial);
+ public Map<V, K> getEvictionQueue() {
+ return evictionQueue;
+ }
+ public Map.Entry<V, K> firstEntry() {
+ return evictionQueue.firstEntry();
+ }
+
+ protected void recordAccess(BaseCacheEntry value, boolean initial) {
+ float lastAccess = value.getLastAccess();
+ value.setLastAccess(clock.get());
+ if (initial && lastAccess == 0) {
+ return; //we just want to timestamp this as created and not give it an ordering value
+ }
+ float orderingValue = value.getOrderingValue();
+ orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
+ orderingValue);
+ value.setOrderingValue(orderingValue);
+ }
+
+ float computeNextOrderingValue(float currentTime,
+ float lastAccess, float orderingValue) {
+ orderingValue =
+ (float) (//Frequency component
+ orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+ //recency component
+ + Math.pow(currentTime, crfLamda));
+ return orderingValue;
+ }
+
+ public float getCrfLamda() {
+ return crfLamda;
+ }
+
+ public void setCrfLamda(float crfLamda) {
+ this.crfLamda = crfLamda;
+ }
+
}
Deleted:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -1,149 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-public abstract class PartiallyOrderedCache<K, V> {
-
- private int maxOrderedSize = 1 << 19;
-
- protected Map<K, V> map;
- //TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
- //the level limits the effective map size to ~ 2^16
- //where it performs comparably under load to a synchronized LinkedHashMap
- //just with more CPU overhead vs. wait time.
- //TODO: have the concurrent version be pluggable
- protected NavigableMap<V, K> evictionQueue = new TreeMap<V, K>();
- //when we get to extreme number of entries we overflow into lru
- protected Map<V, K> evictionQueueHead = new LinkedHashMap<V, K>();
- //holds entries that are being evicted, but that might not yet be in a lower caching
level
- protected Map<K, V> limbo;
-
- public PartiallyOrderedCache(int initialCapacity, float loadFactor, int
concurrencyLevel) {
- map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
- limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
- }
-
- public void setMaxOrderedSize(int maxOrderedSize) {
- this.maxOrderedSize = maxOrderedSize;
- }
-
- public V get(K key) {
- V result = map.get(key);
- if (result == null) {
- result = limbo.get(key);
- }
- if (result != null) {
- maintainQueues(key, result, null);
- }
- return result;
- }
-
- public V remove(K key) {
- V result = map.remove(key);
- if (result != null) {
- synchronized (this) {
- if (evictionQueue.remove(result) != null) {
- orderedRemoved();
- } else {
- evictionQueueHead.remove(result);
- }
- }
- }
- return result;
- }
-
- private void orderedRemoved() {
- if (evictionQueue.size() < (maxOrderedSize>>1) &&
evictionQueueHead.size() > 0) {
- Iterator<Map.Entry<V,K>> i = evictionQueueHead.entrySet().iterator();
- if (i.hasNext()) {
- Map.Entry<V, K> entry = i.next();
- if (map.containsKey(entry.getValue())) {
- i.remove();
- evictionQueue.put(entry.getKey(), entry.getValue());
- }
- }
- }
- }
-
- public V put(K key, V value) {
- V result = map.put(key, value);
- maintainQueues(key, value, result);
- return result;
- }
-
- private void maintainQueues(K key, V value, V old) {
- synchronized (this) {
- if (old != null && evictionQueue.remove(old) == null) {
- evictionQueueHead.remove(old);
- }
- recordAccess(key, value, old == null);
- evictionQueue.put(value, key);
- if (evictionQueue.size() > maxOrderedSize) {
- Map.Entry<V, K> last = evictionQueue.pollLastEntry();
- if (last != null) {
- if (map.containsKey(last.getValue()) &&
!evictionQueue.containsKey(last.getKey())) {
- evictionQueueHead.put(last.getKey(), last.getValue());
- }
- }
- }
- }
- }
-
- public V evict() {
- Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
- if (entry == null) {
- return null;
- }
- synchronized (this) {
- orderedRemoved();
- }
- limbo.put(entry.getValue(), entry.getKey());
- return map.remove(entry.getValue());
- }
-
- public Map<V, K> getEvictionQueue() {
- return evictionQueue;
- }
-
- public Map.Entry<V, K> firstEntry() {
- return evictionQueue.firstEntry();
- }
-
- public void finishedEviction(K key) {
- limbo.remove(key);
- }
-
- public int size() {
- return map.size();
- }
-
- protected abstract void recordAccess(K key, V value, boolean initial);
-
-}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -22,8 +22,8 @@
package org.teiid.common.buffer;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
import org.teiid.common.buffer.impl.BufferManagerImpl;
-import org.teiid.common.buffer.impl.FileStoreCache;
import org.teiid.common.buffer.impl.MemoryStorageManager;
import org.teiid.common.buffer.impl.SplittableStorageManager;
import org.teiid.core.TeiidComponentException;
@@ -85,7 +85,11 @@
MemoryStorageManager storageManager = new MemoryStorageManager();
SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
- FileStoreCache fsc = new FileStoreCache();
+ BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+ //use conservative allocations
+ fsc.setDirect(false); //allow the space to be GCed easily
+ fsc.setMaxStorageObjectSize(1<<20);
+ fsc.setMemoryBufferSpace(1<<21);
fsc.setStorageManager(ssm);
fsc.initialize();
bufferManager.setCache(fsc);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-10-12 23:23:02
UTC (rev 3549)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-10-14 01:46:31
UTC (rev 3550)
@@ -29,8 +29,8 @@
import org.junit.Test;
import org.teiid.common.buffer.STree.InsertMode;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
import org.teiid.common.buffer.impl.BufferManagerImpl;
-import org.teiid.common.buffer.impl.FileStoreCache;
import org.teiid.core.TeiidComponentException;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -102,15 +102,14 @@
}
- /**
- * Forces the logic through several compaction cycles by using large strings
- * @throws TeiidComponentException
- */
- @Test public void testCompaction() throws TeiidComponentException {
+ @Test public void testStorageWrites() throws TeiidComponentException {
BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
bm.setProcessorBatchSize(32);
bm.setMaxReserveKB(0);//force all to disk
- ((FileStoreCache)bm.getCache()).setCompactionThreshold(0);
+ BufferFrontedFileStoreCache fsc =(BufferFrontedFileStoreCache)bm.getCache();
+ fsc.setMaxStorageObjectSize(1 << 19);
+ fsc.setMemoryBufferSpace(1 << 19);
+ fsc.initialize();
bm.initialize();
ElementSymbol e1 = new ElementSymbol("x");
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -62,4 +62,16 @@
}
}
+ @Test public void testCompactBitSet() {
+ ConcurrentBitSet bst = new ConcurrentBitSet(100000, 1);
+ bst.setCompact(true);
+ for (int i = 0; i < 100000; i++) {
+ assertEquals(i, bst.getAndSetNextClearBit());
+ }
+ bst.clear(50);
+ bst.clear(500);
+ bst.clear(5000);
+ assertEquals(50, bst.getAndSetNextClearBit());
+ }
+
}
Deleted:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -1,61 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-
-public class TestPartiallyOrderedCache {
-
- @Test public void testQueueMaintenance() {
- PartiallyOrderedCache<Integer, Integer> cache = new
PartiallyOrderedCache<Integer, Integer>(16, .75f, 16) {
-
- @Override
- protected void recordAccess(Integer key, Integer value, boolean initial) {
-
- }
- };
-
- cache.setMaxOrderedSize(5);
-
- for (int i = 0; i < 10; i++) {
- cache.put(i, i);
- }
-
- cache.get(8);
- cache.get(1);
-
- List<Integer> evictions = new ArrayList<Integer>();
- for (int i = 0; i < 10; i++) {
- evictions.add(i);
- }
- //we expect natural order because the lru is converted into the sorted on natural key
- assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), evictions);
- }
-
-}
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-12
23:23:02 UTC (rev 3549)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -31,9 +31,9 @@
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.common.buffer.impl.FileStorageManager;
-import org.teiid.common.buffer.impl.FileStoreCache;
import org.teiid.common.buffer.impl.MemoryStorageManager;
import org.teiid.common.buffer.impl.SplittableStorageManager;
import org.teiid.core.TeiidComponentException;
@@ -68,6 +68,8 @@
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
private boolean inlineLobs = true;
+ private int maxMemoryBufferSpace = -1;
+ private int maxStorageObjectSize =
BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
private FileStorageManager fsm;
/**
@@ -106,7 +108,14 @@
fsm.setMaxBufferSpace(maxBufferSpace*MB);
SplittableStorageManager ssm = new SplittableStorageManager(fsm);
ssm.setMaxFileSize(maxFileSize);
- FileStoreCache fsc = new FileStoreCache();
+ BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+ fsc.setMaxStorageObjectSize(maxStorageObjectSize);
+ if (maxMemoryBufferSpace <= 0) {
+ //use approximately 20% of what's set aside for the reserved
+ fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() * 200);
+ } else {
+ fsc.setMemoryBufferSpace(maxMemoryBufferSpace);
+ }
fsc.setStorageManager(ssm);
fsc.initialize();
this.bufferMgr.setCache(fsc);
@@ -245,4 +254,22 @@
public long getReadAttempts() {
return bufferMgr.getReadAttempts();
}
+
+ public int getMaxMemoryBufferSpace() {
+ return maxMemoryBufferSpace;
+ }
+
+ public int getMaxStorageObjectSize() {
+ return maxStorageObjectSize;
+ }
+
+ @ManagementProperty(description="Max direct memory buffer space used by the
buffer manager in MB. -1 determines the setting automatically from the maxReserveKB
(default -1). This value cannot be smaller than maxStorageObjectSize.")
+ public void setMaxMemoryBufferSpace(int maxMemoryBufferSpace) {
+ this.maxMemoryBufferSpace = maxMemoryBufferSpace;
+ }
+
+ @ManagementProperty(description="The maximum size of a buffer managed object
(typically a table page or a results batch) in bytes (default 8388608).")
+ public void setMaxStorageObjectSize(int maxStorageObjectSize) {
+ this.maxStorageObjectSize = maxStorageObjectSize;
+ }
}
Modified:
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
---
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-12
23:23:02 UTC (rev 3549)
+++
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-14
01:46:31 UTC (rev 3550)
@@ -29,9 +29,9 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.common.buffer.impl.FileStorageManager;
-import org.teiid.common.buffer.impl.FileStoreCache;
import org.teiid.common.buffer.impl.SplittableStorageManager;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
@@ -55,7 +55,7 @@
assertTrue(svc.isUseDisk());
BufferManagerImpl mgr = svc.getBufferManager();
- SplittableStorageManager ssm =
(SplittableStorageManager)((FileStoreCache)mgr.getCache()).getStorageManager();
+ SplittableStorageManager ssm =
(SplittableStorageManager)((BufferFrontedFileStoreCache)mgr.getCache()).getStorageManager();
assertTrue(((FileStorageManager)ssm.getStorageManager()).getDirectory().endsWith(svc.getBufferDirectory().getName()));
}