[teiid-commits] teiid SVN: r3552 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Oct 14 13:31:57 EDT 2011


Author: shawkins
Date: 2011-10-14 13:31:57 -0400 (Fri, 14 Oct 2011)
New Revision: 3552

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 using finer grained locking and refining the config

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -29,13 +29,66 @@
 /**
  * Represents the storage strategy for the {@link BufferManager}
  */
-public interface Cache extends StorageManager {
-	void createCacheGroup(Long gid); //called prior to adding an entry
-	//TODO: this should use a callback on the buffermangaer to remove memory entries
-	//without materializing all group keys
+public interface Cache<T> extends StorageManager {
+	/**
+	 * Must be called prior to adding any group entries
+	 * @param gid
+	 */
+	void createCacheGroup(Long gid); 
+	
+	/**
+	 * Remove an entire cache group
+	 * 
+	 * TODO: this should use a callback on the BufferManager to remove memory entries
+	 * without materializing all group keys
+	 * @param gid
+	 * @return
+	 */
 	Collection<Long> removeCacheGroup(Long gid);
+
+	/**
+	 * Must be called prior to adding an entry
+	 * @param gid
+	 * @param oid
+	 */
 	void addToCacheGroup(Long gid, Long oid); 
-	CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
+
+	/**
+	 * Lock the object for load and return an identifier/lock
+	 * that can be used to retrieve the object.
+	 * @param oid
+	 * @param serializer
+	 * @return the identifier, may be null
+	 */
+	T lockForLoad(Long oid, Serializer<?> serializer);
+	
+	/**
+	 * Must be called after lockForLoad
+	 * @param lock the identifier/lock returned by lockForLoad, may be null
+	 */
+	void unlockForLoad(T lock);
+
+	/**
+	 * Get method, must be called using the object obtained in the
+	 * lockForLoad method
+	 * @return
+	 * @throws TeiidComponentException
+	 */
+	CacheEntry get(T lock, Long oid, Serializer<?> serializer) throws TeiidComponentException;
+	
+	/**
+	 * Adds an entry to the cache.
+	 * @param entry
+	 * @param s
+	 * @throws Exception
+	 */
 	void add(CacheEntry entry, Serializer<?> s) throws Exception;
+	
+	/**
+	 * Remove an entry from the cache
+	 * @param gid
+	 * @param id
+	 */
 	void remove(Long gid, Long id);
+	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -70,7 +70,4 @@
 		return len;
 	}
 	
-	public int free(boolean steal) {
-		return manager.free(steal);
-	}
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -43,6 +43,6 @@
 	
 	void freeBlock(int index);
 	
-	int free(boolean steal);
+	int free(boolean acquireDataBlock);
 
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -38,7 +38,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -94,8 +93,10 @@
  * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
  * files.
  */
-public class BufferFrontedFileStoreCache implements Cache, StorageManager {
+public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
 	
+	private static final int EVICTION_SCANS = 5;
+
 	public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
 	
 	static final int ADDRESS_BITS = 31;
@@ -108,6 +109,9 @@
 	
 	//TODO allow the block size to be configurable
 	static final int LOG_BLOCK_SIZE = 13;
+
+	public static final long MAX_ADDRESSABLE_MEMORY = 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE);
+	
 	static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
 	static final int BLOCK_MASK = BLOCK_SIZE - 1;
 	static final int ADDRESSES_PER_BLOCK = BLOCK_SIZE/BYTES_PER_BLOCK_ADDRESS;
@@ -125,11 +129,13 @@
 		private ByteBuffer inodeBuffer;
 		private final long gid;
 		private final long oid;
+		private int blockSegment;
 
 		InodeBlockManager(long gid, long oid, int inode) {
 			this.inode = inode;
 			this.gid = gid;
 			this.oid = oid;
+			this.blockSegment = blocksInuse.getNextSegment();
 		}
 		
 		@Override
@@ -218,7 +224,7 @@
 			memoryEvictionLock.readLock().lock();
 			boolean readLocked = true;
 			try {
-				if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
+				if ((next = blocksInuse.getAndSetNextClearBit(blockSegment)) == EMPTY_ADDRESS) {
 					memoryEvictionLock.readLock().unlock();
 					readLocked = false;
 					next = evictFromMemoryBuffer(true);
@@ -254,7 +260,6 @@
 					if (this.inode == -1) {
 						throw new AssertionError("Out of inodes"); //$NON-NLS-1$
 					}
-					inodesCreated.getAndIncrement();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 					}
@@ -267,31 +272,30 @@
 		}
 
 		@Override
-		public int free(boolean steal) {
+		public int free(boolean acquire) {
 			if (this.inode == EMPTY_ADDRESS) {
 				return EMPTY_ADDRESS;
 			}
 			ByteBuffer bb = getInodeBlock();
-			int dataBlockToSteal = bb.getInt(0);
+			int dataBlockToAcquire = bb.getInt(0);
 			int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
 			int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
-			boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(steal?1:0), true);
+			boolean freedAll = freeBlock(acquire?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(acquire?1:0), true);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 				LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			inodesInuse.clear(inode);
-			inodesRemoved.getAndIncrement();
 			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
-				return steal?dataBlockToSteal:EMPTY_ADDRESS;
+				return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
 			}
 			freedAll = freeIndirectBlock(indirectIndexBlock);
 			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
-				return steal?dataBlockToSteal:EMPTY_ADDRESS;
+				return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
 			}
 			bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
 			freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
 			freeDataBlock(doublyIndirectIndexBlock);
-			return steal?dataBlockToSteal:EMPTY_ADDRESS;
+			return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
 		}
 
 		private boolean freeIndirectBlock(int indirectIndexBlock) {
@@ -323,29 +327,7 @@
 			return blockByteBuffer.getByteBuffer(dataBlock);
 		}
 	}
-	AtomicInteger inodesRemoved = new AtomicInteger();
-	AtomicInteger inodesCreated = new AtomicInteger();
 
-	private static class PhysicalInfo extends BaseCacheEntry {
-		int inode = EMPTY_ADDRESS;
-		int block = EMPTY_ADDRESS;
-		int sizeIndex = 0;
-		final int memoryBlockCount;
-		final Long gid;
-		
-		public PhysicalInfo(Long gid, Long id, int inode, int size) {
-			super(id);
-			this.inode = inode;
-			this.gid = gid;
-			this.memoryBlockCount = (size>>LOG_BLOCK_SIZE) + ((size&BLOCK_MASK)>0?1:0);
-			int blocks = memoryBlockCount;
-			while (blocks >= 1) {
-				this.sizeIndex++;
-				blocks>>=2;
-			}
-		}
-	}
-	
 	private StorageManager storageManager;
 	private int maxStorageObjectSize = DEFAuLT_MAX_OBJECT_SIZE;
 	private long memoryBufferSpace = 1 << 26; //64MB
@@ -375,7 +357,7 @@
 		@Override
 		public void run() {
 			try {
-				while (lowBlocks()) {
+				while (lowBlocks(false)) {
 					if (evictFromMemoryBuffer(false) == EMPTY_ADDRESS) {
 						break;
 					}
@@ -386,6 +368,7 @@
 		}
 	};
 	private int cleaningThreshold;
+	private int criticalCleaningThreshold;
 	
 	private AtomicLong storageWrites = new AtomicLong();
 	private AtomicLong storageReads = new AtomicLong();
@@ -402,7 +385,9 @@
 		this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
 		memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
 		maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
-		cleaningThreshold = Math.min(maxMemoryBlocks<<3, blocks>>1); //try to maintain enough freespace to hold 8 max objects
+		//try to maintain enough freespace so that writers don't block in cleaning
+		cleaningThreshold = Math.min(maxMemoryBlocks<<4, blocks>>1);
+		criticalCleaningThreshold = Math.min(maxMemoryBlocks<<2, blocks>>2);
 		//account for index pointer block overhead
 		if (maxMemoryBlocks > DIRECT_POINTERS) {
 			maxMemoryBlocks--;
@@ -423,9 +408,9 @@
 		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
 	}
 	
-	boolean lowBlocks() {
+	boolean lowBlocks(boolean critical) {
 		int bitsSet = blocksInuse.getBitsSet();
-		return bitsSet > 0 && (blocks - bitsSet < cleaningThreshold) && memoryBufferEntries.firstEntry() != null;
+		return bitsSet > 0 && (blocks - bitsSet < (critical?criticalCleaningThreshold:cleaningThreshold)) && memoryBufferEntries.firstEntry() != null;
 	}
 	
 	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -441,7 +426,6 @@
 		InodeBlockManager blockManager = null;
 		boolean hasPermit = false;
 		PhysicalInfo info = null;
-		boolean newEntry = true;
 		boolean success = false;
 		try {
 			Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
@@ -454,18 +438,29 @@
 					return; //already removed
 				}
 			} else {
-				newEntry = false;
 				synchronized (info) {
 					//we assume that serialization would be faster than a disk read
 					if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
 						success = true;
 						return; 
 					}
+					//we should not be in memory since there is no inode assigned
+					assert !memoryBufferEntries.getEvictionQueue().containsKey(info);
 				}
 			}
-			if (!cleanerRunning.get() && lowBlocks() && cleanerRunning.compareAndSet(false, true)) {
-				LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
-				asynchPool.execute(cleaningTask);
+			//proactively create freespace
+			if (!cleanerRunning.get()) {
+				if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
+					LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
+					asynchPool.execute(cleaningTask);
+					if (lowBlocks(true)) {
+						//do a non-blocking removal before we're forced to block
+						evictFromMemoryBuffer(false);
+					}
+				}
+			} else if (lowBlocks(true)) {
+				//do a non-blocking removal before we're forced to block
+				evictFromMemoryBuffer(false);
 			}
 			memoryWritePermits.acquire();
 			hasPermit = true;
@@ -478,15 +473,14 @@
             synchronized (map) {
             	//synchronize to ensure proper cleanup from a concurrent removal 
             	if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
-            		if (newEntry) {
+            		if (info == null) {
 	           			info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(), fsos.getBytesWritten());
 		                map.put(entry.getId(), info);
-            		} else {
-            			synchronized (info) {
-                			info.inode = blockManager.getInode();
-						}
             		}
-            		memoryBufferEntries.put(entry.getId(), info);
+        			synchronized (info) {
+            			info.inode = blockManager.getInode();
+                		memoryBufferEntries.put(entry.getId(), info);
+					}
             		success = true;
             	}
 			}
@@ -503,28 +497,61 @@
 	}
 	
 	@Override
-	public CacheEntry get(Long oid, Serializer<?> serializer) throws TeiidComponentException {
+	public PhysicalInfo lockForLoad(Long oid, Serializer<?> serializer) {
+		Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
+		if (map == null) {
+			return null;
+		}
+		PhysicalInfo info = map.get(oid);
+		if (info == null) {
+			return null;
+		}
+		synchronized (info) {
+			while (info.loading) {
+				try {
+					info.wait();
+				} catch (InterruptedException e) {
+					throw new TeiidRuntimeException(e);
+				}
+			}
+			info.loading = true;
+		}
+		return info;
+	}
+	
+	@Override
+	public void unlockForLoad(PhysicalInfo info) {
+		if (info == null) {
+			return;
+		}
+		synchronized (info) {
+			assert info.loading;
+			info.loading = false;
+			info.notifyAll();
+		}
+	}
+	
+	@Override
+	public CacheEntry get(PhysicalInfo info, Long oid, Serializer<?> serializer) throws TeiidComponentException {
+		if (info == null) {
+			return null;
+		}
 		long currentTime = readAttempts.incrementAndGet();
+		InputStream is = null;
+		boolean inStorage = false;
 		try {
-			Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
-			if (map == null) {
-				return null;
-			}
-			final PhysicalInfo info = map.get(oid);
-			if (info == null) {
-				return null;
-			}
-			CacheEntry ce = new CacheEntry(oid);
-			InputStream is = null;
 			synchronized (info) {
 				if (info.inode != EMPTY_ADDRESS) {
-					memoryBufferEntries.get(oid); //touch this entry
+					info.pinned = true;
+					PhysicalInfo existing = memoryBufferEntries.get(info.getId()); //touch this entry
+					assert existing == info;
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
 					is = new BlockInputStream(manager, info.memoryBlockCount);
 				} else if (info.block != EMPTY_ADDRESS) {
+					inStorage = true;
 					storageReads.incrementAndGet();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
@@ -533,29 +560,33 @@
 					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
 					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
 					is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
-					if (shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
-						BlockManager manager = null;
-						boolean success = false;
-						try {
-							manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
-							ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
-				            ObjectConverterUtil.write(os, is, -1);
-				            info.inode = manager.getInode();
-							memoryBufferEntries.put(info.getId(), info);
-							is = new BlockInputStream(manager, info.memoryBlockCount);
-							success = true;
-						} finally {
-							this.memoryWritePermits.release();
-							if (!success && manager != null) {
-								manager.free(false);
-								info.inode = EMPTY_ADDRESS;
-							}
-						}
-					}
 				} else {
 					return null;
 				}
 			}
+			if (inStorage && shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
+				BlockManager manager = null;
+				boolean success = false;
+				try {
+					manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+					ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
+		            ObjectConverterUtil.write(os, is, -1);
+		            synchronized (info) {
+			            info.inode = manager.getInode();
+			            info.pinned = true;
+						memoryBufferEntries.put(info.getId(), info);
+					}
+					is = new BlockInputStream(manager, info.memoryBlockCount);
+					success = true;
+				} finally {
+					this.memoryWritePermits.release();
+					if (!success && manager != null) {
+						manager.free(false);
+						info.inode = EMPTY_ADDRESS;
+					}
+				}
+			}
+			CacheEntry ce = new CacheEntry(oid);
 			ObjectInputStream ois = new ObjectInputStream(is);
 			ce.setSizeEstimate(ois.readInt());
 			ce.setLastAccess(1);
@@ -567,6 +598,11 @@
         	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
         } catch (ClassNotFoundException e) {
         	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
+        } finally {
+        	synchronized (info) {
+				info.pinned = false;
+				info.notifyAll();
+			}
         }
 	}
 
@@ -579,7 +615,7 @@
 	 */
 	private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
 		Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
-		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (info.memoryBlockCount<<2)
+		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (criticalCleaningThreshold + info.memoryBlockCount)
 				|| (lowest != null && lowest.getKey().block != EMPTY_ADDRESS 
 						&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
 	}
@@ -631,55 +667,90 @@
 		}
 	}
 	
-	int free(Long oid, PhysicalInfo info, boolean demote, boolean stealDataBlock) {
+	/**
+	 * Multi-purpose method to free memory.  Modes are:
+	 * demote && !acquireDataBlock -> push out of memory buffer onto disk
+	 * demote && acquireDataBlock -> push out of memory and reuse a datablock
+	 * !demote -> full removal from memory and disk
+	 */
+	int free(Long oid, PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
 		if (info == null) {
 			return EMPTY_ADDRESS;
 		}
-		synchronized (info) {
-			memoryBufferEntries.remove(oid);
-			if (demote) {
-				if (info.inode == EMPTY_ADDRESS) {
+		int result = EMPTY_ADDRESS;
+		BlockManager bm = null;
+		int block = EMPTY_ADDRESS;
+		try {
+			int memoryBlockCount;
+			int sizeIndex;
+			synchronized (info) {
+				info.evicting = true;
+				block = info.block;
+				memoryBlockCount = info.memoryBlockCount;
+				sizeIndex = info.sizeIndex;
+				if (info.inode != EMPTY_ADDRESS) {
+					bm = getBlockManager(info.gid, oid, info.inode);
+				} else if (demote) {
 					return EMPTY_ADDRESS;
 				}
-				BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+				//release the lock to perform the transfer
+			}
+			if (demote && block == EMPTY_ADDRESS) {
+				storageWrites.getAndIncrement();
+				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
+				BlockStore blockStore = sizeBasedStores[sizeIndex];
+				block = getAndSetNextClearBit(blockStore);
+				FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
+				long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+				byte[] b = new byte[BLOCK_SIZE];
+				int read = 0;
+				try {
+					while ((read = is.read(b, 0, b.length)) != -1) {
+						fs.write(blockOffset, b, 0, read);
+						blockOffset+=read;
+					}
+				} catch (Throwable e) {
+					//shouldn't happen, but we'll invalidate this write and continue
+					demote = false;
+					//just continue to free
+					LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+				}
+			}
+		} finally {
+			//ensure post conditions
+			synchronized (info) {
+				//it is possible for a read to happen while evicting.
+				//that's ok, we'll just wait for it to finish
+				await(info, true, false);
+				info.evicting = false;
+				info.notifyAll();
 				info.inode = EMPTY_ADDRESS;
-				if (info.block == EMPTY_ADDRESS) {
-					storageWrites.getAndIncrement();
-					BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount);
-					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
-					info.block = getAndSetNextClearBit(blockStore);
-					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
-					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
-					byte[] b = new byte[BLOCK_SIZE];
-					int read = 0;
-					try {
-						while ((read = is.read(b, 0, b.length)) != -1) {
-							fs.write(blockOffset, b, 0, read);
-							blockOffset+=read;
-						}
-					} catch (Throwable e) {
-						//shouldn't happen, but we'll invalidate this write and continue
+				memoryBufferEntries.remove(info.getId());
+				if (block != EMPTY_ADDRESS) {
+					if (demote) {
+						info.block = block;
+					} else {
+						BlockStore blockStore = sizeBasedStores[info.sizeIndex];
 						blockStore.blocksInUse.clear(info.block);
 						info.block = EMPTY_ADDRESS;
-						//just continue to free
-						LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
 					}
-					return is.free(stealDataBlock);
 				}
-				return bm.free(stealDataBlock);
+				if (bm != null) {
+					result = bm.free(acquireDataBlock);
+				}
 			}
-			if (info.inode != EMPTY_ADDRESS) {
-				BlockManager bm = getBlockManager(info.gid, oid, info.inode);
-				info.inode = EMPTY_ADDRESS;
-				bm.free(false);
+		}
+		return result;
+	}
+
+	private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
+		while ((pinned && info.pinned) || (evicting && info.evicting)) {
+			try {
+				info.wait();
+			} catch (InterruptedException e) {
+				throw new TeiidRuntimeException(e);
 			}
-			if (info.block != EMPTY_ADDRESS) {
-				BlockStore blockStore = sizeBasedStores[info.sizeIndex];
-				blockStore.blocksInUse.clear(info.block);
-				info.block = EMPTY_ADDRESS;
-			}
 		}
-		return EMPTY_ADDRESS;
 	}
 	
 	static int getAndSetNextClearBit(BlockStore bs) {
@@ -690,34 +761,62 @@
 		return result;
 	}
 	
-	int evictFromMemoryBuffer(boolean steal) {
-		memoryEvictionLock.writeLock().lock();
+	/**
+	 * Eviction routine.  When space is exhausted, data blocks are
+	 * stolen from existing memory entries so that pending writes
+	 * do not face starvation.
+	 * @param acquire
+	 * @return
+	 */
+	int evictFromMemoryBuffer(boolean acquire) {
+		boolean writeLocked = false;
 		int next = -1;
 		try {
-			for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
+			for (int i = 0; i < EVICTION_SCANS && next == EMPTY_ADDRESS; i++) {
+				//doing a cleanup may trigger the purging of resources
 				AutoCleanupUtil.doCleanup();
 				//scan the eviction queue looking for a victim
 				Iterator<Map.Entry<PhysicalInfo, Long>> iter = memoryBufferEntries.getEvictionQueue().entrySet().iterator();
-				while (((!steal && lowBlocks()) || (steal && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
+				while (((!acquire && lowBlocks(false)) || (acquire && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
 					Map.Entry<PhysicalInfo, Long> entry = iter.next();
 					PhysicalInfo info = entry.getKey();
 					synchronized (info) {
 						if (info.inode == EMPTY_ADDRESS) {
 							continue;
 						}
-						next = free(entry.getValue(), info, true, steal);
-						if (!steal) {
-							next = 0;
+						if (info.pinned || info.evicting) {
+							if (!acquire || i != EVICTION_SCANS - 1) {
+								continue;
+							}
+							if (acquire && !writeLocked) {
+								//stop the world - prevent any other thread from taking a free block
+								//until this one is satisfied
+								memoryEvictionLock.writeLock().lock();
+								writeLocked = true;
+							}
+							//wait for the read/eviction to be over 
+							await(info, true, true);
+							if (info.inode == EMPTY_ADDRESS) {
+								continue;
+							}
 						}
+						//mark as evicting early so that other evictFromMemoryBuffer calls don't select this same entry
+						info.evicting = true;
 					}
+					next = free(entry.getValue(), info, true, acquire);
+					if (!acquire) {
+						next = 0; //let the cleaner know that we made progress
+					}
 					break;
 				}
 			} 
-			if (steal && next == -1) {
+			if (acquire && next == -1) {
 				throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
 			}
 		} finally {
-			memoryEvictionLock.writeLock().unlock();
+			if (writeLocked) {
+				memoryEvictionLock.writeLock().unlock();
+			}
 		}
 		return next;
 	}
@@ -731,7 +830,7 @@
 	}
 	
 	public void setMemoryBufferSpace(long maxBufferSpace) {
-		this.memoryBufferSpace = Math.min(maxBufferSpace, 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE));
+		this.memoryBufferSpace = Math.min(maxBufferSpace, MAX_ADDRESSABLE_MEMORY);
 	}
 	
 	public int getInodesInUse() {
@@ -755,3 +854,34 @@
 	}
 	
 }
+
+/**
+ * Represents the memory buffer and storage state of an object.
+ * It is important to minimize the amount of data held here.
+ * Currently should be 40 bytes.
+ */
+final class PhysicalInfo extends BaseCacheEntry {
+	final Long gid;
+	//the memory inode and block count
+	int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+	final int memoryBlockCount;
+	//the storage block and BlockStore index
+	int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+	byte sizeIndex = 0;
+	//state flags
+	boolean pinned; //indicates that the entry is being read
+	boolean evicting; //indicates that the entry will be moved out of the memory buffer
+	boolean loading; //used by tier 1 cache to prevent double loads
+	
+	public PhysicalInfo(Long gid, Long id, int inode, int size) {
+		super(id);
+		this.inode = inode;
+		this.gid = gid;
+		this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+		int blocks = memoryBlockCount;
+		while (blocks >= 1) {
+			this.sizeIndex++;
+			blocks>>=2;
+		}
+	}
+}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -213,7 +213,9 @@
 			if (ce != null) {
 				return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
 			}
-			synchronized (this) {
+			//obtain a granular lock to prevent double memory loading
+			Object o = cache.lockForLoad(batch, this);
+			try {
 				ce = fastGet(batch, prefersMemory.get(), retain);
 				if (ce != null) {
 					return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
@@ -222,7 +224,7 @@
 				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, id, id, "reading batch", batch, "from storage, total reads:", count); //$NON-NLS-1$ //$NON-NLS-2$
 				}
-				ce = cache.get(batch, this);
+				ce = cache.get(o, batch, this);
 				if (ce == null) {
 					throw new AssertionError("Batch not found in storage " + batch); //$NON-NLS-1$
 				}
@@ -234,7 +236,9 @@
 				if (retain) {
 					addMemoryEntry(ce, null);
 				}
-			}	
+			} finally {
+				cache.unlockForLoad(o);
+			}
 			return (List<List<?>>)ce.getObject();
 		}
 		
@@ -281,6 +285,7 @@
     private AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
     private volatile int reserveBatchKB;
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
+    private long memoryBufferSpace; //used as a hint to account for batch overhead (only useful in large scenarios)
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
     private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
@@ -313,7 +318,7 @@
     	};
     });
     
-    Cache cache;
+    private Cache cache;
     
 	private Map<String, TupleReference> tupleBufferMap = new ConcurrentHashMap<String, TupleReference>();
 	private ReferenceQueue<TupleBuffer> tupleBufferQueue = new ReferenceQueue<TupleBuffer>();
@@ -459,6 +464,10 @@
 		this.maxActivePlans = maxActivePlans;
 	}
     
+    public void setMemoryBufferSpace(long memoryBufferSpace) {
+		this.memoryBufferSpace = memoryBufferSpace;
+	}
+    
     public void setMaxProcessingKB(int maxProcessingKB) {
 		this.maxProcessingKB = maxProcessingKB;
 	}
@@ -479,6 +488,8 @@
 				this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
 			}
 			this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
+			int batchOverheadKB = (int)(this.memoryBufferSpace<0?(this.maxReserveKB.get()<<8):this.memoryBufferSpace)>>20;
+    		this.maxReserveKB.set(Math.max(0, this.maxReserveKB.get() - batchOverheadKB));
     	}
 		this.reserveBatchKB = this.getMaxReserveKB();
 		if (this.maxProcessingKBOrig == null) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -98,10 +98,17 @@
 	
 	/**
 	 * Makes a best effort to atomically find the next clear bit and set it
-	 * @return the next bit index or -1 if no clear bits are founds
+	 * @return the next bit index or -1 if no clear bits are found
 	 */
 	public int getAndSetNextClearBit() {
-		int start = counter.getAndIncrement();
+		return getAndSetNextClearBit(counter.getAndIncrement());
+	}
+	
+	public int getNextSegment() {
+		return counter.getAndIncrement();
+	}
+	
+	public int getAndSetNextClearBit(int start) {
 		int nextBit = -1;
 		for (int i = 0; i < segments.length; i++) {
 			Segment s = segments[(start+i)&(segments.length-1)];

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -38,7 +38,7 @@
 import org.teiid.core.TeiidComponentException;
 
 
-public class MemoryStorageManager implements Cache {
+public class MemoryStorageManager implements Cache<Long> {
 	
 	public static final int MAX_FILE_SIZE = 1 << 17;
     
@@ -132,7 +132,17 @@
 	}
 	
 	@Override
-	public CacheEntry get(Long id, Serializer<?> serializer)
+	public Long lockForLoad(Long oid, Serializer<?> serializer) {
+		return oid;
+	}
+	
+	@Override
+	public void unlockForLoad(Long o) {
+		//nothing to do - no locking is performed
+	}
+	
+	@Override
+	public CacheEntry get(Long lock, Long id, Serializer<?> serializer)
 			throws TeiidComponentException {
 		Map<Long, CacheEntry> group = groups.get(serializer.getId());
 		if (group != null) {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -77,8 +77,7 @@
 		ce.setObject(cacheObject);
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
-		
-		ce = cache.get(2l, s);
+		ce = get(cache, 2l, s);
 		assertEquals(cacheObject, ce.getObject());
 		
 		//test something that exceeds the direct inode data blocks
@@ -88,7 +87,7 @@
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
 		
-		ce = cache.get(3l, s);
+		ce = get(cache, 3l, s);
 		assertEquals(cacheObject, ce.getObject());
 		
 		cache.removeCacheGroup(1l);
@@ -104,7 +103,7 @@
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
 		
-		ce = cache.get(3l, s);
+		ce = get(cache, 3l, s);
 		assertEquals(cacheObject, ce.getObject());
 
 		cache.removeCacheGroup(1l);
@@ -120,7 +119,7 @@
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
 		
-		ce = cache.get(3l, s);
+		ce = get(cache, 3l, s);
 		assertNull(ce);
 
 		cache.removeCacheGroup(1l);
@@ -128,6 +127,14 @@
 		assertEquals(0, cache.getDataBlocksInUse());
 		assertEquals(0, cache.getInodesInUse());
 	}
+
+	private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
+			Serializer<Integer> s) throws TeiidComponentException {
+		PhysicalInfo o = cache.lockForLoad(oid, s);
+		CacheEntry ce = cache.get(o, oid, s);
+		cache.unlockForLoad(o);
+		return ce;
+	}
 	
 	@Test public void testEviction() throws Exception {
 		BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
@@ -144,7 +151,7 @@
 		
 		ce = new CacheEntry(3l);
 		ce.setSerializer(ref);
-		cacheObject = Integer.valueOf(5000);
+		cacheObject = Integer.valueOf(5001);
 		ce.setObject(cacheObject);
 		cache.addToCacheGroup(s.getId(), ce.getId());
 		cache.add(ce, s);
@@ -152,8 +159,11 @@
 		assertEquals(3, cache.getDataBlocksInUse());
 		assertEquals(1, cache.getInodesInUse());
 
-		ce = cache.get(2l, s);
+		ce = get(cache, 2l, s);
 		assertEquals(Integer.valueOf(5000), ce.getObject());
+		
+		ce = get(cache, 3l, s);
+		assertEquals(Integer.valueOf(5001), ce.getObject());
 	}
 
 	private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize) throws TeiidComponentException {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -62,6 +62,13 @@
 		}
 	}
 	
+	@Test public void testSegmentUse() {
+		ConcurrentBitSet bst = new ConcurrentBitSet(50001, 4);
+		assertEquals(0, bst.getAndSetNextClearBit(0));
+		assertEquals(1, bst.getAndSetNextClearBit(0));
+		assertEquals(2, bst.getAndSetNextClearBit(4));
+	}
+	
 	@Test public void testCompactBitSet() {
 		ConcurrentBitSet bst = new ConcurrentBitSet(100000, 1);
 		bst.setCompact(true);

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-14 15:23:51 UTC (rev 3551)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-14 17:31:57 UTC (rev 3552)
@@ -68,7 +68,7 @@
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
     private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
     private boolean inlineLobs = true;
-    private int maxMemoryBufferSpace = -1;
+    private int memoryBufferSpace = -1;
     private int maxStorageObjectSize = BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
 	private FileStorageManager fsm;
 	
@@ -93,7 +93,7 @@
             this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
             this.bufferMgr.setMaxReserveKB(this.maxReserveKb);
             this.bufferMgr.setMaxProcessingKB(this.maxProcessingKb);
-            
+            this.bufferMgr.setMemoryBufferSpace(Math.min(BufferFrontedFileStoreCache.MAX_ADDRESSABLE_MEMORY, this.memoryBufferSpace));
             this.bufferMgr.initialize();
             
             // If necessary, add disk storage manager
@@ -110,11 +110,11 @@
                 ssm.setMaxFileSize(maxFileSize);
                 BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
                 fsc.setMaxStorageObjectSize(maxStorageObjectSize);
-                if (maxMemoryBufferSpace <= 0) {
-                	//use approximately 20% of what's set aside for the reserved
-                	fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() * 200);
+                if (memoryBufferSpace < 0) {
+                	//use approximately 25% of the memory set aside for the reserve
+                	fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() << 8);
                 } else {
-                	fsc.setMemoryBufferSpace(maxMemoryBufferSpace);
+                	fsc.setMemoryBufferSpace(memoryBufferSpace);
                 }
                 fsc.setStorageManager(ssm);
                 fsc.initialize();
@@ -254,21 +254,21 @@
 	public long getReadAttempts() {
 		return bufferMgr.getReadAttempts();
 	}
-    
-    public int getMaxMemoryBufferSpace() {
-		return maxMemoryBufferSpace;
+
+    @ManagementProperty(description="Direct memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  This value cannot be smaller than maxStorageObjectSize.")
+    public int getMemoryBufferSpace() {
+		return memoryBufferSpace;
 	}
     
     public int getMaxStorageObjectSize() {
 		return maxStorageObjectSize;
 	}
 
-    @ManagementProperty(description="Max direct memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  This value cannot be smaller than maxStorageObjectSize.")
-    public void setMaxMemoryBufferSpace(int maxMemoryBufferSpace) {
-		this.maxMemoryBufferSpace = maxMemoryBufferSpace;
+    @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
+    public void setMemoryBufferSpace(int maxMemoryBufferSpace) {
+		this.memoryBufferSpace = maxMemoryBufferSpace;
 	}
 
-    @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
     public void setMaxStorageObjectSize(int maxStorageObjectSize) {
 		this.maxStorageObjectSize = maxStorageObjectSize;
 	}



More information about the teiid-commits mailing list