[teiid-commits] teiid SVN: r3577 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Oct 24 13:25:36 EDT 2011


Author: shawkins
Date: 2011-10-24 13:25:36 -0400 (Mon, 24 Oct 2011)
New Revision: 3577

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 adding better file locking and refining defrag

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -29,13 +29,29 @@
 		ExtensibleBufferedOutputStream {
 	private final BlockManager blockManager;
 	int blockNum = -1;
+	private final int maxBlocks;
+	private final boolean allocate;
+	
+	static final IOException exceededMax = new IOException();  
 
-	BlockOutputStream(BlockManager blockManager) {
+	/**
+	 * @param blockManager
+	 * @param maxBlocks a max of -1 indicates use existing blocks
+	 */
+	BlockOutputStream(BlockManager blockManager, int maxBlocks) {
 		this.blockManager = blockManager;
+		this.allocate = maxBlocks != -1;
+		this.maxBlocks = maxBlocks - 2; //convert to an index
 	}
-
+	
 	@Override
-	protected ByteBuffer newBuffer() {
+	protected ByteBuffer newBuffer() throws IOException {
+		if (!allocate) {
+			return blockManager.getBlock(++blockNum);
+		}
+		if (blockNum > maxBlocks) {
+			throw exceededMax;
+		}
 		return blockManager.allocateBlock(++blockNum);
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -72,11 +72,11 @@
 		int block = getAndSetNextClearBit(info);
 		int segment = block/blocksInUse.getBitsPerSegment();
 		boolean success = false;
-		//we're using the read lock here so that defrag can lock the write out
-		locks[segment].readLock().lock();
+		this.locks[segment].writeLock().lock();
 		try {
 			FileStore fs = stores[segment];
 			long blockOffset = (block%blocksInUse.getBitsPerSegment())*blockSize;
+			//TODO: there is still an extra buffer being created here, we could FileChannels to do better
 			byte[] b = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
 			int read = 0;
 			while ((read = is.read(b, 0, b.length)) != -1) {
@@ -85,7 +85,7 @@
 			}
 			success = true;
 		} finally {
-			locks[segment].readLock().unlock();
+			locks[segment].writeLock().unlock();
 			if (!success) {
 				blocksInUse.clear(block);
 				block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -41,10 +41,10 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.teiid.common.buffer.AutoCleanupUtil;
-import org.teiid.common.buffer.BaseCacheEntry;
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
 import org.teiid.common.buffer.CacheKey;
@@ -96,7 +96,7 @@
  */
 public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
 	
-	private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+	private static final int DEFAULT_MIN_DEFRAG = 1 << 26;
 	private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
 	private static final int EVICTION_SCANS = 5;
 
@@ -154,8 +154,8 @@
 		}
 				
 		private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
-			if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >= maxMemoryBlocks)) {
-				throw new TeiidRuntimeException("Max block number exceeded.  Increase the maxStorageObjectSize to support larger storage objects.  Alternatively you could make the processor batch size smaller."); //$NON-NLS-1$
+			if (index >= MAX_DOUBLE_INDIRECT) {
+				throw new TeiidRuntimeException("Max block number exceeded.  You could try making the processor batch size smaller."); //$NON-NLS-1$
 			}
 			int dataBlock = 0;
 			int position = 0;
@@ -372,25 +372,28 @@
 							continue;
 						}
 						try {
+							boolean sleep = false;
 							do {
-								int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+								if (sleep) {
+									Thread.sleep(100); //let the file activity quite down
+								}
+								sleep = true;
+								int relativeBlockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
 								if (!shouldDefrag(blockStore, segment)) {
-									//truncate the file
-									blockStore.locks[segment].writeLock().lock();
-									try {
-										int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
-										long length = endBlock * blockStore.blockSize; 
-										blockStore.stores[segment].setLength(length);
-									} finally {
-										blockStore.locks[segment].writeLock().unlock();
-									}
+									truncate(blockStore, segment);
 									break;
 								}
 								//move the block if possible
-								InputStream is = blockStore.stores[segment].createInputStream(blockToMove * blockStore.blockSize);
+								InputStream is = blockStore.stores[segment].createInputStream(relativeBlockToMove * blockStore.blockSize, blockStore.blockSize);
 								DataInputStream dis = new DataInputStream(is);
-								Long gid = dis.readLong();
-								Long oid = dis.readLong();
+								Long gid = null;
+								Long oid = null;
+								try {
+									gid = dis.readLong();
+									oid = dis.readLong();
+								} catch (IOException e) {
+									continue; //can happen the bit was set and no data exists
+								}
 								dis.reset(); //move back to the beginning
 								Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
 								if (map == null) {
@@ -400,16 +403,20 @@
 								if (info == null) {
 									continue;
 								}
+								int bitIndex = relativeBlockToMove + (segment * blockStore.blocksInUse.getBitsPerSegment());
 								synchronized (info) {
-									await(info, true, false);
+									info.await(true, false);
 									if (info.block == EMPTY_ADDRESS) {
 										continue;
 									}
-									assert info.block == blockToMove;
+									if (info.block != bitIndex) {
+										//we've marked a bit in use, but haven't yet written new data
+										continue;
+									}
 								}
 								int newBlock = blockStore.writeToStorageBlock(info, dis);
 								synchronized (info) {
-									await(info, true, true);
+									info.await(true, true);
 									if (info.block == EMPTY_ADDRESS) {
 										//already removed;
 										if (newBlock != EMPTY_ADDRESS) {
@@ -418,11 +425,14 @@
 										continue;
 									}
 									info.block = newBlock;
-									blockStore.blocksInUse.clear(blockToMove);
+									blockStore.blocksInUse.clear(bitIndex);
 								}
+								sleep = false;
 							} while (shouldDefrag(blockStore, segment));
 						} catch (IOException e) {
 							LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag"); //$NON-NLS-1$
+						} catch (InterruptedException e) {
+							throw new TeiidRuntimeException(e);
 						}
 					}
 				}
@@ -430,6 +440,20 @@
 				defragRunning.set(false);
 			}
 		}
+
+		private void truncate(BlockStore blockStore, int segment) {
+			//truncate the file
+			blockStore.locks[segment].writeLock().lock();
+			try {
+				int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+				long newLength = (endBlock + 1) * blockStore.blockSize; 
+				blockStore.stores[segment].setLength(newLength);
+			} catch (IOException e) {
+				LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag truncate"); //$NON-NLS-1$
+			} finally {
+				blockStore.locks[segment].writeLock().unlock();
+			}
+		}
 	};
 	private AtomicBoolean cleanerRunning = new AtomicBoolean();
 	private final Runnable cleaningTask = new Runnable() {
@@ -465,8 +489,8 @@
 		this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
 		//ensure that we'll run out of blocks first
 		this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
-		memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min((((long)blocks)<<LOG_BLOCK_SIZE)/maxStorageObjectSize, Integer.MAX_VALUE)));
-		maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
+		memoryWritePermits = new Semaphore(blocks);
+		maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, blocks);
 		//try to maintain enough freespace so that writers don't block in cleaning
 		cleaningThreshold = Math.min(maxMemoryBlocks<<4, blocks>>1);
 		criticalCleaningThreshold = Math.min(maxMemoryBlocks<<2, blocks>>2);
@@ -480,9 +504,13 @@
 		}
 		List<BlockStore> stores = new ArrayList<BlockStore>();
 		int size = BLOCK_SIZE;
+		int files = 32; //this allows us to have 64 terabytes of smaller block sizes
 		do {
-			stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL));
+			stores.add(new BlockStore(this.storageManager, size, 30, files));
 			size <<=1;
+			if (files > 1) {
+				files >>= 1;
+			}
 		} while ((size>>1) < maxStorageObjectSize);
 		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
 	}
@@ -507,6 +535,7 @@
 		boolean hasPermit = false;
 		PhysicalInfo info = null;
 		boolean success = false;
+		int memoryBlocks = this.maxMemoryBlocks;
 		try {
 			Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
 			if (map == null) {
@@ -538,26 +567,14 @@
 					}
 					info.adding = true;
 					//second chance re-add to the cache, we assume that serialization would be faster than a disk read
+					memoryBlocks = info.memoryBlockCount;
 				}
 			}
-			//proactively create freespace
-			if (!cleanerRunning.get()) {
-				if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
-					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
-					asynchPool.execute(cleaningTask);
-					if (lowBlocks(true)) {
-						//do a non-blocking removal before we're forced to block
-						evictFromMemoryBuffer(false);
-					}
-				}
-			} else if (lowBlocks(true)) {
-				//do a non-blocking removal before we're forced to block
-				evictFromMemoryBuffer(false);
-			}
-			memoryWritePermits.acquire();
+			checkForLowMemory();
+			memoryWritePermits.acquire(memoryBlocks);
 			hasPermit = true;
 			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
-			BlockOutputStream bos = new BlockOutputStream(blockManager);
+			BlockOutputStream bos = new BlockOutputStream(blockManager, memoryBlocks);
 			ObjectOutput dos = new DataObjectOutputStream(bos);
 			dos.writeLong(s.getId());
 			dos.writeLong(entry.getId());
@@ -577,15 +594,17 @@
             	}
 			}
 		} catch (Throwable e) {
-			if (e == PhysicalInfo.sizeChanged) {
+			if ((e == BlockOutputStream.exceededMax && newEntry) || e == PhysicalInfo.sizeChanged) {
 				//entries are mutable after adding, the original should be removed shortly so just ignore
 				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
+			} else if (e == BlockOutputStream.exceededMax){
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, "Max block number exceeded.  Increase the maxStorageObjectSize to support larger storage objects.  Alternatively you could make the processor batch size smaller."); //$NON-NLS-1$
 			} else {
 				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ entry.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 		} finally {
 			if (hasPermit) {
-				memoryWritePermits.release();
+				memoryWritePermits.release(memoryBlocks);
 			}
 			if (info != null) {
 				synchronized (info) {
@@ -602,6 +621,23 @@
 		}
         return true;
 	}
+
+	private void checkForLowMemory() {
+		//proactively create freespace
+		if (!cleanerRunning.get()) {
+			if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
+				asynchPool.execute(cleaningTask);
+				if (lowBlocks(true)) {
+					//do a non-blocking removal before we're forced to block
+					evictFromMemoryBuffer(false);
+				}
+			}
+		} else if (lowBlocks(true)) {
+			//do a non-blocking removal before we're forced to block
+			evictFromMemoryBuffer(false);
+		}
+	}
 	
 	@Override
 	public PhysicalInfo lockForLoad(Long oid, Serializer<?> serializer) {
@@ -613,16 +649,7 @@
 		if (info == null) {
 			return null;
 		}
-		synchronized (info) {
-			while (info.loading) {
-				try {
-					info.wait();
-				} catch (InterruptedException e) {
-					throw new TeiidRuntimeException(e);
-				}
-			}
-			info.loading = true;
-		}
+		info.lockForLoad();
 		return info;
 	}
 	
@@ -631,11 +658,7 @@
 		if (info == null) {
 			return;
 		}
-		synchronized (info) {
-			assert info.loading;
-			info.loading = false;
-			info.notifyAll();
-		}
+		info.unlockForLoad();
 	}
 	
 	@Override
@@ -650,10 +673,12 @@
 			return null;
 		}
 		InputStream is = null;
+		Lock lock = null;
+		int memoryBlocks = 0;
 		try {
 			synchronized (info) {
 				assert !info.pinned && info.loading; //load should be locked
-				await(info, true, false); //not necessary, but should make things safer
+				info.await(true, false); //not necessary, but should make things safer
 				if (info.inode != EMPTY_ADDRESS) {
 					info.pinned = true;
 					memoryBufferEntries.touch(info); 
@@ -670,13 +695,19 @@
 						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
-					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+					int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+					FileStore fs = blockStore.stores[segment];
 					long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
 					is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
+					lock = blockStore.locks[segment].writeLock();
+					memoryBlocks = info.memoryBlockCount;
 				} else {
 					return null;
 				}
 			}
+			if (lock != null) {
+				is = readIntoMemory(info, is, lock, memoryBlocks);
+			}
 			ObjectInput dis = new DataObjectInputStream(is);
 			dis.readFully(HEADER_SKIP_BUFFER);
 			int sizeEstimate = dis.readInt();
@@ -686,7 +717,9 @@
         	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
         } catch (ClassNotFoundException e) {
         	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
-        } finally {
+        } catch (InterruptedException e) {
+        	throw new TeiidRuntimeException(e);
+		} finally {
         	synchronized (info) {
 				info.pinned = false;
 				info.notifyAll();
@@ -695,6 +728,56 @@
 	}
 
 	/**
+	 * Transfer into memory to release memory/file locks
+	 */
+	private InputStream readIntoMemory(PhysicalInfo info, InputStream is,
+			Lock fileLock, int memoryBlocks) throws InterruptedException,
+			IOException {
+		checkForLowMemory();
+		this.memoryWritePermits.acquire(memoryBlocks);
+		BlockManager manager = null;
+		boolean success = false;
+		boolean locked = false;
+		try {
+			manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+			//preallocate the memory area, to ensure we won't exhaust memory while holding
+			//the file lock
+			for (int i = 0; i < memoryBlocks; i++) {
+				manager.allocateBlock(i);
+			}
+			
+			fileLock.lock();
+			locked = true;
+			ExtensibleBufferedOutputStream os = new BlockOutputStream(manager, -1);
+			//TODO: there is still an extra buffer being created here, we could FileChannels to do better
+			int b = -1;
+			while ((b = is.read()) != -1) {
+				os.write(b);
+			}
+			fileLock.unlock();
+			locked = false;
+		    synchronized (info) {
+		        info.inode = manager.getInode();
+				memoryBufferEntries.add(info);
+				is = new BlockInputStream(manager, info.memoryBlockCount);
+			}
+			success = true;
+		} finally {
+			try {
+				if (locked) {
+					fileLock.unlock();
+				}
+				if (!success && manager != null) {
+					manager.free(false);
+				}
+			} finally {
+				this.memoryWritePermits.release(memoryBlocks);
+			}
+		}
+		return is;
+	}
+
+	/**
 	 * Determine if an object should be in the memory buffer.
 	 * Adds are indicated by a current time of 0.
 	 * @param currentTime
@@ -778,7 +861,7 @@
 				//let any pending finish - it would be nice if we could pre-empt
 				//since we can save some work, but this should be rare enough 
 				//to just block
-				await(info, true, true);
+				info.await(true, true);
 				info.evicting = true;
 			} else {
 				assert info.evicting;
@@ -809,7 +892,7 @@
 				//it is possible for a read to happen while evicting.
 				//that's ok, we'll just wait for it to finish
 				assert info.evicting;
-				await(info, true, false);
+				info.await(true, false);
 				info.evicting = false;
 				info.notifyAll();
 				assert bm == null || info.inode != EMPTY_ADDRESS;
@@ -833,7 +916,7 @@
 						if (!defragRunning.get() 
 								&& shouldDefrag(blockStore, segment) 
 								&& defragRunning.compareAndSet(false, true)) {
-								this.asynchPool.execute(defragTask);
+							this.asynchPool.execute(defragTask);
 						}
 						info.block = EMPTY_ADDRESS;
 					}
@@ -849,21 +932,14 @@
 	boolean shouldDefrag(BlockStore blockStore, int segment) {
 		int highestBitSet = blockStore.blocksInUse.getHighestBitSet(segment);
 		int bitsSet = blockStore.blocksInUse.getBitsSet(segment);
-		highestBitSet = Math.max(bitsSet, Math.max(1, highestBitSet));
+		highestBitSet = Math.max(bitsSet, Math.max(0, highestBitSet));
+		if (highestBitSet == 0) {
+			return false;
+		}
 		int freeBlocks = highestBitSet-bitsSet;
 		return freeBlocks > (highestBitSet>>2) && freeBlocks*blockStore.blockSize > minDefrag;
 	}
 
-	private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
-		while ((pinned && info.pinned) || (evicting && info.evicting)) {
-			try {
-				info.wait();
-			} catch (InterruptedException e) {
-				throw new TeiidRuntimeException(e);
-			}
-		}
-	}
-	
 	/**
 	 * Eviction routine.  When space is exhausted data blocks are acquired from
 	 * memory entries.
@@ -896,7 +972,7 @@
 								writeLocked = true;
 							}
 							//wait for the read/eviction to be over 
-							await(info, true, true);
+							info.await(true, true);
 							if (info.inode == EMPTY_ADDRESS) {
 								continue;
 							}
@@ -962,49 +1038,4 @@
 		this.minDefrag = minDefrag;
 	}
 
-}
-
-/**
- * Represents the memory buffer and storage state of an object.
- * It is important to minimize the amount of data held here.
- * Currently should be 48 bytes.
- */
-final class PhysicalInfo extends BaseCacheEntry {
-	
-	static final Exception sizeChanged = new Exception();  
-	
-	final Long gid;
-	//the memory inode and block count
-	int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
-	int memoryBlockCount;
-	//the storage block and BlockStore index
-	int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
-	byte sizeIndex = 0;
-	//state flags
-	boolean pinned; //indicates that the entry is being read
-	boolean evicting; //indicates that the entry will be moved out of the memory buffer
-	boolean loading; //used by tier 1 cache to prevent double loads
-	boolean adding; //used to prevent double adds
-	
-	public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
-		super(new CacheKey(id, lastAccess, 0));
-		this.inode = inode;
-		this.gid = gid;
-	}
-	
-	public void setSize(int size) throws Exception {
-		int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
-		if (this.memoryBlockCount != 0) {
-			if (newMemoryBlockCount != memoryBlockCount) {
-				throw sizeChanged; 
-			}
-			return; //no changes
-		}
-		this.memoryBlockCount = newMemoryBlockCount;
-		while (newMemoryBlockCount > 1) {
-			this.sizeIndex++;
-			newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
-		}
-	}
-	
-}
+}
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -74,7 +74,7 @@
 		buf = null;
 	}
 
-	protected abstract ByteBuffer newBuffer();
+	protected abstract ByteBuffer newBuffer() throws IOException;
 	
 	/**
 	 * Flush up to i bytes where i is the current position of the buffer

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -114,6 +114,9 @@
 	    protected synchronized int readWrite(long fileOffset, byte[] b, int offSet,
 	    		int length, boolean write) throws IOException {
 	    	if (!write) {
+	    		if (fileInfo == null) {
+	    			return -1;
+	    		}
 				try {
 					RandomAccessFile fileAccess = fileInfo.open();
 			        fileAccess.seek(fileOffset);

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -54,7 +54,7 @@
 	
 	public LrfuEvictionQueue(AtomicLong clock) {
 		this.clock = clock;
-		setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
+		setCrfLamda(.1); //smaller values tend to work better since we're using interval bounds
 	}
 
 	public boolean remove(V value) {

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.CacheKey;
+import org.teiid.core.TeiidRuntimeException;
+
+/**
+ * Represents the memory buffer and storage state of an object.
+ * It is important to minimize the amount of data held here.
+ * Currently should be 48 bytes.
+ */
+final class PhysicalInfo extends BaseCacheEntry {
+	
+	static final Exception sizeChanged = new Exception();  
+	
+	final Long gid;
+	//the memory inode and block count
+	int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+	int memoryBlockCount;
+	//the storage block and BlockStore index
+	int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+	byte sizeIndex = 0;
+	//state flags
+	boolean pinned; //indicates that the entry is being read
+	boolean evicting; //indicates that the entry will be moved out of the memory buffer
+	boolean loading; //used by tier 1 cache to prevent double loads
+	boolean adding; //used to prevent double adds
+	
+	PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+		super(new CacheKey(id, lastAccess, 0));
+		this.inode = inode;
+		this.gid = gid;
+	}
+	
+	void setSize(int size) throws Exception {
+		int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+		if (this.memoryBlockCount != 0) {
+			if (newMemoryBlockCount != memoryBlockCount) {
+				throw sizeChanged; 
+			}
+			return; //no changes
+		}
+		this.memoryBlockCount = newMemoryBlockCount;
+		while (newMemoryBlockCount > 1) {
+			this.sizeIndex++;
+			newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
+		}
+	}
+	
+	void await(boolean donePinning, boolean doneEvicting) {
+		while ((donePinning && pinned) || (doneEvicting && evicting)) {
+			try {
+				wait();
+			} catch (InterruptedException e) {
+				throw new TeiidRuntimeException(e);
+			}
+		}
+	}
+	
+	synchronized void lockForLoad() {
+		while (loading) {
+			try {
+				wait();
+			} catch (InterruptedException e) {
+				throw new TeiidRuntimeException(e);
+			}
+		}
+		loading = true;
+	}
+	
+	synchronized void unlockForLoad() {
+		assert loading;
+		loading = false;
+		notifyAll();
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-10-24 17:25:36 UTC (rev 3577)
@@ -49,7 +49,14 @@
         sm.initialize();
         return sm;
 	}
-    
+
+    @Test public void testInitialRead() throws Exception {
+        FileStorageManager sm = getStorageManager(null, null);        
+        String tsID = "0";     //$NON-NLS-1$
+        FileStore store = sm.createFileStore(tsID);
+        assertEquals(-1, store.read(0, new byte[1], 0, 1));
+    }
+	
     @Test public void testWrite() throws Exception {
         FileStorageManager sm = getStorageManager(null, null);        
         String tsID = "0";     //$NON-NLS-1$



More information about the teiid-commits mailing list