[teiid-commits] teiid SVN: r3584 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 4 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Oct 25 17:41:08 EDT 2011
Author: shawkins
Date: 2011-10-25 17:41:08 -0400 (Tue, 25 Oct 2011)
New Revision: 3584
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750: switching back to using 64-bit CacheKey info, switching BufferManager to internally account in bytes rather than KB, and tracking batch overhead
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -88,11 +88,11 @@
throws TeiidComponentException;
/**
- * Return the maximum KB that can be temporarily held potentially
+ * Return the max that can be temporarily held potentially
* across even a blocked exception.
* @return
*/
- int getMaxProcessingKB();
+ int getMaxProcessingSize();
/**
* Creates a new {@link FileStore}. See {@link FileStore#setCleanupReference(Object)} to
@@ -117,7 +117,7 @@
void releaseBuffers(int count);
/**
- * Get the size estimate in KB for the given schema.
+ * Get the size estimate for the given schema.
*/
int getSchemaSize(List<? extends Expression> elements);
@@ -128,7 +128,7 @@
TupleBuffer getTupleBuffer(String id);
/**
- * Set the maxActivePlans as a hint at determining the maxProcessingKB
+ * Set the maxActivePlans as a hint at determining the maxProcessing
* @param maxActivePlans
*/
void setMaxActivePlans(int maxActivePlans);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -90,6 +90,6 @@
* @param gid
* @param id
*/
- void remove(Long gid, Long id);
+ boolean remove(Long gid, Long id);
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -25,10 +25,10 @@
public class CacheKey implements Comparable<CacheKey> {
private Long id;
- protected int lastAccess;
- protected float orderingValue;
+ protected long lastAccess;
+ protected double orderingValue;
- public CacheKey(Long id, int lastAccess, float orderingValue) {
+ public CacheKey(Long id, long lastAccess, double orderingValue) {
this.id = id;
this.lastAccess = lastAccess;
this.orderingValue = orderingValue;
@@ -59,11 +59,11 @@
return this.id.equals(((CacheKey)obj).getId());
}
- public int getLastAccess() {
+ public long getLastAccess() {
return lastAccess;
}
- public float getOrderingValue() {
+ public double getOrderingValue() {
return orderingValue;
}
@@ -71,7 +71,7 @@
public int compareTo(CacheKey o) {
int result = (int) Math.signum(orderingValue - o.orderingValue);
if (result == 0) {
- result = (int)Math.signum((lastAccess&0xffffffffl) - (o.lastAccess&0xffffffffl));
+ result = (int)Math.signum(lastAccess - o.lastAccess);
if (result == 0) {
return Long.signum(id - o.id);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -109,6 +109,7 @@
static final int LOG_INODE_SIZE = 6;
static final int DIRECT_POINTERS = 14;
static final int EMPTY_ADDRESS = -1;
+ static final int FREED = -2;
//TODO allow the block size to be configurable. 8k is a reasonable default up to a gig, but we could be more efficient with larger blocks from there.
//the rationale for a smaller block size is to reduce internal fragmentation, which is critical when maintaining a relatively small buffer < 256MB
@@ -290,16 +291,16 @@
}
inodesInuse.clear(inode);
if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
- return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:FREED;
}
freedAll = freeIndirectBlock(indirectIndexBlock);
if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
- return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:FREED;
}
bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock).slice();
freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
freeDataBlock(doublyIndirectIndexBlock);
- return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+ return acquire?dataBlockToAcquire:FREED;
}
private boolean freeIndirectBlock(int indirectIndexBlock) {
@@ -787,7 +788,7 @@
private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
PhysicalInfo lowest = memoryBufferEntries.firstEntry(false);
CacheKey key = info.getKey();
- return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (criticalCleaningThreshold + info.memoryBlockCount)
+ return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (cleaningThreshold + info.memoryBlockCount)
|| (lowest != null && lowest.block != EMPTY_ADDRESS
&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, key.getLastAccess(), key.getOrderingValue()):key.getOrderingValue()));
}
@@ -816,13 +817,20 @@
}
@Override
- public void remove(Long gid, Long id) {
+ public boolean remove(Long gid, Long id) {
Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
if (map == null) {
- return;
+ return false;
}
- PhysicalInfo info = map.remove(id);
+ PhysicalInfo info = null;
+ boolean result = false;
+ synchronized (map) {
+ int size = map.size();
+ info = map.remove(id);
+ result = size != map.size();
+ }
free(info, false, false);
+ return result;
}
@Override
@@ -850,7 +858,7 @@
return EMPTY_ADDRESS;
}
Long oid = info.getId();
- int result = EMPTY_ADDRESS;
+ int result = FREED;
BlockManager bm = null;
int block = EMPTY_ADDRESS;
int memoryBlockCount;
@@ -948,7 +956,7 @@
*/
int evictFromMemoryBuffer(boolean acquire) {
boolean writeLocked = false;
- int next = -1;
+ int next = EMPTY_ADDRESS;
try {
for (int i = 0; i < EVICTION_SCANS && next == EMPTY_ADDRESS; i++) {
//doing a cleanup may trigger the purging of resources
@@ -981,13 +989,10 @@
info.evicting = true;
}
next = free(info, true, acquire);
- if (!acquire) {
- next = 0; //let the cleaner know that we made progress
- }
break;
}
}
- if (acquire && next == -1) {
+ if (acquire && next == EMPTY_ADDRESS) {
throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
}
} finally {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -91,6 +91,7 @@
* little is reserved.
*/
private static final class Cleaner extends TimerTask {
+ private static final int MAX_READ_AGE = 1<<28;
WeakReference<BufferManagerImpl> bufferRef;
public Cleaner(BufferManagerImpl bufferManagerImpl) {
@@ -106,25 +107,22 @@
return;
}
boolean agingOut = false;
- if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < impl.maxReserveKB.get()*.7) {
+ if (impl.reserveBatchBytes.get() < impl.maxReserveBytes.get()*.9 || impl.activeBatchBytes.get() < impl.maxReserveBytes.get()*.7) {
CacheEntry entry = impl.evictionQueue.firstEntry(false);
if (entry == null) {
return;
}
//we aren't holding too many memory entries, ensure that
//entries aren't old
- int lastAccess = 0x1fffffff&entry.getKey().getLastAccess();
- int currentTime = 0x1fffffff&(int)impl.readAttempts.get();
- if (lastAccess > currentTime) {
- currentTime += 1<<29;
- }
- if (currentTime - lastAccess < 1<<28) {
+ long lastAccess = entry.getKey().getLastAccess();
+ long currentTime = impl.readAttempts.get();
+ if (currentTime - lastAccess < MAX_READ_AGE) {
return;
}
agingOut = true;
}
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchKB.get(), impl.maxReserveKB.get(), impl.activeBatchKB.get()); //$NON-NLS-1$
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
}
impl.doEvictions(0, false);
if (!agingOut) {
@@ -138,6 +136,11 @@
}
}
+ /**
+ * This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
+ */
+ private static final int BATCH_OVERHEAD = 128;
+
final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
final Long id;
SizeUtility sizeUtility;
@@ -197,6 +200,9 @@
throws TeiidComponentException {
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
+ if (oid.longValue() == 56) {
+ this.toString();
+ }
CacheEntry old = null;
if (previous != null) {
if (removeOld) {
@@ -210,6 +216,8 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
}
+ maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
+ reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
cache.addToCacheGroup(id, ce.getId());
addMemoryEntry(ce, true);
return oid;
@@ -252,7 +260,7 @@
}
public int getSizeEstimate(List<? extends List<?>> obj) {
- return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj) / 1024);
+ return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj));
}
@SuppressWarnings("unchecked")
@@ -284,9 +292,8 @@
throw new AssertionError("Batch not found in storage " + batch); //$NON-NLS-1$
}
if (!retain) {
- cache.remove(this.id, batch);
- }
- if (retain) {
+ removeFromCache(this.id, batch);
+ } else {
addMemoryEntry(ce, false);
}
} finally {
@@ -294,10 +301,9 @@
}
return (List<List<?>>)ce.getObject();
}
-
+
@Override
public void remove(Long batch) {
- cleanSoftReferences();
BufferManagerImpl.this.remove(id, batch, prefersMemory.get());
}
@@ -333,10 +339,10 @@
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
//set to acceptable defaults for testing
- private int maxProcessingKB = 1 << 11;
- private Integer maxProcessingKBOrig;
- AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
- AtomicInteger reserveBatchKB = new AtomicInteger();
+ private int maxProcessingBytes = 1 << 21;
+ private Integer maxProcessingBytesOrig;
+ AtomicLong maxReserveBytes = new AtomicLong(1 << 28);
+ AtomicLong reserveBatchBytes = new AtomicLong();
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
@@ -346,7 +352,7 @@
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
- AtomicInteger activeBatchKB = new AtomicInteger();
+ AtomicLong activeBatchBytes = new AtomicLong();
private AtomicLong readAttempts = new AtomicLong();
//TODO: consider the size estimate in the weighting function
@@ -394,13 +400,20 @@
void clearSoftReference(BatchSoftReference bsr) {
synchronized (bsr) {
- maxReserveKB.addAndGet(bsr.sizeEstimate);
- reserveBatchKB.addAndGet(bsr.sizeEstimate);
+ maxReserveBytes.addAndGet(bsr.sizeEstimate);
+ reserveBatchBytes.addAndGet(bsr.sizeEstimate);
bsr.sizeEstimate = 0;
}
bsr.clear();
}
+ void removeFromCache(Long gid, Long batch) {
+ if (cache.remove(gid, batch)) {
+ maxReserveBytes.addAndGet(BATCH_OVERHEAD);
+ reserveBatchBytes.addAndGet(BATCH_OVERHEAD);
+ }
+ }
+
public long getBatchesAdded() {
return batchAdded.get();
}
@@ -418,12 +431,12 @@
}
@Override
- public int getMaxProcessingKB() {
- return maxProcessingKB;
+ public int getMaxProcessingSize() {
+ return maxProcessingBytes;
}
- public int getReserveBatchKB() {
- return reserveBatchKB.get();
+ public long getReserveBatchBytes() {
+ return reserveBatchBytes.get();
}
/**
@@ -541,41 +554,48 @@
}
public void setMaxProcessingKB(int maxProcessingKB) {
- this.maxProcessingKB = maxProcessingKB;
+ if (maxProcessingKB > -1) {
+ this.maxProcessingBytes = maxProcessingKB<<10;
+ } else {
+ this.maxProcessingBytes = -1;
+ }
}
public void setMaxReserveKB(int maxReserveBatchKB) {
- this.maxReserveKB.set(maxReserveBatchKB);
if (maxReserveBatchKB > -1) {
- this.reserveBatchKB.set(maxReserveBatchKB);
+ int maxReserve = maxReserveBatchKB<<10;
+ this.maxReserveBytes.set(maxReserve);
+ this.reserveBatchBytes.set(maxReserve);
+ } else {
+ this.maxReserveBytes.set(-1);
}
}
@Override
public void initialize() throws TeiidComponentException {
- int maxMemory = (int)Math.min(Runtime.getRuntime().maxMemory() / 1024, Integer.MAX_VALUE);
- maxMemory = Math.max(0, maxMemory - 300 * 1024); //assume 300 megs of overhead for the AS/system stuff
+ long maxMemory = Runtime.getRuntime().maxMemory();
+ maxMemory = Math.max(0, maxMemory - (300 << 20)); //assume 300 megs of overhead for the AS/system stuff
if (getMaxReserveKB() < 0) {
- this.setMaxReserveKB(0);
- int one_gig = 1024 * 1024;
+ this.maxReserveBytes.set(0);
+ int one_gig = 1 << 30;
if (maxMemory > one_gig) {
//assume 75% of the memory over the first gig
- this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
+ this.maxReserveBytes.addAndGet((long)Math.max(0, (maxMemory - one_gig) * .75));
}
- this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
+ this.maxReserveBytes.addAndGet(Math.max(0, Math.min(one_gig, maxMemory) >> 1));
}
- this.reserveBatchKB.set(this.getMaxReserveKB());
- if (this.maxProcessingKBOrig == null) {
+ this.reserveBatchBytes.set(this.maxReserveBytes.get());
+ if (this.maxProcessingBytesOrig == null) {
//store the config value so that we can be reinitialized (this is not a clean approach)
- this.maxProcessingKBOrig = this.maxProcessingKB;
+ this.maxProcessingBytesOrig = this.maxProcessingBytes;
}
- if (this.maxProcessingKBOrig < 0) {
- this.maxProcessingKB = Math.max(Math.min(8 * processorBatchSize, Integer.MAX_VALUE), (int)(.1 * maxMemory)/maxActivePlans);
+ if (this.maxProcessingBytesOrig < 0) {
+ this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 8l, (.1 * maxMemory)/maxActivePlans), Integer.MAX_VALUE);
}
//make a guess at the max number of batches
- int memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow / 1024);
+ long memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow);
//memoryBatches represents a full batch, so assume that most will be smaller
- int logSize = 35 - Integer.numberOfLeadingZeros(memoryBatches);
+ int logSize = 67 - Long.numberOfLeadingZeros(memoryBatches);
if (useWeakReferences) {
weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(30, logSize));
}
@@ -593,13 +613,13 @@
}
if (lock.tryLock()) {
try {
- this.reserveBatchKB.addAndGet(count);
+ this.reserveBatchBytes.addAndGet(count);
batchesFreed.signalAll();
} finally {
lock.unlock();
}
} else {
- this.reserveBatchKB.addAndGet(count);
+ this.reserveBatchBytes.addAndGet(count);
}
}
@@ -616,14 +636,14 @@
//don't wait for more than is available
int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
int committed = 0;
- while (waitCount > 0 && waitCount > this.reserveBatchKB.get() && committed < additional) {
- int reserveBatchSample = this.reserveBatchKB.get();
+ while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
+ long reserveBatchSample = this.reserveBatchBytes.get();
try {
batchesFreed.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new TeiidRuntimeException(e);
}
- if (reserveBatchSample >= this.reserveBatchKB.get()) {
+ if (reserveBatchSample >= this.reserveBatchBytes.get()) {
waitCount >>= 3;
} else {
waitCount >>= 1;
@@ -645,7 +665,7 @@
}
int result = count;
if (mode == BufferReserveMode.FORCE) {
- this.reserveBatchKB.addAndGet(-count);
+ this.reserveBatchBytes.addAndGet(-count);
} else {
result = noWaitReserve(count, true);
}
@@ -656,28 +676,28 @@
private int noWaitReserve(int count, boolean allOrNothing) {
for (int i = 0; i < 2; i++) {
- int reserveBatch = this.reserveBatchKB.get();
+ long reserveBatch = this.reserveBatchBytes.get();
if (allOrNothing && count > reserveBatch) {
return 0;
}
- count = Math.min(count, Math.max(0, reserveBatch));
+ count = (int)Math.min(count, Math.max(0, reserveBatch));
if (count == 0) {
return 0;
}
- if (this.reserveBatchKB.compareAndSet(reserveBatch, reserveBatch - count)) {
+ if (this.reserveBatchBytes.compareAndSet(reserveBatch, reserveBatch - count)) {
return count;
}
}
//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
- this.reserveBatchKB.addAndGet(-count);
+ this.reserveBatchBytes.addAndGet(-count);
return count;
}
void persistBatchReferences() {
- int activeBatch = activeBatchKB.get();
- int reserveBatch = reserveBatchKB.get();
+ long activeBatch = activeBatchBytes.get();
+ long reserveBatch = reserveBatchBytes.get();
if (activeBatch <= reserveBatch) {
- int memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
+ long memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
if (DataTypeManager.isValueCacheEnabled()) {
if (memoryCount < getMaxReserveKB() / 8) {
DataTypeManager.setValueCacheEnabled(false);
@@ -687,13 +707,13 @@
}
return;
}
- int maxToFree = Math.max(maxProcessingKB>>1, reserveBatch>>3);
+ long maxToFree = Math.max(maxProcessingBytes>>1, reserveBatch>>3);
doEvictions(maxToFree, true);
}
- void doEvictions(int maxToFree, boolean checkActiveBatch) {
+ void doEvictions(long maxToFree, boolean checkActiveBatch) {
int freed = 0;
- while (freed <= maxToFree && (!checkActiveBatch || activeBatchKB.get() > reserveBatchKB.get() * .8)) {
+ while (freed <= maxToFree && (!checkActiveBatch || activeBatchBytes.get() > reserveBatchBytes.get() * .8)) {
CacheEntry ce = evictionQueue.firstEntry(true);
if (ce == null) {
break;
@@ -712,7 +732,7 @@
synchronized (ce) {
if (evicted && memoryEntries.remove(ce.getId()) != null) {
freed += ce.getSizeEstimate();
- activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ activeBatchBytes.addAndGet(-ce.getSizeEstimate());
evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
}
}
@@ -748,14 +768,17 @@
}
private void createSoftReference(CacheEntry ce) {
- BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, ce.getSizeEstimate()/2);
+ //if we don't set aside some reserve, we
+ //will push the soft ref out of memory potentially too quickly
+ int sizeEstimate = ce.getSizeEstimate()/2;
+ BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, sizeEstimate);
softCache.put(ce.getId(), ref);
- maxReserveKB.addAndGet(- ce.getSizeEstimate()/2);
- reserveBatchKB.addAndGet(- ce.getSizeEstimate()/2);
+ maxReserveBytes.addAndGet(- sizeEstimate);
+ reserveBatchBytes.addAndGet(- sizeEstimate);
}
/**
- * Get a CacheEntry without hitting the cache
+ * Get a CacheEntry without hitting storage
*/
CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
CacheEntry ce = null;
@@ -807,13 +830,16 @@
return null;
}
+ AtomicInteger removed = new AtomicInteger();
+
CacheEntry remove(Long gid, Long batch, boolean prefersMemory) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", batch); //$NON-NLS-1$
}
+ cleanSoftReferences();
CacheEntry ce = fastGet(batch, prefersMemory, false);
if (ce == null) {
- cache.remove(gid, batch);
+ removeFromCache(gid, batch);
} else {
ce.nullOut();
}
@@ -822,11 +848,11 @@
private void remove(CacheEntry ce, boolean inMemory) {
if (inMemory) {
- activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ activeBatchBytes.addAndGet(-ce.getSizeEstimate());
}
Serializer<?> s = ce.getSerializer();
if (s != null) {
- cache.remove(s.getId(), ce.getId());
+ removeFromCache(s.getId(), ce.getId());
}
}
@@ -843,12 +869,15 @@
evictionQueue.touch(ce);
}
}
- activeBatchKB.getAndAdd(ce.getSizeEstimate());
+ activeBatchBytes.getAndAdd(ce.getSizeEstimate());
}
void removeCacheGroup(Long id, boolean prefersMemory) {
cleanSoftReferences();
Collection<Long> vals = cache.removeCacheGroup(id);
+ int overhead = vals.size() * BATCH_OVERHEAD;
+ maxReserveBytes.addAndGet(overhead);
+ reserveBatchBytes.addAndGet(overhead);
for (Long val : vals) {
//TODO: we will unnecessarily call remove on the cache, but that should be low cost
fastGet(val, prefersMemory, false);
@@ -905,7 +934,7 @@
}
rowCount = Math.max(1, rowCount);
total *= rowCount;
- return new int[]{rowCount, Math.max(1, total / 1024)};
+ return new int[]{rowCount, Math.max(1, total)};
}
@Override
@@ -959,17 +988,13 @@
}
public int getMaxReserveKB() {
- return maxReserveKB.get();
+ return (int)maxReserveBytes.get()>>10;
}
public void setCache(Cache cache) {
this.cache = cache;
}
- public int getActiveBatchKB() {
- return activeBatchKB.get();
- }
-
public int getMemoryCacheEntries() {
return memoryEntries.size();
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -54,7 +54,7 @@
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
- setCrfLamda(.1); //smaller values tend to work better since we're using interval bounds
+ setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
}
public boolean remove(V value) {
@@ -93,27 +93,21 @@
*/
public void recordAccess(V value) {
CacheKey key = value.getKey();
- int lastAccess = key.getLastAccess();
+ long lastAccess = key.getLastAccess();
long currentClock = clock.get();
- float orderingValue = key.getOrderingValue();
+ double orderingValue = key.getOrderingValue();
orderingValue = computeNextOrderingValue(currentClock, lastAccess,
orderingValue);
value.setKey(new CacheKey(key.getId(), (int)currentClock, orderingValue));
}
- float computeNextOrderingValue(long currentTime,
- int lastAccess, float orderingValue) {
- long longLastAccess = lastAccess&0xffffffffl;
- currentTime &= 0xffffffffl;
- if (longLastAccess > currentTime) {
- currentTime += (1l<<32);
- }
- long delta = currentTime - longLastAccess;
+ double computeNextOrderingValue(long currentTime,
+ long lastAccess, double orderingValue) {
+ long delta = currentTime - lastAccess;
orderingValue =
- (float) (//Frequency component
(delta<maxInterval?(delta<minInterval?minVal:Math.pow(inverseCrfLamda, delta)):0)*orderingValue
//recency component
- + Math.pow(currentTime, crfLamda));
+ + Math.pow(currentTime, crfLamda);
return orderingValue;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -155,11 +155,12 @@
}
@Override
- public void remove(Long gid, Long id) {
+ public boolean remove(Long gid, Long id) {
Map<Long, CacheEntry> group = groups.get(gid);
if (group != null) {
- group.remove(id);
+ return group.remove(id) != null;
}
+ return false;
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -29,7 +29,7 @@
/**
* Represents the memory buffer and storage state of an object.
* It is important to minimize the amount of data held here.
- * Currently should be 48 bytes.
+ * Currently should be 56 bytes.
*/
final class PhysicalInfo extends BaseCacheEntry {
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -255,7 +255,7 @@
return false; //index is too large
}
int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
- int toReserve = this.joinNode.getBufferManager().getMaxProcessingKB();
+ int toReserve = this.joinNode.getBufferManager().getMaxProcessingSize();
//check if the other side can be sorted in memory
if (other.getRowCount() <= this.joinNode.getBatchSize()
|| (possibleIndex.getRowCount() > this.joinNode.getBatchSize() && other.getRowCount()/this.joinNode.getBatchSize() < toReserve/schemaSize)) {
@@ -265,7 +265,7 @@
int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
//approximate that 1/2 of the index will be memory resident
toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() / (possibleIndex.getTupleBuffer().getBatchSize() * .5));
- if (toReserve < this.joinNode.getBufferManager().getMaxProcessingKB()) {
+ if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
useIndex = true;
} else if (possibleIndex.getTupleBuffer().getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
useIndex = true;
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -233,7 +233,7 @@
//attempt to reserve more working memory if there are additional rows available before blocking
if (workingTuples.size() >= maxRows) {
int reserved = bufferManager.reserveBuffers(schemaSize,
- (totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingKB())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+ (totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingSize())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
totalReservedBuffers += reserved;
if (reserved != schemaSize) {
break;
@@ -298,7 +298,7 @@
TupleBuffer merged = createTupleBuffer();
int desiredSpace = activeTupleBuffers.size() * schemaSize;
- int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingKB()));
+ int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -33,9 +33,9 @@
@Test public void testPrecision() {
LrfuEvictionQueue<?> q = new LrfuEvictionQueue<BaseCacheEntry>(new AtomicLong());
- float value = 0;
+ double value = 0;
for (long i = Integer.MAX_VALUE; i < 10l + Integer.MAX_VALUE; i++) {
- float valueNext = q.computeNextOrderingValue(i, (int)i-1, value);
+ double valueNext = q.computeNextOrderingValue(i, i-1, value);
assertTrue(valueNext > value);
value = valueNext;
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -54,7 +54,7 @@
private void helpTestSort(List elements, List[] data, List sortElements, List sortTypes, List[] expected, Mode mode) throws TeiidComponentException, TeiidProcessingException {
BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(10000, BATCH_SIZE, BATCH_SIZE);
- int reserve = mgr.getReserveBatchKB();
+ long reserve = mgr.getReserveBatchBytes();
CommandContext context = new CommandContext ("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
BlockingFakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
@@ -89,7 +89,7 @@
}
}
assertEquals(expected.length, currentRow - 1);
- assertEquals(reserve, mgr.getReserveBatchKB());
+ assertEquals(reserve, mgr.getReserveBatchBytes());
}
/*
Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-25 21:41:08 UTC (rev 3584)
@@ -94,7 +94,7 @@
svc.start();
BufferManager mgr = svc.getBufferManager();
- assertEquals(3285, mgr.getSchemaSize(schema));
+ assertEquals(3364096, mgr.getSchemaSize(schema));
assertEquals(128, mgr.getProcessorBatchSize(schema));
}
More information about the teiid-commits
mailing list