[teiid-commits] teiid SVN: r3639 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sat Nov 12 07:46:02 EST 2011


Author: shawkins
Date: 2011-11-12 07:46:01 -0500 (Sat, 12 Nov 2011)
New Revision: 3639

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
Log:
TEIID-1750 fixing a timing issue with eviction in a constrained scenario

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-11-11 20:22:49 UTC (rev 3638)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-11-12 12:46:01 UTC (rev 3639)
@@ -39,9 +39,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.teiid.common.buffer.AutoCleanupUtil;
@@ -98,7 +101,7 @@
 	
 	private static final int DEFAULT_MIN_DEFRAG = 1 << 26;
 	private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
-	private static final int EVICTION_SCANS = 5;
+	private static final int EVICTION_SCANS = 2;
 
 	public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
 	
@@ -343,6 +346,8 @@
 	LrfuEvictionQueue<PhysicalInfo> memoryBufferEntries = new LrfuEvictionQueue<PhysicalInfo>(readAttempts);
 	private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
 	private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
+	private ReentrantLock freedLock = new ReentrantLock();
+	private Condition blocksFreed = freedLock.newCondition();
 	
 	private int blocks;
 	private ConcurrentBitSet blocksInuse;
@@ -625,16 +630,11 @@
 
 	private void checkForLowMemory() {
 		//proactively create freespace
-		if (!cleanerRunning.get()) {
-			if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
-				asynchPool.execute(cleaningTask);
-				if (lowBlocks(true)) {
-					//do a non-blocking removal before we're forced to block
-					evictFromMemoryBuffer(false);
-				}
-			}
-		} else if (lowBlocks(true)) {
+		if (!cleanerRunning.get() && lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
+			LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
+			asynchPool.execute(cleaningTask);
+		} 
+		if (lowBlocks(true)) {
 			//do a non-blocking removal before we're forced to block
 			evictFromMemoryBuffer(false);
 		}
@@ -931,6 +931,12 @@
 				}
 				if (bm != null) {
 					result = bm.free(acquireDataBlock);
+					freedLock.lock();
+					try {
+						blocksFreed.signalAll();
+					} finally {
+						freedLock.unlock();
+					}
 				}
 			}
 		}
@@ -993,8 +999,27 @@
 				}
 			} 
 			if (acquire && next == EMPTY_ADDRESS) {
-				throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
+				if (!writeLocked) {
+					memoryEvictionLock.writeLock().lock();
+					writeLocked = true;
+				}
+				freedLock.lock();
+				try {
+					next = blocksInuse.getAndSetNextClearBit();
+					if (next != EMPTY_ADDRESS) {
+						return next;
+					}
+					blocksFreed.await(120, TimeUnit.SECONDS);
+				} finally {
+					freedLock.unlock();
+				}
+				next = blocksInuse.getAndSetNextClearBit();
+				if (next == EMPTY_ADDRESS) {
+					throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
+				}
 			}
+		} catch (InterruptedException e) {
+			throw new TeiidRuntimeException(e);
 		} finally {
 			if (writeLocked) {
 				memoryEvictionLock.writeLock().unlock();

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-11-11 20:22:49 UTC (rev 3638)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-11-12 12:46:01 UTC (rev 3639)
@@ -608,15 +608,12 @@
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
     		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
     	}
-    	if (lock.tryLock()) {
-	    	try {
-		    	this.reserveBatchBytes.addAndGet(count);
-		    	batchesFreed.signalAll();
-	    	} finally {
-	    		lock.unlock();
-	    	}
-    	} else {
-    		this.reserveBatchBytes.addAndGet(count);
+    	lock.lock();
+    	try {
+        	this.reserveBatchBytes.addAndGet(count);
+	    	batchesFreed.signalAll();
+    	} finally {
+    		lock.unlock();
     	}
     }
     

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-11-11 20:22:49 UTC (rev 3638)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-11-12 12:46:01 UTC (rev 3639)
@@ -36,7 +36,7 @@
 
 public class TestBufferFrontedFileStoreCache {
 	
-	private final class SimpleSerializer implements Serializer<Integer> {
+	private final static class SimpleSerializer implements Serializer<Integer> {
 		@Override
 		public Integer deserialize(ObjectInput ois)
 				throws IOException, ClassNotFoundException {
@@ -138,7 +138,7 @@
 		assertEquals(0, cache.getInodesInUse());
 	}
 
-	private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
+	private static CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
 			Serializer<Integer> s) throws TeiidComponentException {
 		PhysicalInfo o = cache.lockForLoad(oid, s);
 		CacheEntry ce = cache.get(o, oid, new WeakReference<Serializer<?>>(s));
@@ -166,8 +166,8 @@
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
 
-		assertEquals(3, cache.getDataBlocksInUse());
-		assertEquals(1, cache.getInodesInUse());
+		assertTrue(cache.getDataBlocksInUse() < 4);
+		assertTrue(cache.getInodesInUse() < 2);
 
 		ce = get(cache, 2l, s);
 		assertEquals(Integer.valueOf(5000), ce.getObject());
@@ -176,7 +176,7 @@
 		assertEquals(Integer.valueOf(5001), ce.getObject());
 	}
 
-	private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {
+	private static BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {
 		BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
 		fsc.setMemoryBufferSpace(bufferSpace);
 		fsc.setMaxStorageObjectSize(objectSize);



More information about the teiid-commits mailing list