[teiid-commits] teiid SVN: r3552 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 2 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Fri Oct 14 13:31:57 EDT 2011
Author: shawkins
Date: 2011-10-14 13:31:57 -0400 (Fri, 14 Oct 2011)
New Revision: 3552
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 using finer grained locking and refining the config
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -29,13 +29,66 @@
/**
* Represents the storage strategy for the {@link BufferManager}
*/
-public interface Cache extends StorageManager {
- void createCacheGroup(Long gid); //called prior to adding an entry
- //TODO: this should use a callback on the buffermangaer to remove memory entries
- //without materializing all group keys
+public interface Cache<T> extends StorageManager {
+ /**
+ * Must be called prior to adding any group entries
+ * @param gid
+ */
+ void createCacheGroup(Long gid);
+
+ /**
+ * Remove an entire cache group
+ *
+ * TODO: this should use a callback on the BufferManager to remove memory entries
+ * without materializing all group keys
+ * @param gid
+ * @return
+ */
Collection<Long> removeCacheGroup(Long gid);
+
+ /**
+ * Must be called prior to adding an entry
+ * @param gid
+ * @param oid
+ */
void addToCacheGroup(Long gid, Long oid);
- CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
+
+ /**
+ * Lock the object for load and return an identifier/lock
+ * that can be used to retrieve the object.
+ * @param oid
+ * @param serializer
+ * @return the identifier, may be null
+ */
+ T lockForLoad(Long oid, Serializer<?> serializer);
+
+ /**
+ * Must be called after lockForLoad
+ * @param lock the identifier returned by lockForLoad, may be null
+ */
+ void unlockForLoad(T lock);
+
+ /**
+ * Get method, must be called using the object obtained in the
+ * lockForLoad method
+ * @return
+ * @throws TeiidComponentException
+ */
+ CacheEntry get(T lock, Long oid, Serializer<?> serializer) throws TeiidComponentException;
+
+ /**
+ * Adds an entry to the cache.
+ * @param entry
+ * @param s
+ * @throws Exception
+ */
void add(CacheEntry entry, Serializer<?> s) throws Exception;
+
+ /**
+ * Remove an entry from the cache
+ * @param gid
+ * @param id
+ */
void remove(Long gid, Long id);
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -70,7 +70,4 @@
return len;
}
- public int free(boolean steal) {
- return manager.free(steal);
- }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -43,6 +43,6 @@
void freeBlock(int index);
- int free(boolean steal);
+ int free(boolean acquireDataBlock);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -38,7 +38,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -94,8 +93,10 @@
* on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
* files.
*/
-public class BufferFrontedFileStoreCache implements Cache, StorageManager {
+public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
+ private static final int EVICTION_SCANS = 5;
+
public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
static final int ADDRESS_BITS = 31;
@@ -108,6 +109,9 @@
//TODO allow the block size to be configurable
static final int LOG_BLOCK_SIZE = 13;
+
+ public static final long MAX_ADDRESSABLE_MEMORY = 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE);
+
static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
static final int BLOCK_MASK = BLOCK_SIZE - 1;
static final int ADDRESSES_PER_BLOCK = BLOCK_SIZE/BYTES_PER_BLOCK_ADDRESS;
@@ -125,11 +129,13 @@
private ByteBuffer inodeBuffer;
private final long gid;
private final long oid;
+ private int blockSegment;
InodeBlockManager(long gid, long oid, int inode) {
this.inode = inode;
this.gid = gid;
this.oid = oid;
+ this.blockSegment = blocksInuse.getNextSegment();
}
@Override
@@ -218,7 +224,7 @@
memoryEvictionLock.readLock().lock();
boolean readLocked = true;
try {
- if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
+ if ((next = blocksInuse.getAndSetNextClearBit(blockSegment)) == EMPTY_ADDRESS) {
memoryEvictionLock.readLock().unlock();
readLocked = false;
next = evictFromMemoryBuffer(true);
@@ -254,7 +260,6 @@
if (this.inode == -1) {
throw new AssertionError("Out of inodes"); //$NON-NLS-1$
}
- inodesCreated.getAndIncrement();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -267,31 +272,30 @@
}
@Override
- public int free(boolean steal) {
+ public int free(boolean acquire) {
if (this.inode == EMPTY_ADDRESS) {
return EMPTY_ADDRESS;
}
ByteBuffer bb = getInodeBlock();
- int dataBlockToSteal = bb.getInt(0);
+ int dataBlockToAcquire = bb.getInt(0);
int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
- boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(steal?1:0), true);
+ boolean freedAll = freeBlock(acquire?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(acquire?1:0), true);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
inodesInuse.clear(inode);
- inodesRemoved.getAndIncrement();
if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
- return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
}
freedAll = freeIndirectBlock(indirectIndexBlock);
if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
- return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
}
bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
freeDataBlock(doublyIndirectIndexBlock);
- return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
}
private boolean freeIndirectBlock(int indirectIndexBlock) {
@@ -323,29 +327,7 @@
return blockByteBuffer.getByteBuffer(dataBlock);
}
}
- AtomicInteger inodesRemoved = new AtomicInteger();
- AtomicInteger inodesCreated = new AtomicInteger();
- private static class PhysicalInfo extends BaseCacheEntry {
- int inode = EMPTY_ADDRESS;
- int block = EMPTY_ADDRESS;
- int sizeIndex = 0;
- final int memoryBlockCount;
- final Long gid;
-
- public PhysicalInfo(Long gid, Long id, int inode, int size) {
- super(id);
- this.inode = inode;
- this.gid = gid;
- this.memoryBlockCount = (size>>LOG_BLOCK_SIZE) + ((size&BLOCK_MASK)>0?1:0);
- int blocks = memoryBlockCount;
- while (blocks >= 1) {
- this.sizeIndex++;
- blocks>>=2;
- }
- }
- }
-
private StorageManager storageManager;
private int maxStorageObjectSize = DEFAuLT_MAX_OBJECT_SIZE;
private long memoryBufferSpace = 1 << 26; //64MB
@@ -375,7 +357,7 @@
@Override
public void run() {
try {
- while (lowBlocks()) {
+ while (lowBlocks(false)) {
if (evictFromMemoryBuffer(false) == EMPTY_ADDRESS) {
break;
}
@@ -386,6 +368,7 @@
}
};
private int cleaningThreshold;
+ private int criticalCleaningThreshold;
private AtomicLong storageWrites = new AtomicLong();
private AtomicLong storageReads = new AtomicLong();
@@ -402,7 +385,9 @@
this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
- cleaningThreshold = Math.min(maxMemoryBlocks<<3, blocks>>1); //try to maintain enough freespace to hold 8 max objects
+ //try to maintain enough freespace so that writers don't block in cleaning
+ cleaningThreshold = Math.min(maxMemoryBlocks<<4, blocks>>1);
+ criticalCleaningThreshold = Math.min(maxMemoryBlocks<<2, blocks>>2);
//account for index pointer block overhead
if (maxMemoryBlocks > DIRECT_POINTERS) {
maxMemoryBlocks--;
@@ -423,9 +408,9 @@
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
- boolean lowBlocks() {
+ boolean lowBlocks(boolean critical) {
int bitsSet = blocksInuse.getBitsSet();
- return bitsSet > 0 && (blocks - bitsSet < cleaningThreshold) && memoryBufferEntries.firstEntry() != null;
+ return bitsSet > 0 && (blocks - bitsSet < (critical?criticalCleaningThreshold:cleaningThreshold)) && memoryBufferEntries.firstEntry() != null;
}
InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -441,7 +426,6 @@
InodeBlockManager blockManager = null;
boolean hasPermit = false;
PhysicalInfo info = null;
- boolean newEntry = true;
boolean success = false;
try {
Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
@@ -454,18 +438,29 @@
return; //already removed
}
} else {
- newEntry = false;
synchronized (info) {
//we assume that serialization would be faster than a disk read
if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
success = true;
return;
}
+ //we should not be in memory since there is no inode assigned
+ assert !memoryBufferEntries.getEvictionQueue().containsKey(info);
}
}
- if (!cleanerRunning.get() && lowBlocks() && cleanerRunning.compareAndSet(false, true)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
- asynchPool.execute(cleaningTask);
+ //proactively create freespace
+ if (!cleanerRunning.get()) {
+ if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
+ asynchPool.execute(cleaningTask);
+ if (lowBlocks(true)) {
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
+ }
+ }
+ } else if (lowBlocks(true)) {
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
}
memoryWritePermits.acquire();
hasPermit = true;
@@ -478,15 +473,14 @@
synchronized (map) {
//synchronize to ensure proper cleanup from a concurrent removal
if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
- if (newEntry) {
+ if (info == null) {
info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(), fsos.getBytesWritten());
map.put(entry.getId(), info);
- } else {
- synchronized (info) {
- info.inode = blockManager.getInode();
- }
}
- memoryBufferEntries.put(entry.getId(), info);
+ synchronized (info) {
+ info.inode = blockManager.getInode();
+ memoryBufferEntries.put(entry.getId(), info);
+ }
success = true;
}
}
@@ -503,28 +497,61 @@
}
@Override
- public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
+ public PhysicalInfo lockForLoad(Long oid, Serializer<?> serializer) {
+ Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
+ if (map == null) {
+ return null;
+ }
+ PhysicalInfo info = map.get(oid);
+ if (info == null) {
+ return null;
+ }
+ synchronized (info) {
+ while (info.loading) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ info.loading = true;
+ }
+ return info;
+ }
+
+ @Override
+ public void unlockForLoad(PhysicalInfo info) {
+ if (info == null) {
+ return;
+ }
+ synchronized (info) {
+ assert info.loading;
+ info.loading = false;
+ info.notifyAll();
+ }
+ }
+
+ @Override
+ public CacheEntry get(PhysicalInfo info, Long oid, Serializer<?> serializer) throws TeiidComponentException {
+ if (info == null) {
+ return null;
+ }
long currentTime = readAttempts.incrementAndGet();
+ InputStream is = null;
+ boolean inStorage = false;
try {
- Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
- if (map == null) {
- return null;
- }
- final PhysicalInfo info = map.get(oid);
- if (info == null) {
- return null;
- }
- CacheEntry ce = new CacheEntry(oid);
- InputStream is = null;
synchronized (info) {
if (info.inode != EMPTY_ADDRESS) {
- memoryBufferEntries.get(oid); //touch this entry
+ info.pinned = true;
+ PhysicalInfo existing = memoryBufferEntries.get(info.getId()); //touch this entry
+ assert existing == info;
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
+ inStorage = true;
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
@@ -533,29 +560,33 @@
FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
- if (shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
- BlockManager manager = null;
- boolean success = false;
- try {
- manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
- ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
- ObjectConverterUtil.write(os, is, -1);
- info.inode = manager.getInode();
- memoryBufferEntries.put(info.getId(), info);
- is = new BlockInputStream(manager, info.memoryBlockCount);
- success = true;
- } finally {
- this.memoryWritePermits.release();
- if (!success && manager != null) {
- manager.free(false);
- info.inode = EMPTY_ADDRESS;
- }
- }
- }
} else {
return null;
}
}
+ if (inStorage && shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
+ BlockManager manager = null;
+ boolean success = false;
+ try {
+ manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+ ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
+ ObjectConverterUtil.write(os, is, -1);
+ synchronized (info) {
+ info.inode = manager.getInode();
+ info.pinned = true;
+ memoryBufferEntries.put(info.getId(), info);
+ }
+ is = new BlockInputStream(manager, info.memoryBlockCount);
+ success = true;
+ } finally {
+ this.memoryWritePermits.release();
+ if (!success && manager != null) {
+ manager.free(false);
+ info.inode = EMPTY_ADDRESS;
+ }
+ }
+ }
+ CacheEntry ce = new CacheEntry(oid);
ObjectInputStream ois = new ObjectInputStream(is);
ce.setSizeEstimate(ois.readInt());
ce.setLastAccess(1);
@@ -567,6 +598,11 @@
throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
} catch (ClassNotFoundException e) {
throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+ } finally {
+ synchronized (info) {
+ info.pinned = false;
+ info.notifyAll();
+ }
}
}
@@ -579,7 +615,7 @@
*/
private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
- return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (info.memoryBlockCount<<2)
+ return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (criticalCleaningThreshold + info.memoryBlockCount)
|| (lowest != null && lowest.getKey().block != EMPTY_ADDRESS
&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
}
@@ -631,55 +667,90 @@
}
}
- int free(Long oid, PhysicalInfo info, boolean demote, boolean stealDataBlock) {
+ /**
+ * Multi-purpose method to free memory. Modes are:
+ * demote && !acquireDataBlock -> push out of memory buffer onto disk
+ * demote && acquireDataBlock -> push out of memory and reuse a datablock
+ * !demote -> full removal from memory and disk
+ */
+ int free(Long oid, PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
if (info == null) {
return EMPTY_ADDRESS;
}
- synchronized (info) {
- memoryBufferEntries.remove(oid);
- if (demote) {
- if (info.inode == EMPTY_ADDRESS) {
+ int result = EMPTY_ADDRESS;
+ BlockManager bm = null;
+ int block = EMPTY_ADDRESS;
+ try {
+ int memoryBlockCount;
+ int sizeIndex;
+ synchronized (info) {
+ info.evicting = true;
+ block = info.block;
+ memoryBlockCount = info.memoryBlockCount;
+ sizeIndex = info.sizeIndex;
+ if (info.inode != EMPTY_ADDRESS) {
+ bm = getBlockManager(info.gid, oid, info.inode);
+ } else if (demote) {
return EMPTY_ADDRESS;
}
- BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+ //release the lock to perform the transfer
+ }
+ if (demote && block == EMPTY_ADDRESS) {
+ storageWrites.getAndIncrement();
+ BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
+ BlockStore blockStore = sizeBasedStores[sizeIndex];
+ block = getAndSetNextClearBit(blockStore);
+ FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
+ long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+ byte[] b = new byte[BLOCK_SIZE];
+ int read = 0;
+ try {
+ while ((read = is.read(b, 0, b.length)) != -1) {
+ fs.write(blockOffset, b, 0, read);
+ blockOffset+=read;
+ }
+ } catch (Throwable e) {
+ //shouldn't happen, but we'll invalidate this write and continue
+ demote = false;
+ //just continue to free
+ LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+ }
+ }
+ } finally {
+ //ensure post conditions
+ synchronized (info) {
+ //it is possible for a read to happen while evicting.
+ //that's ok, we'll just wait for it to finish
+ await(info, true, false);
+ info.evicting = false;
+ info.notifyAll();
info.inode = EMPTY_ADDRESS;
- if (info.block == EMPTY_ADDRESS) {
- storageWrites.getAndIncrement();
- BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount);
- BlockStore blockStore = sizeBasedStores[info.sizeIndex];
- info.block = getAndSetNextClearBit(blockStore);
- FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
- long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
- byte[] b = new byte[BLOCK_SIZE];
- int read = 0;
- try {
- while ((read = is.read(b, 0, b.length)) != -1) {
- fs.write(blockOffset, b, 0, read);
- blockOffset+=read;
- }
- } catch (Throwable e) {
- //shouldn't happen, but we'll invalidate this write and continue
+ memoryBufferEntries.remove(info.getId());
+ if (block != EMPTY_ADDRESS) {
+ if (demote) {
+ info.block = block;
+ } else {
+ BlockStore blockStore = sizeBasedStores[info.sizeIndex];
blockStore.blocksInUse.clear(info.block);
info.block = EMPTY_ADDRESS;
- //just continue to free
- LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
}
- return is.free(stealDataBlock);
}
- return bm.free(stealDataBlock);
+ if (bm != null) {
+ result = bm.free(acquireDataBlock);
+ }
}
- if (info.inode != EMPTY_ADDRESS) {
- BlockManager bm = getBlockManager(info.gid, oid, info.inode);
- info.inode = EMPTY_ADDRESS;
- bm.free(false);
+ }
+ return result;
+ }
+
+ private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
+ while ((pinned && info.pinned) || (evicting && info.evicting)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
}
- if (info.block != EMPTY_ADDRESS) {
- BlockStore blockStore = sizeBasedStores[info.sizeIndex];
- blockStore.blocksInUse.clear(info.block);
- info.block = EMPTY_ADDRESS;
- }
}
- return EMPTY_ADDRESS;
}
static int getAndSetNextClearBit(BlockStore bs) {
@@ -690,34 +761,62 @@
return result;
}
- int evictFromMemoryBuffer(boolean steal) {
- memoryEvictionLock.writeLock().lock();
+ /**
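+ * Eviction routine. When space is exhausted, data blocks are stolen from
+ * memory entries; as a last resort other threads are blocked from taking
+ * free blocks until the pending request is satisfied, to avoid starvation.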
+ * @param acquire
+ * @return
+ */
+ int evictFromMemoryBuffer(boolean acquire) {
+ boolean writeLocked = false;
int next = -1;
try {
- for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
+ for (int i = 0; i < EVICTION_SCANS && next == EMPTY_ADDRESS; i++) {
+ //doing a cleanup may trigger the purging of resources
AutoCleanupUtil.doCleanup();
//scan the eviction queue looking for a victim
Iterator<Map.Entry<PhysicalInfo, Long>> iter = memoryBufferEntries.getEvictionQueue().entrySet().iterator();
- while (((!steal && lowBlocks()) || (steal && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
+ while (((!acquire && lowBlocks(false)) || (acquire && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
Map.Entry<PhysicalInfo, Long> entry = iter.next();
PhysicalInfo info = entry.getKey();
synchronized (info) {
if (info.inode == EMPTY_ADDRESS) {
continue;
}
- next = free(entry.getValue(), info, true, steal);
- if (!steal) {
- next = 0;
+ if (info.pinned || info.evicting) {
+ if (!acquire || i != EVICTION_SCANS - 1) {
+ continue;
+ }
+ if (acquire && !writeLocked) {
+ //stop the world - prevent any other thread from taking a free block
+ //until this one is satisfied
+ memoryEvictionLock.writeLock().lock();
+ writeLocked = true;
+ }
+ //wait for the read/eviction to be over
+ await(info, true, true);
+ if (info.inode == EMPTY_ADDRESS) {
+ continue;
+ }
}
+ //mark as evicting early so that other evictFromMemoryCalls don't select this same entry
+ info.evicting = true;
}
+ next = free(entry.getValue(), info, true, acquire);
+ if (!acquire) {
+ next = 0; //let the cleaner know that we made progress
+ }
break;
}
}
- if (steal && next == -1) {
+ if (acquire && next == -1) {
throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
}
} finally {
- memoryEvictionLock.writeLock().unlock();
+ if (writeLocked) {
+ memoryEvictionLock.writeLock().unlock();
+ }
}
return next;
}
@@ -731,7 +830,7 @@
}
public void setMemoryBufferSpace(long maxBufferSpace) {
- this.memoryBufferSpace = Math.min(maxBufferSpace, 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE));
+ this.memoryBufferSpace = Math.min(maxBufferSpace, MAX_ADDRESSABLE_MEMORY);
}
public int getInodesInUse() {
@@ -755,3 +854,34 @@
}
}
+
+/**
+ * Represents the memory buffer and storage state of an object.
+ * It is important to minimize the amount of data held here.
+ * Currently should be 40 bytes.
+ */
+final class PhysicalInfo extends BaseCacheEntry {
+ final Long gid;
+ //the memory inode and block count
+ int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ final int memoryBlockCount;
+ //the storage block and BlockStore index
+ int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ byte sizeIndex = 0;
+ //state flags
+ boolean pinned; //indicates that the entry is being read
+ boolean evicting; //indicates that the entry will be moved out of the memory buffer
+ boolean loading; //used by tier 1 cache to prevent double loads
+
+ public PhysicalInfo(Long gid, Long id, int inode, int size) {
+ super(id);
+ this.inode = inode;
+ this.gid = gid;
+ this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+ int blocks = memoryBlockCount;
+ while (blocks >= 1) {
+ this.sizeIndex++;
+ blocks>>=2;
+ }
+ }
+}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -213,7 +213,9 @@
if (ce != null) {
return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
}
- synchronized (this) {
+ //obtain a granular lock to prevent double memory loading
+ Object o = cache.lockForLoad(batch, this);
+ try {
ce = fastGet(batch, prefersMemory.get(), retain);
if (ce != null) {
return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
@@ -222,7 +224,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, id, id, "reading batch", batch, "from storage, total reads:", count); //$NON-NLS-1$ //$NON-NLS-2$
}
- ce = cache.get(batch, this);
+ ce = cache.get(o, batch, this);
if (ce == null) {
throw new AssertionError("Batch not found in storage " + batch); //$NON-NLS-1$
}
@@ -234,7 +236,9 @@
if (retain) {
addMemoryEntry(ce, null);
}
- }
+ } finally {
+ cache.unlockForLoad(o);
+ }
return (List<List<?>>)ce.getObject();
}
@@ -281,6 +285,7 @@
private AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
private volatile int reserveBatchKB;
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
+ private long memoryBufferSpace; //used as a hint to account for batch overhead (only useful in large scenarios)
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
@@ -313,7 +318,7 @@
};
});
- Cache cache;
+ private Cache cache;
private Map<String, TupleReference> tupleBufferMap = new ConcurrentHashMap<String, TupleReference>();
private ReferenceQueue<TupleBuffer> tupleBufferQueue = new ReferenceQueue<TupleBuffer>();
@@ -459,6 +464,10 @@
this.maxActivePlans = maxActivePlans;
}
+ public void setMemoryBufferSpace(long memoryBufferSpace) {
+ this.memoryBufferSpace = memoryBufferSpace;
+ }
+
public void setMaxProcessingKB(int maxProcessingKB) {
this.maxProcessingKB = maxProcessingKB;
}
@@ -479,6 +488,8 @@
this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
}
this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
+ int batchOverheadKB = (int)(this.memoryBufferSpace<0?(this.maxReserveKB.get()<<8):this.memoryBufferSpace)>>20;
+ this.maxReserveKB.set(Math.max(0, this.maxReserveKB.get() - batchOverheadKB));
}
this.reserveBatchKB = this.getMaxReserveKB();
if (this.maxProcessingKBOrig == null) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -98,10 +98,17 @@
/**
* Makes a best effort to atomically find the next clear bit and set it
- * @return the next bit index or -1 if no clear bits are founds
+ * @return the next bit index or -1 if no clear bits are found
*/
public int getAndSetNextClearBit() {
- int start = counter.getAndIncrement();
+ return getAndSetNextClearBit(counter.getAndIncrement());
+ }
+
+ public int getNextSegment() {
+ return counter.getAndIncrement();
+ }
+
+ public int getAndSetNextClearBit(int start) {
int nextBit = -1;
for (int i = 0; i < segments.length; i++) {
Segment s = segments[(start+i)&(segments.length-1)];
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -38,7 +38,7 @@
import org.teiid.core.TeiidComponentException;
-public class MemoryStorageManager implements Cache {
+public class MemoryStorageManager implements Cache<Long> {
public static final int MAX_FILE_SIZE = 1 << 17;
@@ -132,7 +132,17 @@
}
@Override
- public CacheEntry get(Long id, Serializer<?> serializer)
+ public Long lockForLoad(Long oid, Serializer<?> serializer) {
+ return oid;
+ }
+
+ @Override
+ public void unlockForLoad(Long o) {
+ //nothing to do, no locking is performed
+ }
+
+ @Override
+ public CacheEntry get(Long lock, Long id, Serializer<?> serializer)
throws TeiidComponentException {
Map<Long, CacheEntry> group = groups.get(serializer.getId());
if (group != null) {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -77,8 +77,7 @@
ce.setObject(cacheObject);
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
-
- ce = cache.get(2l, s);
+ ce = get(cache, 2l, s);
assertEquals(cacheObject, ce.getObject());
//test something that exceeds the direct inode data blocks
@@ -88,7 +87,7 @@
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
- ce = cache.get(3l, s);
+ ce = get(cache, 3l, s);
assertEquals(cacheObject, ce.getObject());
cache.removeCacheGroup(1l);
@@ -104,7 +103,7 @@
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
- ce = cache.get(3l, s);
+ ce = get(cache, 3l, s);
assertEquals(cacheObject, ce.getObject());
cache.removeCacheGroup(1l);
@@ -120,7 +119,7 @@
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
- ce = cache.get(3l, s);
+ ce = get(cache, 3l, s);
assertNull(ce);
cache.removeCacheGroup(1l);
@@ -128,6 +127,14 @@
assertEquals(0, cache.getDataBlocksInUse());
assertEquals(0, cache.getInodesInUse());
}
+
+ private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
+ Serializer<Integer> s) throws TeiidComponentException {
+ PhysicalInfo o = cache.lockForLoad(oid, s);
+ CacheEntry ce = cache.get(o, oid, s);
+ cache.unlockForLoad(o);
+ return ce;
+ }
@Test public void testEviction() throws Exception {
BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
@@ -144,7 +151,7 @@
ce = new CacheEntry(3l);
ce.setSerializer(ref);
- cacheObject = Integer.valueOf(5000);
+ cacheObject = Integer.valueOf(5001);
ce.setObject(cacheObject);
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
@@ -152,8 +159,11 @@
assertEquals(3, cache.getDataBlocksInUse());
assertEquals(1, cache.getInodesInUse());
- ce = cache.get(2l, s);
+ ce = get(cache, 2l, s);
assertEquals(Integer.valueOf(5000), ce.getObject());
+
+ ce = get(cache, 3l, s);
+ assertEquals(Integer.valueOf(5001), ce.getObject());
}
private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -62,6 +62,13 @@
}
}
+ @Test public void testSegmentUse() {
+ ConcurrentBitSet bst = new ConcurrentBitSet(50001, 4);
+ assertEquals(0, bst.getAndSetNextClearBit(0));
+ assertEquals(1, bst.getAndSetNextClearBit(0));
+ assertEquals(2, bst.getAndSetNextClearBit(4));
+ }
+
@Test public void testCompactBitSet() {
ConcurrentBitSet bst = new ConcurrentBitSet(100000, 1);
bst.setCompact(true);
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-14 17:31:57 UTC (rev 3552)
@@ -68,7 +68,7 @@
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
private boolean inlineLobs = true;
- private int maxMemoryBufferSpace = -1;
+ private int memoryBufferSpace = -1;
private int maxStorageObjectSize = BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
private FileStorageManager fsm;
@@ -93,7 +93,7 @@
this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
this.bufferMgr.setMaxReserveKB(this.maxReserveKb);
this.bufferMgr.setMaxProcessingKB(this.maxProcessingKb);
-
+ this.bufferMgr.setMemoryBufferSpace(Math.min(BufferFrontedFileStoreCache.MAX_ADDRESSABLE_MEMORY, this.memoryBufferSpace));
this.bufferMgr.initialize();
// If necessary, add disk storage manager
@@ -110,11 +110,11 @@
ssm.setMaxFileSize(maxFileSize);
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
fsc.setMaxStorageObjectSize(maxStorageObjectSize);
- if (maxMemoryBufferSpace <= 0) {
- //use approximately 20% of what's set aside for the reserved
- fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() * 200);
+ if (memoryBufferSpace < 0) {
+ //use approximately 25% of what's set aside for the reserve buffer
+ fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() << 8);
} else {
- fsc.setMemoryBufferSpace(maxMemoryBufferSpace);
+ fsc.setMemoryBufferSpace(memoryBufferSpace);
}
fsc.setStorageManager(ssm);
fsc.initialize();
@@ -254,21 +254,21 @@
public long getReadAttempts() {
return bufferMgr.getReadAttempts();
}
-
- public int getMaxMemoryBufferSpace() {
- return maxMemoryBufferSpace;
+
+ @ManagementProperty(description="Direct memory buffer space used by the buffer manager in MB. -1 determines the setting automatically from the maxReserveKB (default -1). This value cannot be smaller than maxStorageObjectSize.")
+ public int getMemoryBufferSpace() {
+ return memoryBufferSpace;
}
public int getMaxStorageObjectSize() {
return maxStorageObjectSize;
}
- @ManagementProperty(description="Max direct memory buffer space used by the buffer manager in MB. -1 determines the setting automatically from the maxReserveKB (default -1). This value cannot be smaller than maxStorageObjectSize.")
- public void setMaxMemoryBufferSpace(int maxMemoryBufferSpace) {
- this.maxMemoryBufferSpace = maxMemoryBufferSpace;
+ @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
+ public void setMemoryBufferSpace(int maxMemoryBufferSpace) {
+ this.memoryBufferSpace = maxMemoryBufferSpace;
}
- @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
public void setMaxStorageObjectSize(int maxStorageObjectSize) {
this.maxStorageObjectSize = maxStorageObjectSize;
}
More information about the teiid-commits
mailing list