Author: shawkins
Date: 2011-10-24 13:25:36 -0400 (Mon, 24 Oct 2011)
New Revision: 3577
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 adding better file locking and refining defrag
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -29,13 +29,29 @@
ExtensibleBufferedOutputStream {
private final BlockManager blockManager;
int blockNum = -1;
+ private final int maxBlocks;
+ private final boolean allocate;
+
+ static final IOException exceededMax = new IOException();
- BlockOutputStream(BlockManager blockManager) {
+ /**
+ * @param blockManager
+ * @param maxBlocks a max of -1 indicates use existing blocks
+ */
+ BlockOutputStream(BlockManager blockManager, int maxBlocks) {
this.blockManager = blockManager;
+ this.allocate = maxBlocks != -1;
+ this.maxBlocks = maxBlocks - 2; //convert to an index
}
-
+
@Override
- protected ByteBuffer newBuffer() {
+ protected ByteBuffer newBuffer() throws IOException {
+ if (!allocate) {
+ return blockManager.getBlock(++blockNum);
+ }
+ if (blockNum > maxBlocks) {
+ throw exceededMax;
+ }
return blockManager.allocateBlock(++blockNum);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-22
11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -72,11 +72,11 @@
int block = getAndSetNextClearBit(info);
int segment = block/blocksInUse.getBitsPerSegment();
boolean success = false;
- //we're using the read lock here so that defrag can lock the write out
- locks[segment].readLock().lock();
+ this.locks[segment].writeLock().lock();
try {
FileStore fs = stores[segment];
long blockOffset = (block%blocksInUse.getBitsPerSegment())*blockSize;
+ //TODO: there is still an extra buffer being created here, we could use FileChannels to do
better
byte[] b = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
int read = 0;
while ((read = is.read(b, 0, b.length)) != -1) {
@@ -85,7 +85,7 @@
}
success = true;
} finally {
- locks[segment].readLock().unlock();
+ locks[segment].writeLock().unlock();
if (!success) {
blocksInUse.clear(block);
block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -41,10 +41,10 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.teiid.common.buffer.AutoCleanupUtil;
-import org.teiid.common.buffer.BaseCacheEntry;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
import org.teiid.common.buffer.CacheKey;
@@ -96,7 +96,7 @@
*/
public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>,
StorageManager {
- private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+ private static final int DEFAULT_MIN_DEFRAG = 1 << 26;
private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
private static final int EVICTION_SCANS = 5;
@@ -154,8 +154,8 @@
}
private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
- if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >=
maxMemoryBlocks)) {
- throw new TeiidRuntimeException("Max block number exceeded. Increase the
maxStorageObjectSize to support larger storage objects. Alternatively you could make the
processor batch size smaller."); //$NON-NLS-1$
+ if (index >= MAX_DOUBLE_INDIRECT) {
+ throw new TeiidRuntimeException("Max block number exceeded. You could try
making the processor batch size smaller."); //$NON-NLS-1$
}
int dataBlock = 0;
int position = 0;
@@ -372,25 +372,28 @@
continue;
}
try {
+ boolean sleep = false;
do {
- int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+ if (sleep) {
+ Thread.sleep(100); //let the file activity quiet down
+ }
+ sleep = true;
+ int relativeBlockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
if (!shouldDefrag(blockStore, segment)) {
- //truncate the file
- blockStore.locks[segment].writeLock().lock();
- try {
- int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
- long length = endBlock * blockStore.blockSize;
- blockStore.stores[segment].setLength(length);
- } finally {
- blockStore.locks[segment].writeLock().unlock();
- }
+ truncate(blockStore, segment);
break;
}
//move the block if possible
- InputStream is = blockStore.stores[segment].createInputStream(blockToMove *
blockStore.blockSize);
+ InputStream is = blockStore.stores[segment].createInputStream(relativeBlockToMove
* blockStore.blockSize, blockStore.blockSize);
DataInputStream dis = new DataInputStream(is);
- Long gid = dis.readLong();
- Long oid = dis.readLong();
+ Long gid = null;
+ Long oid = null;
+ try {
+ gid = dis.readLong();
+ oid = dis.readLong();
+ } catch (IOException e) {
+ continue; //can happen if the bit was set and no data exists
+ }
dis.reset(); //move back to the beginning
Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
if (map == null) {
@@ -400,16 +403,20 @@
if (info == null) {
continue;
}
+ int bitIndex = relativeBlockToMove + (segment *
blockStore.blocksInUse.getBitsPerSegment());
synchronized (info) {
- await(info, true, false);
+ info.await(true, false);
if (info.block == EMPTY_ADDRESS) {
continue;
}
- assert info.block == blockToMove;
+ if (info.block != bitIndex) {
+ //we've marked a bit in use, but haven't yet written new data
+ continue;
+ }
}
int newBlock = blockStore.writeToStorageBlock(info, dis);
synchronized (info) {
- await(info, true, true);
+ info.await(true, true);
if (info.block == EMPTY_ADDRESS) {
//already removed;
if (newBlock != EMPTY_ADDRESS) {
@@ -418,11 +425,14 @@
continue;
}
info.block = newBlock;
- blockStore.blocksInUse.clear(blockToMove);
+ blockStore.blocksInUse.clear(bitIndex);
}
+ sleep = false;
} while (shouldDefrag(blockStore, segment));
} catch (IOException e) {
LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing
defrag"); //$NON-NLS-1$
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
}
}
}
@@ -430,6 +440,20 @@
defragRunning.set(false);
}
}
+
+ private void truncate(BlockStore blockStore, int segment) { //sets the segment file's length from the highest in-use bit of that segment
+ //truncate the file while holding the segment's write lock
+ blockStore.locks[segment].writeLock().lock();
+ try {
+ int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+ long newLength = (endBlock + 1) * blockStore.blockSize; //NOTE(review): if blockSize is an int this product is computed in int before widening - confirm it cannot overflow
+ blockStore.stores[segment].setLength(newLength);
+ } catch (IOException e) { //a failed truncate is logged as a warning and not rethrown
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag
truncate"); //$NON-NLS-1$
+ } finally {
+ blockStore.locks[segment].writeLock().unlock();
+ }
+ }
};
private AtomicBoolean cleanerRunning = new AtomicBoolean();
private final Runnable cleaningTask = new Runnable() {
@@ -465,8 +489,8 @@
this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
//ensure that we'll run out of blocks first
this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
- memoryWritePermits = new Semaphore(Math.max(1,
(int)Math.min((((long)blocks)<<LOG_BLOCK_SIZE)/maxStorageObjectSize,
Integer.MAX_VALUE)));
- maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT,
maxStorageObjectSize>>LOG_BLOCK_SIZE);
+ memoryWritePermits = new Semaphore(blocks);
+ maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, blocks);
//try to maintain enough freespace so that writers don't block in cleaning
cleaningThreshold = Math.min(maxMemoryBlocks<<4, blocks>>1);
criticalCleaningThreshold = Math.min(maxMemoryBlocks<<2, blocks>>2);
@@ -480,9 +504,13 @@
}
List<BlockStore> stores = new ArrayList<BlockStore>();
int size = BLOCK_SIZE;
+ int files = 32; //this allows us to have 64 terabytes of smaller block sizes
do {
- stores.add(new BlockStore(this.storageManager, size, 30,
BufferManagerImpl.CONCURRENCY_LEVEL));
+ stores.add(new BlockStore(this.storageManager, size, 30, files));
size <<=1;
+ if (files > 1) {
+ files >>= 1;
+ }
} while ((size>>1) < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
@@ -507,6 +535,7 @@
boolean hasPermit = false;
PhysicalInfo info = null;
boolean success = false;
+ int memoryBlocks = this.maxMemoryBlocks;
try {
Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
if (map == null) {
@@ -538,26 +567,14 @@
}
info.adding = true;
//second chance re-add to the cache, we assume that serialization would be faster
than a disk read
+ memoryBlocks = info.memoryBlockCount;
}
}
- //proactively create freespace
- if (!cleanerRunning.get()) {
- if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer
cleaner"); //$NON-NLS-1$
- asynchPool.execute(cleaningTask);
- if (lowBlocks(true)) {
- //do a non-blocking removal before we're forced to block
- evictFromMemoryBuffer(false);
- }
- }
- } else if (lowBlocks(true)) {
- //do a non-blocking removal before we're forced to block
- evictFromMemoryBuffer(false);
- }
- memoryWritePermits.acquire();
+ checkForLowMemory();
+ memoryWritePermits.acquire(memoryBlocks);
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
- BlockOutputStream bos = new BlockOutputStream(blockManager);
+ BlockOutputStream bos = new BlockOutputStream(blockManager, memoryBlocks);
ObjectOutput dos = new DataObjectOutputStream(bos);
dos.writeLong(s.getId());
dos.writeLong(entry.getId());
@@ -577,15 +594,17 @@
}
}
} catch (Throwable e) {
- if (e == PhysicalInfo.sizeChanged) {
+ if ((e == BlockOutputStream.exceededMax && newEntry) || e ==
PhysicalInfo.sizeChanged) {
//entries are mutable after adding, the original should be removed shortly so just
ignore
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId()
+" changed size since first persistence, keeping the original."); //$NON-NLS-1$
//$NON-NLS-2$
+ } else if (e == BlockOutputStream.exceededMax){
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, "Max block number exceeded.
Increase the maxStorageObjectSize to support larger storage objects. Alternatively you
could make the processor batch size smaller."); //$NON-NLS-1$
} else {
LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts
to read "+ entry.getId() +" later will result in an exception.");
//$NON-NLS-1$ //$NON-NLS-2$
}
} finally {
if (hasPermit) {
- memoryWritePermits.release();
+ memoryWritePermits.release(memoryBlocks);
}
if (info != null) {
synchronized (info) {
@@ -602,6 +621,23 @@
}
return true;
}
+
+ private void checkForLowMemory() { //schedules the cleaning task when lowBlocks(false) holds and evicts directly when lowBlocks(true) holds
+ //proactively create freespace
+ if (!cleanerRunning.get()) {
+ if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) { //the compareAndSet lets only one caller schedule the cleaning task
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer
cleaner"); //$NON-NLS-1$
+ asynchPool.execute(cleaningTask);
+ if (lowBlocks(true)) {
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
+ }
+ }
+ } else if (lowBlocks(true)) { //cleaner already flagged as running: only the direct eviction is attempted
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
+ }
+ }
@Override
public PhysicalInfo lockForLoad(Long oid, Serializer<?> serializer) {
@@ -613,16 +649,7 @@
if (info == null) {
return null;
}
- synchronized (info) {
- while (info.loading) {
- try {
- info.wait();
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- }
- info.loading = true;
- }
+ info.lockForLoad();
return info;
}
@@ -631,11 +658,7 @@
if (info == null) {
return;
}
- synchronized (info) {
- assert info.loading;
- info.loading = false;
- info.notifyAll();
- }
+ info.unlockForLoad();
}
@Override
@@ -650,10 +673,12 @@
return null;
}
InputStream is = null;
+ Lock lock = null;
+ int memoryBlocks = 0;
try {
synchronized (info) {
assert !info.pinned && info.loading; //load should be locked
- await(info, true, false); //not necessary, but should make things safer
+ info.await(true, false); //not necessary, but should make things safer
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
memoryBufferEntries.touch(info);
@@ -670,13 +695,19 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at
block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
}
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
- FileStore fs =
blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+ int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+ FileStore fs = blockStore.stores[segment];
long blockOffset =
(info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
is = fs.createInputStream(blockOffset,
info.memoryBlockCount<<LOG_BLOCK_SIZE);
+ lock = blockStore.locks[segment].writeLock();
+ memoryBlocks = info.memoryBlockCount;
} else {
return null;
}
}
+ if (lock != null) {
+ is = readIntoMemory(info, is, lock, memoryBlocks);
+ }
ObjectInput dis = new DataObjectInputStream(is);
dis.readFully(HEADER_SKIP_BUFFER);
int sizeEstimate = dis.readInt();
@@ -686,7 +717,9 @@
throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
} catch (ClassNotFoundException e) {
throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
- } finally {
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ } finally {
synchronized (info) {
info.pinned = false;
info.notifyAll();
@@ -695,6 +728,56 @@
}
/**
+ * Transfer into memory to release memory/file locks
+ */
+ private InputStream readIntoMemory(PhysicalInfo info, InputStream is,
+ Lock fileLock, int memoryBlocks) throws InterruptedException,
+ IOException { //copies the bytes of is into newly allocated memory blocks and returns a stream over those blocks
+ checkForLowMemory();
+ this.memoryWritePermits.acquire(memoryBlocks); //one permit per block, taken before any allocation
+ BlockManager manager = null;
+ boolean success = false;
+ boolean locked = false; //tracks whether fileLock is currently held so the finally block unlocks at most once
+ try {
+ manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+ //preallocate the memory area, to ensure we won't exhaust memory while holding
+ //the file lock
+ for (int i = 0; i < memoryBlocks; i++) {
+ manager.allocateBlock(i);
+ }
+
+ fileLock.lock();
+ locked = true;
+ ExtensibleBufferedOutputStream os = new BlockOutputStream(manager, -1); //-1 means use the existing (preallocated) blocks rather than allocating
+ //TODO: there is still an extra buffer being created here, we could use FileChannels to do
better
+ int b = -1;
+ while ((b = is.read()) != -1) { //copied one byte at a time until end of stream
+ os.write(b);
+ } //NOTE(review): os is never flushed or closed here - confirm the block buffers need no flush
+ fileLock.unlock();
+ locked = false;
+ synchronized (info) { //record the in-memory location on the entry under its monitor
+ info.inode = manager.getInode();
+ memoryBufferEntries.add(info);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
+ }
+ success = true;
+ } finally {
+ try {
+ if (locked) {
+ fileLock.unlock();
+ }
+ if (!success && manager != null) { //the copy did not complete
+ manager.free(false);
+ }
+ } finally {
+ this.memoryWritePermits.release(memoryBlocks); //permits are returned on both success and failure
+ }
+ }
+ return is;
+ }
+
+ /**
* Determine if an object should be in the memory buffer.
* Adds are indicated by a current time of 0.
* @param currentTime
@@ -778,7 +861,7 @@
//let any pending finish - it would be nice if we could pre-empt
//since we can save some work, but this should be rare enough
//to just block
- await(info, true, true);
+ info.await(true, true);
info.evicting = true;
} else {
assert info.evicting;
@@ -809,7 +892,7 @@
//it is possible for a read to happen while evicting.
//that's ok, we'll just wait for it to finish
assert info.evicting;
- await(info, true, false);
+ info.await(true, false);
info.evicting = false;
info.notifyAll();
assert bm == null || info.inode != EMPTY_ADDRESS;
@@ -833,7 +916,7 @@
if (!defragRunning.get()
&& shouldDefrag(blockStore, segment)
&& defragRunning.compareAndSet(false, true)) {
- this.asynchPool.execute(defragTask);
+ this.asynchPool.execute(defragTask);
}
info.block = EMPTY_ADDRESS;
}
@@ -849,21 +932,14 @@
boolean shouldDefrag(BlockStore blockStore, int segment) {
int highestBitSet = blockStore.blocksInUse.getHighestBitSet(segment);
int bitsSet = blockStore.blocksInUse.getBitsSet(segment);
- highestBitSet = Math.max(bitsSet, Math.max(1, highestBitSet));
+ highestBitSet = Math.max(bitsSet, Math.max(0, highestBitSet));
+ if (highestBitSet == 0) {
+ return false;
+ }
int freeBlocks = highestBitSet-bitsSet;
return freeBlocks > (highestBitSet>>2) &&
freeBlocks*blockStore.blockSize > minDefrag;
}
- private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
- while ((pinned && info.pinned) || (evicting && info.evicting)) {
- try {
- info.wait();
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- }
- }
-
/**
* Eviction routine. When space is exhausted data blocks are acquired from
* memory entries.
@@ -896,7 +972,7 @@
writeLocked = true;
}
//wait for the read/eviction to be over
- await(info, true, true);
+ info.await(true, true);
if (info.inode == EMPTY_ADDRESS) {
continue;
}
@@ -962,49 +1038,4 @@
this.minDefrag = minDefrag;
}
-}
-
-/**
- * Represents the memory buffer and storage state of an object.
- * It is important to minimize the amount of data held here.
- * Currently should be 48 bytes.
- */
-final class PhysicalInfo extends BaseCacheEntry {
-
- static final Exception sizeChanged = new Exception();
-
- final Long gid;
- //the memory inode and block count
- int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
- int memoryBlockCount;
- //the storage block and BlockStore index
- int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
- byte sizeIndex = 0;
- //state flags
- boolean pinned; //indicates that the entry is being read
- boolean evicting; //indicates that the entry will be moved out of the memory buffer
- boolean loading; //used by tier 1 cache to prevent double loads
- boolean adding; //used to prevent double adds
-
- public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
- super(new CacheKey(id, lastAccess, 0));
- this.inode = inode;
- this.gid = gid;
- }
-
- public void setSize(int size) throws Exception {
- int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) +
((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
- if (this.memoryBlockCount != 0) {
- if (newMemoryBlockCount != memoryBlockCount) {
- throw sizeChanged;
- }
- return; //no changes
- }
- this.memoryBlockCount = newMemoryBlockCount;
- while (newMemoryBlockCount > 1) {
- this.sizeIndex++;
- newMemoryBlockCount = (newMemoryBlockCount>>1) +
((newMemoryBlockCount&0x01)==0?0:1);
- }
- }
-
-}
+}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -74,7 +74,7 @@
buf = null;
}
- protected abstract ByteBuffer newBuffer();
+ protected abstract ByteBuffer newBuffer() throws IOException;
/**
* Flush up to i bytes where i is the current position of the buffer
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -114,6 +114,9 @@
protected synchronized int readWrite(long fileOffset, byte[] b, int offSet,
int length, boolean write) throws IOException {
if (!write) {
+ if (fileInfo == null) {
+ return -1;
+ }
try {
RandomAccessFile fileAccess = fileInfo.open();
fileAccess.seek(fileOffset);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -54,7 +54,7 @@
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
- setCrfLamda(.00005); //smaller values tend to work better since we're using
interval bounds
+ setCrfLamda(.1); //smaller values tend to work better since we're using interval
bounds
}
public boolean remove(V value) {
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.CacheKey;
+import org.teiid.core.TeiidRuntimeException;
+
+/**
+ * Represents the memory buffer and storage state of an object.
+ * It is important to minimize the amount of data held here.
+ * Currently should be 48 bytes.
+ */
+final class PhysicalInfo extends BaseCacheEntry {
+
+ static final Exception sizeChanged = new Exception(); //single shared instance thrown by setSize when the block count differs from the recorded one
+
+ final Long gid;
+ //the memory inode and block count
+ int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ int memoryBlockCount;
+ //the storage block and BlockStore index
+ int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ byte sizeIndex = 0;
+ //state flags
+ boolean pinned; //indicates that the entry is being read
+ boolean evicting; //indicates that the entry will be moved out of the memory buffer
+ boolean loading; //used by tier 1 cache to prevent double loads
+ boolean adding; //used to prevent double adds
+
+ PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+ super(new CacheKey(id, lastAccess, 0));
+ this.inode = inode;
+ this.gid = gid;
+ }
+
+ void setSize(int size) throws Exception { //first call records the block count and size index; later calls only check the count is unchanged
+ int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) +
((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+ if (this.memoryBlockCount != 0) { //a non-zero count means a size was already recorded
+ if (newMemoryBlockCount != memoryBlockCount) {
+ throw sizeChanged;
+ }
+ return; //no changes
+ }
+ this.memoryBlockCount = newMemoryBlockCount;
+ while (newMemoryBlockCount > 1) { //halve (rounding up) until 1 is reached, counting the steps in sizeIndex
+ this.sizeIndex++;
+ newMemoryBlockCount = (newMemoryBlockCount>>1) +
((newMemoryBlockCount&0x01)==0?0:1);
+ }
+ }
+
+ void await(boolean donePinning, boolean doneEvicting) { //not synchronized itself but calls wait(), so the caller must already hold this entry's monitor
+ while ((donePinning && pinned) || (doneEvicting && evicting)) { //keep waiting while any flag the caller asked about is still set
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e); //interruption is rethrown as an unchecked exception
+ }
+ }
+ }
+
+ synchronized void lockForLoad() { //waits until loading is false, then sets it
+ while (loading) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ loading = true;
+ }
+
+ synchronized void unlockForLoad() { //clears loading and wakes every thread waiting on this entry
+ assert loading;
+ loading = false;
+ notifyAll();
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-22
11:40:30 UTC (rev 3576)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-24
17:25:36 UTC (rev 3577)
@@ -49,7 +49,14 @@
sm.initialize();
return sm;
}
-
+
+ @Test public void testInitialRead() throws Exception { //reading from a newly created store, before any write, must return -1
+ FileStorageManager sm = getStorageManager(null, null);
+ String tsID = "0"; //$NON-NLS-1$
+ FileStore store = sm.createFileStore(tsID);
+ assertEquals(-1, store.read(0, new byte[1], 0, 1)); //nothing has been written to this store yet
+ }
+
@Test public void testWrite() throws Exception {
FileStorageManager sm = getStorageManager(null, null);
String tsID = "0"; //$NON-NLS-1$