Author: shawkins
Date: 2011-11-12 07:46:01 -0500 (Sat, 12 Nov 2011)
New Revision: 3639
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
Log:
TEIID-1750 fixing a timing issue with eviction in a constrained scenario
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-11-11
20:22:49 UTC (rev 3638)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-11-12
12:46:01 UTC (rev 3639)
@@ -39,9 +39,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.teiid.common.buffer.AutoCleanupUtil;
@@ -98,7 +101,7 @@
private static final int DEFAULT_MIN_DEFRAG = 1 << 26;
private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
- private static final int EVICTION_SCANS = 5;
+ private static final int EVICTION_SCANS = 2;
public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
@@ -343,6 +346,8 @@
LrfuEvictionQueue<PhysicalInfo> memoryBufferEntries = new
LrfuEvictionQueue<PhysicalInfo>(readAttempts);
private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
+ private ReentrantLock freedLock = new ReentrantLock();
+ private Condition blocksFreed = freedLock.newCondition();
private int blocks;
private ConcurrentBitSet blocksInuse;
@@ -625,16 +630,11 @@
private void checkForLowMemory() {
//proactively create freespace
- if (!cleanerRunning.get()) {
- if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer
cleaner"); //$NON-NLS-1$
- asynchPool.execute(cleaningTask);
- if (lowBlocks(true)) {
- //do a non-blocking removal before we're forced to block
- evictFromMemoryBuffer(false);
- }
- }
- } else if (lowBlocks(true)) {
+ if (!cleanerRunning.get() && lowBlocks(false) &&
cleanerRunning.compareAndSet(false, true)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer
cleaner"); //$NON-NLS-1$
+ asynchPool.execute(cleaningTask);
+ }
+ if (lowBlocks(true)) {
//do a non-blocking removal before we're forced to block
evictFromMemoryBuffer(false);
}
@@ -931,6 +931,12 @@
}
if (bm != null) {
result = bm.free(acquireDataBlock);
+ freedLock.lock();
+ try {
+ blocksFreed.signalAll();
+ } finally {
+ freedLock.unlock();
+ }
}
}
}
@@ -993,8 +999,27 @@
}
}
if (acquire && next == EMPTY_ADDRESS) {
- throw new AssertionError("Could not free space for pending write");
//$NON-NLS-1$
+ if (!writeLocked) {
+ memoryEvictionLock.writeLock().lock();
+ writeLocked = true;
+ }
+ freedLock.lock();
+ try {
+ next = blocksInuse.getAndSetNextClearBit();
+ if (next != EMPTY_ADDRESS) {
+ return next;
+ }
+ blocksFreed.await(120, TimeUnit.SECONDS);
+ } finally {
+ freedLock.unlock();
+ }
+ next = blocksInuse.getAndSetNextClearBit();
+ if (next == EMPTY_ADDRESS) {
+ throw new AssertionError("Could not free space for pending write");
//$NON-NLS-1$
+ }
}
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
} finally {
if (writeLocked) {
memoryEvictionLock.writeLock().unlock();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-11
20:22:49 UTC (rev 3638)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-12
12:46:01 UTC (rev 3639)
@@ -608,15 +608,12 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer
space", count); //$NON-NLS-1$
}
- if (lock.tryLock()) {
- try {
- this.reserveBatchBytes.addAndGet(count);
- batchesFreed.signalAll();
- } finally {
- lock.unlock();
- }
- } else {
- this.reserveBatchBytes.addAndGet(count);
+ lock.lock();
+ try {
+ this.reserveBatchBytes.addAndGet(count);
+ batchesFreed.signalAll();
+ } finally {
+ lock.unlock();
}
}
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-11-11
20:22:49 UTC (rev 3638)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-11-12
12:46:01 UTC (rev 3639)
@@ -36,7 +36,7 @@
public class TestBufferFrontedFileStoreCache {
- private final class SimpleSerializer implements Serializer<Integer> {
+ private final static class SimpleSerializer implements Serializer<Integer> {
@Override
public Integer deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
@@ -138,7 +138,7 @@
assertEquals(0, cache.getInodesInUse());
}
- private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
+ private static CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
Serializer<Integer> s) throws TeiidComponentException {
PhysicalInfo o = cache.lockForLoad(oid, s);
CacheEntry ce = cache.get(o, oid, new WeakReference<Serializer<?>>(s));
@@ -166,8 +166,8 @@
cache.addToCacheGroup(s.getId(), ce.getId());
cache.add(ce, s);
- assertEquals(3, cache.getDataBlocksInUse());
- assertEquals(1, cache.getInodesInUse());
+ assertTrue(cache.getDataBlocksInUse() < 4);
+ assertTrue(cache.getInodesInUse() < 2);
ce = get(cache, 2l, s);
assertEquals(Integer.valueOf(5000), ce.getObject());
@@ -176,7 +176,7 @@
assertEquals(Integer.valueOf(5001), ce.getObject());
}
- private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize)
throws TeiidComponentException {
+ private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int
objectSize) throws TeiidComponentException {
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
fsc.setMemoryBufferSpace(bufferSpace);
fsc.setMaxStorageObjectSize(objectSize);