[teiid-commits] teiid SVN: r3584 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 4 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Oct 25 17:41:08 EDT 2011


Author: shawkins
Date: 2011-10-25 17:41:08 -0400 (Tue, 25 Oct 2011)
New Revision: 3584

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
   trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1750: switching back to 64-bit CacheKey info, and switching the BufferManager to account internally in bytes rather than KB and to track per-batch overhead

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -88,11 +88,11 @@
     throws TeiidComponentException;
 	
 	/**
-	 * Return the maximum KB that can be temporarily held potentially 
+	 * Return the maximum size in bytes that can be temporarily held, potentially 
 	 * across even a blocked exception.
 	 * @return
 	 */
-    int getMaxProcessingKB();
+    int getMaxProcessingSize();
     
     /**
      * Creates a new {@link FileStore}.  See {@link FileStore#setCleanupReference(Object)} to
@@ -117,7 +117,7 @@
     void releaseBuffers(int count);
     
     /**
-     * Get the size estimate in KB for the given schema.
+     * Get the size estimate for the given schema.
      */
     int getSchemaSize(List<? extends Expression> elements);
     
@@ -128,7 +128,7 @@
 	TupleBuffer getTupleBuffer(String id);
 
 	/**
-	 * Set the maxActivePlans as a hint at determining the maxProcessingKB
+	 * Set the maxActivePlans as a hint for determining the max processing size
 	 * @param maxActivePlans
 	 */
 	void setMaxActivePlans(int maxActivePlans);

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -90,6 +90,6 @@
 	 * @param gid
 	 * @param id
 	 */
-	void remove(Long gid, Long id);
+	boolean remove(Long gid, Long id);
 	
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -25,10 +25,10 @@
 public class CacheKey implements Comparable<CacheKey> {
 
 	private Long id;
-	protected int lastAccess;
-	protected float orderingValue;
+	protected long lastAccess;
+	protected double orderingValue;
 	
-	public CacheKey(Long id, int lastAccess, float orderingValue) {
+	public CacheKey(Long id, long lastAccess, double orderingValue) {
 		this.id = id;
 		this.lastAccess = lastAccess;
 		this.orderingValue = orderingValue;
@@ -59,11 +59,11 @@
 		return this.id.equals(((CacheKey)obj).getId());
 	}
 
-	public int getLastAccess() {
+	public long getLastAccess() {
 		return lastAccess;
 	}
 	
-	public float getOrderingValue() {
+	public double getOrderingValue() {
 		return orderingValue;
 	}
 	
@@ -71,7 +71,7 @@
 	public int compareTo(CacheKey o) {
 		int result = (int) Math.signum(orderingValue - o.orderingValue);
 		if (result == 0) {
-			result = (int)Math.signum((lastAccess&0xffffffffl) - (o.lastAccess&0xffffffffl));
+			result = (int)Math.signum(lastAccess - o.lastAccess);
 			if (result == 0) {
 				return Long.signum(id - o.id);
 			}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -109,6 +109,7 @@
 	static final int LOG_INODE_SIZE = 6;
 	static final int DIRECT_POINTERS = 14;
 	static final int EMPTY_ADDRESS = -1;
+	static final int FREED = -2;
 	
 	//TODO allow the block size to be configurable. 8k is a reasonable default up to a gig, but we could be more efficient with larger blocks from there.
 	//the rationale for a smaller block size is to reduce internal fragmentation, which is critical when maintaining a relatively small buffer < 256MB
@@ -290,16 +291,16 @@
 			}
 			inodesInuse.clear(inode);
 			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
-				return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+				return acquire?dataBlockToAcquire:FREED;
 			}
 			freedAll = freeIndirectBlock(indirectIndexBlock);
 			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
-				return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+				return acquire?dataBlockToAcquire:FREED;
 			}
 			bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock).slice();
 			freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
 			freeDataBlock(doublyIndirectIndexBlock);
-			return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
+			return acquire?dataBlockToAcquire:FREED;
 		}
 
 		private boolean freeIndirectBlock(int indirectIndexBlock) {
@@ -787,7 +788,7 @@
 	private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
 		PhysicalInfo lowest = memoryBufferEntries.firstEntry(false);
 		CacheKey key = info.getKey();
-		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (criticalCleaningThreshold + info.memoryBlockCount)
+		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (cleaningThreshold + info.memoryBlockCount)
 				|| (lowest != null && lowest.block != EMPTY_ADDRESS 
 						&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, key.getLastAccess(), key.getOrderingValue()):key.getOrderingValue()));
 	}
@@ -816,13 +817,20 @@
 	}
 	
 	@Override
-	public void remove(Long gid, Long id) {
+	public boolean remove(Long gid, Long id) {
 		Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
 		if (map == null) {
-			return;
+			return false;
 		}
-		PhysicalInfo info = map.remove(id);
+		PhysicalInfo info = null;
+		boolean result = false;
+		synchronized (map) {
+			int size = map.size();
+			info = map.remove(id);
+			result = size != map.size();
+		}
 		free(info, false, false);
+		return result;
 	}
 
 	@Override
@@ -850,7 +858,7 @@
 			return EMPTY_ADDRESS;
 		}
 		Long oid = info.getId();
-		int result = EMPTY_ADDRESS;
+		int result = FREED;
 		BlockManager bm = null;
 		int block = EMPTY_ADDRESS;
 		int memoryBlockCount;
@@ -948,7 +956,7 @@
 	 */
 	int evictFromMemoryBuffer(boolean acquire) {
 		boolean writeLocked = false;
-		int next = -1;
+		int next = EMPTY_ADDRESS;
 		try {
 			for (int i = 0; i < EVICTION_SCANS && next == EMPTY_ADDRESS; i++) {
 				//doing a cleanup may trigger the purging of resources
@@ -981,13 +989,10 @@
 						info.evicting = true;
 					}
 					next = free(info, true, acquire);
-					if (!acquire) {
-						next = 0; //let the cleaner know that we made progress
-					}
 					break;
 				}
 			} 
-			if (acquire && next == -1) {
+			if (acquire && next == EMPTY_ADDRESS) {
 				throw new AssertionError("Could not free space for pending write"); //$NON-NLS-1$
 			}
 		} finally {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -91,6 +91,7 @@
 	 * little is reserved.
 	 */
 	private static final class Cleaner extends TimerTask {
+		private static final int MAX_READ_AGE = 1<<28;
 		WeakReference<BufferManagerImpl> bufferRef;
 		
 		public Cleaner(BufferManagerImpl bufferManagerImpl) {
@@ -106,25 +107,22 @@
 					return;
 				}
 				boolean agingOut = false;
-				if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < impl.maxReserveKB.get()*.7) {
+				if (impl.reserveBatchBytes.get() < impl.maxReserveBytes.get()*.9 || impl.activeBatchBytes.get() < impl.maxReserveBytes.get()*.7) {
 					CacheEntry entry = impl.evictionQueue.firstEntry(false);
 					if (entry == null) {
 						return;
 					}
 					//we aren't holding too many memory entries, ensure that
 					//entries aren't old
-					int lastAccess = 0x1fffffff&entry.getKey().getLastAccess();
-					int currentTime = 0x1fffffff&(int)impl.readAttempts.get();
-					if (lastAccess > currentTime) {
-						currentTime += 1<<29;
-					}
-					if (currentTime - lastAccess < 1<<28) {
+					long lastAccess = entry.getKey().getLastAccess();
+					long currentTime = impl.readAttempts.get();
+					if (currentTime - lastAccess < MAX_READ_AGE) {
 						return;
 					}
 					agingOut = true;
 				}
 				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchKB.get(), impl.maxReserveKB.get(), impl.activeBatchKB.get()); //$NON-NLS-1$
+					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
 				}
 				impl.doEvictions(0, false);
 				if (!agingOut) {
@@ -138,6 +136,11 @@
 		}
 	}
 
+	/**
+	 * This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
+	 */
+	private static final int BATCH_OVERHEAD = 128;
+	
 	final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
 		final Long id;
 		SizeUtility sizeUtility;
@@ -197,6 +200,9 @@
 				throws TeiidComponentException {
 			int sizeEstimate = getSizeEstimate(batch);
 			Long oid = batchAdded.getAndIncrement();
+			if (oid.longValue() == 56) {
+				this.toString();
+			}
 			CacheEntry old = null;
 			if (previous != null) {
 				if (removeOld) {
@@ -210,6 +216,8 @@
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
 			}
+			maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
+			reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
 			cache.addToCacheGroup(id, ce.getId());
 			addMemoryEntry(ce, true);
 			return oid;
@@ -252,7 +260,7 @@
 		}
 		
 		public int getSizeEstimate(List<? extends List<?>> obj) {
-			return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj) / 1024);
+			return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj));
 		}
 		
 		@SuppressWarnings("unchecked")
@@ -284,9 +292,8 @@
 					throw new AssertionError("Batch not found in storage " + batch); //$NON-NLS-1$
 				}
 				if (!retain) {
-					cache.remove(this.id, batch);
-				}
-				if (retain) {
+					removeFromCache(this.id, batch);
+				} else {
 					addMemoryEntry(ce, false);
 				}
 			} finally {
@@ -294,10 +301,9 @@
 			}
 			return (List<List<?>>)ce.getObject();
 		}
-		
+
 		@Override
 		public void remove(Long batch) {
-			cleanSoftReferences();
 			BufferManagerImpl.this.remove(id, batch, prefersMemory.get());
 		}
 
@@ -333,10 +339,10 @@
     private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
     //set to acceptable defaults for testing
-    private int maxProcessingKB = 1 << 11; 
-    private Integer maxProcessingKBOrig;
-    AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
-    AtomicInteger reserveBatchKB = new AtomicInteger();
+    private int maxProcessingBytes = 1 << 21; 
+    private Integer maxProcessingBytesOrig;
+    AtomicLong maxReserveBytes = new AtomicLong(1 << 28);
+    AtomicLong reserveBatchBytes = new AtomicLong();
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
@@ -346,7 +352,7 @@
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
     
-    AtomicInteger activeBatchKB = new AtomicInteger();
+    AtomicLong activeBatchBytes = new AtomicLong();
     
     private AtomicLong readAttempts = new AtomicLong();
     //TODO: consider the size estimate in the weighting function
@@ -394,13 +400,20 @@
 	
 	void clearSoftReference(BatchSoftReference bsr) {
 		synchronized (bsr) {
-			maxReserveKB.addAndGet(bsr.sizeEstimate);
-			reserveBatchKB.addAndGet(bsr.sizeEstimate);
+			maxReserveBytes.addAndGet(bsr.sizeEstimate);
+			reserveBatchBytes.addAndGet(bsr.sizeEstimate);
 			bsr.sizeEstimate = 0;
 		}
 		bsr.clear();
 	}
 	
+	void removeFromCache(Long gid, Long batch) {
+		if (cache.remove(gid, batch)) {
+			maxReserveBytes.addAndGet(BATCH_OVERHEAD);
+			reserveBatchBytes.addAndGet(BATCH_OVERHEAD);
+		}
+	}
+	
 	public long getBatchesAdded() {
 		return batchAdded.get();
 	}
@@ -418,12 +431,12 @@
 	}
 	
 	@Override
-    public int getMaxProcessingKB() {
-		return maxProcessingKB;
+	public int getMaxProcessingSize() {
+		return maxProcessingBytes;
 	}
 	
-	public int getReserveBatchKB() {
-		return reserveBatchKB.get();
+	public long getReserveBatchBytes() {
+		return reserveBatchBytes.get();
 	}
     
     /**
@@ -541,41 +554,48 @@
 	}
     
     public void setMaxProcessingKB(int maxProcessingKB) {
-		this.maxProcessingKB = maxProcessingKB;
+    	if (maxProcessingKB > -1) {
+    		this.maxProcessingBytes = maxProcessingKB<<10;
+    	} else {
+    		this.maxProcessingBytes = -1;
+    	}
 	}
     
     public void setMaxReserveKB(int maxReserveBatchKB) {
-		this.maxReserveKB.set(maxReserveBatchKB);
 		if (maxReserveBatchKB > -1) {
-			this.reserveBatchKB.set(maxReserveBatchKB);
+			int maxReserve = maxReserveBatchKB<<10;
+			this.maxReserveBytes.set(maxReserve);
+			this.reserveBatchBytes.set(maxReserve);
+		} else {
+			this.maxReserveBytes.set(-1);
 		}
 	}
     
 	@Override
 	public void initialize() throws TeiidComponentException {
-		int maxMemory = (int)Math.min(Runtime.getRuntime().maxMemory() / 1024, Integer.MAX_VALUE);
-		maxMemory = Math.max(0, maxMemory - 300 * 1024); //assume 300 megs of overhead for the AS/system stuff
+		long maxMemory = Runtime.getRuntime().maxMemory();
+		maxMemory = Math.max(0, maxMemory - (300 << 20)); //assume 300 megs of overhead for the AS/system stuff
 		if (getMaxReserveKB() < 0) {
-			this.setMaxReserveKB(0);
-			int one_gig = 1024 * 1024;
+			this.maxReserveBytes.set(0);
+			int one_gig = 1 << 30;
 			if (maxMemory > one_gig) {
 				//assume 75% of the memory over the first gig
-				this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
+				this.maxReserveBytes.addAndGet((long)Math.max(0, (maxMemory - one_gig) * .75));
 			}
-			this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
+			this.maxReserveBytes.addAndGet(Math.max(0, Math.min(one_gig, maxMemory) >> 1));
     	}
-		this.reserveBatchKB.set(this.getMaxReserveKB());
-		if (this.maxProcessingKBOrig == null) {
+		this.reserveBatchBytes.set(this.maxReserveBytes.get());
+		if (this.maxProcessingBytesOrig == null) {
 			//store the config value so that we can be reinitialized (this is not a clean approach)
-			this.maxProcessingKBOrig = this.maxProcessingKB;
+			this.maxProcessingBytesOrig = this.maxProcessingBytes;
 		}
-		if (this.maxProcessingKBOrig < 0) {
-			this.maxProcessingKB = Math.max(Math.min(8 * processorBatchSize, Integer.MAX_VALUE), (int)(.1 * maxMemory)/maxActivePlans);
+		if (this.maxProcessingBytesOrig < 0) {
+			this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 8l, (.1 * maxMemory)/maxActivePlans),  Integer.MAX_VALUE);
 		} 
 		//make a guess at the max number of batches
-		int memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow / 1024);
+		long memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow);
 		//memoryBatches represents a full batch, so assume that most will be smaller
-		int logSize = 35 - Integer.numberOfLeadingZeros(memoryBatches);
+		int logSize = 67 - Long.numberOfLeadingZeros(memoryBatches);
 		if (useWeakReferences) {
 			weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(30, logSize));
 		}
@@ -593,13 +613,13 @@
     	}
     	if (lock.tryLock()) {
 	    	try {
-		    	this.reserveBatchKB.addAndGet(count);
+		    	this.reserveBatchBytes.addAndGet(count);
 		    	batchesFreed.signalAll();
 	    	} finally {
 	    		lock.unlock();
 	    	}
     	} else {
-    		this.reserveBatchKB.addAndGet(count);
+    		this.reserveBatchBytes.addAndGet(count);
     	}
     }
     
@@ -616,14 +636,14 @@
 			//don't wait for more than is available
 			int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
 			int committed = 0;
-	    	while (waitCount > 0 && waitCount > this.reserveBatchKB.get() && committed < additional) {
-	    		int reserveBatchSample = this.reserveBatchKB.get();
+	    	while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
+	    		long reserveBatchSample = this.reserveBatchBytes.get();
 	    		try {
 					batchesFreed.await(100, TimeUnit.MILLISECONDS);
 				} catch (InterruptedException e) {
 					throw new TeiidRuntimeException(e);
 				}
-				if (reserveBatchSample >= this.reserveBatchKB.get()) {
+				if (reserveBatchSample >= this.reserveBatchBytes.get()) {
 					waitCount >>= 3;
 				} else {
 					waitCount >>= 1;
@@ -645,7 +665,7 @@
     	}
     	int result = count;
     	if (mode == BufferReserveMode.FORCE) {
-    		this.reserveBatchKB.addAndGet(-count);
+    		this.reserveBatchBytes.addAndGet(-count);
     	} else {
     		result = noWaitReserve(count, true);
     	}
@@ -656,28 +676,28 @@
 
 	private int noWaitReserve(int count, boolean allOrNothing) {
 		for (int i = 0; i < 2; i++) {
-			int reserveBatch = this.reserveBatchKB.get();
+			long reserveBatch = this.reserveBatchBytes.get();
 			if (allOrNothing && count > reserveBatch) {
 				return 0;
 			}
-			count = Math.min(count, Math.max(0, reserveBatch));
+			count = (int)Math.min(count, Math.max(0, reserveBatch));
 			if (count == 0) {
 				return 0;
 			}
-			if (this.reserveBatchKB.compareAndSet(reserveBatch, reserveBatch - count)) {
+			if (this.reserveBatchBytes.compareAndSet(reserveBatch, reserveBatch - count)) {
 				return count;
 			}
 		}
 		//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
-		this.reserveBatchKB.addAndGet(-count);
+		this.reserveBatchBytes.addAndGet(-count);
 		return count;
 	}
     
 	void persistBatchReferences() {
-		int activeBatch = activeBatchKB.get();
-		int reserveBatch = reserveBatchKB.get();
+		long activeBatch = activeBatchBytes.get();
+		long reserveBatch = reserveBatchBytes.get();
 		if (activeBatch <= reserveBatch) {
-    		int memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
+    		long memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
 			if (DataTypeManager.isValueCacheEnabled()) {
     			if (memoryCount < getMaxReserveKB() / 8) {
 					DataTypeManager.setValueCacheEnabled(false);
@@ -687,13 +707,13 @@
 			}
 			return;
 		}
-		int maxToFree = Math.max(maxProcessingKB>>1, reserveBatch>>3);
+		long maxToFree = Math.max(maxProcessingBytes>>1, reserveBatch>>3);
 		doEvictions(maxToFree, true);
 	}
 
-	void doEvictions(int maxToFree, boolean checkActiveBatch) {
+	void doEvictions(long maxToFree, boolean checkActiveBatch) {
 		int freed = 0;
-		while (freed <= maxToFree && (!checkActiveBatch || activeBatchKB.get() > reserveBatchKB.get() * .8)) {
+		while (freed <= maxToFree && (!checkActiveBatch || activeBatchBytes.get() > reserveBatchBytes.get() * .8)) {
 			CacheEntry ce = evictionQueue.firstEntry(true);
 			if (ce == null) {
 				break;
@@ -712,7 +732,7 @@
 				synchronized (ce) {
 					if (evicted && memoryEntries.remove(ce.getId()) != null) {
 						freed += ce.getSizeEstimate();
-						activeBatchKB.addAndGet(-ce.getSizeEstimate());
+						activeBatchBytes.addAndGet(-ce.getSizeEstimate());
 						evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
 					}
 				}
@@ -748,14 +768,17 @@
 	}
 
 	private void createSoftReference(CacheEntry ce) {
-		BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, ce.getSizeEstimate()/2);
+		//if we don't set aside some reserve, we 
+		//will push the soft ref out of memory potentially too quickly
+		int sizeEstimate = ce.getSizeEstimate()/2;
+		BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, sizeEstimate);
 		softCache.put(ce.getId(), ref);
-		maxReserveKB.addAndGet(- ce.getSizeEstimate()/2);
-		reserveBatchKB.addAndGet(- ce.getSizeEstimate()/2);
+		maxReserveBytes.addAndGet(- sizeEstimate);
+		reserveBatchBytes.addAndGet(- sizeEstimate);
 	}
 	
 	/**
-	 * Get a CacheEntry without hitting the cache
+	 * Get a CacheEntry without hitting storage
 	 */
 	CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
 		CacheEntry ce = null;
@@ -807,13 +830,16 @@
 		return null;
 	}
 	
+	AtomicInteger removed = new AtomicInteger();
+	
 	CacheEntry remove(Long gid, Long batch, boolean prefersMemory) {
 		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", batch); //$NON-NLS-1$
 		}
+		cleanSoftReferences();
 		CacheEntry ce = fastGet(batch, prefersMemory, false);
 		if (ce == null) {
-			cache.remove(gid, batch);
+			removeFromCache(gid, batch);
 		} else {
 			ce.nullOut();
 		}
@@ -822,11 +848,11 @@
 
 	private void remove(CacheEntry ce, boolean inMemory) {
 		if (inMemory) {
-			activeBatchKB.addAndGet(-ce.getSizeEstimate());
+			activeBatchBytes.addAndGet(-ce.getSizeEstimate());
 		}
 		Serializer<?> s = ce.getSerializer();
 		if (s != null) {
-			cache.remove(s.getId(), ce.getId());
+			removeFromCache(s.getId(), ce.getId());
 		}
 	}
 	
@@ -843,12 +869,15 @@
 				evictionQueue.touch(ce);
 			}
 		}
-		activeBatchKB.getAndAdd(ce.getSizeEstimate());
+		activeBatchBytes.getAndAdd(ce.getSizeEstimate());
 	}
 	
 	void removeCacheGroup(Long id, boolean prefersMemory) {
 		cleanSoftReferences();
 		Collection<Long> vals = cache.removeCacheGroup(id);
+		int overhead = vals.size() * BATCH_OVERHEAD;
+		maxReserveBytes.addAndGet(overhead);
+		reserveBatchBytes.addAndGet(overhead);
 		for (Long val : vals) {
 			//TODO: we will unnecessarily call remove on the cache, but that should be low cost
 			fastGet(val, prefersMemory, false);
@@ -905,7 +934,7 @@
 		}
 		rowCount = Math.max(1, rowCount);
 		total *= rowCount; 
-		return new int[]{rowCount, Math.max(1, total / 1024)};
+		return new int[]{rowCount, Math.max(1, total)};
 	}
 	
 	@Override
@@ -959,17 +988,13 @@
 	}
 
 	public int getMaxReserveKB() {
-		return maxReserveKB.get();
+		return (int)maxReserveBytes.get()>>10;
 	}
 	
 	public void setCache(Cache cache) {
 		this.cache = cache;
 	}
 	
-	public int getActiveBatchKB() {
-		return activeBatchKB.get();
-	}
-	
 	public int getMemoryCacheEntries() {
 		return memoryEntries.size();
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -54,7 +54,7 @@
 	
 	public LrfuEvictionQueue(AtomicLong clock) {
 		this.clock = clock;
-		setCrfLamda(.1); //smaller values tend to work better since we're using interval bounds
+		setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
 	}
 
 	public boolean remove(V value) {
@@ -93,27 +93,21 @@
      */
 	public void recordAccess(V value) {
 		CacheKey key = value.getKey();
-		int lastAccess = key.getLastAccess();
+		long lastAccess = key.getLastAccess();
 		long currentClock = clock.get();
-		float orderingValue = key.getOrderingValue();
+		double orderingValue = key.getOrderingValue();
 		orderingValue = computeNextOrderingValue(currentClock, lastAccess,
 				orderingValue);
 		value.setKey(new CacheKey(key.getId(), (int)currentClock, orderingValue));
 	}
 
-	float computeNextOrderingValue(long currentTime,
-			int lastAccess, float orderingValue) {
-		long longLastAccess = lastAccess&0xffffffffl;
-		currentTime &= 0xffffffffl;
-		if (longLastAccess > currentTime) {
-			currentTime += (1l<<32);
-		}
-		long delta = currentTime - longLastAccess;
+	double computeNextOrderingValue(long currentTime,
+			long lastAccess, double orderingValue) {
+		long delta = currentTime - lastAccess;
 		orderingValue = 
-			(float) (//Frequency component
 			(delta<maxInterval?(delta<minInterval?minVal:Math.pow(inverseCrfLamda, delta)):0)*orderingValue
 			//recency component
-			+ Math.pow(currentTime, crfLamda));
+			+ Math.pow(currentTime, crfLamda);
 		return orderingValue;
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -155,11 +155,12 @@
 	}
 		
 	@Override
-	public void remove(Long gid, Long id) {
+	public boolean remove(Long gid, Long id) {
 		Map<Long, CacheEntry> group = groups.get(gid);
 		if (group != null) {
-			group.remove(id);
+			return group.remove(id) != null;
 		}
+		return false;
 	}
 	
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -29,7 +29,7 @@
 /**
  * Represents the memory buffer and storage state of an object.
  * It is important to minimize the amount of data held here.
- * Currently should be 48 bytes.
+ * Currently should be 56 bytes.
  */
 final class PhysicalInfo extends BaseCacheEntry {
 	

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/EnhancedSortMergeJoinStrategy.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -255,7 +255,7 @@
     		return false; //index is too large
     	}
     	int schemaSize = this.joinNode.getBufferManager().getSchemaSize(other.getSource().getOutputElements());
-    	int toReserve = this.joinNode.getBufferManager().getMaxProcessingKB();
+    	int toReserve = this.joinNode.getBufferManager().getMaxProcessingSize();
     	//check if the other side can be sorted in memory
     	if (other.getRowCount() <= this.joinNode.getBatchSize() 
     			|| (possibleIndex.getRowCount() > this.joinNode.getBatchSize() && other.getRowCount()/this.joinNode.getBatchSize() < toReserve/schemaSize)) {
@@ -265,7 +265,7 @@
     	int indexSchemaSize = this.joinNode.getBufferManager().getSchemaSize(possibleIndex.getSource().getOutputElements());
     	//approximate that 1/2 of the index will be memory resident 
     	toReserve = (int)(indexSchemaSize * possibleIndex.getTupleBuffer().getRowCount() / (possibleIndex.getTupleBuffer().getBatchSize() * .5)); 
-    	if (toReserve < this.joinNode.getBufferManager().getMaxProcessingKB()) {
+    	if (toReserve < this.joinNode.getBufferManager().getMaxProcessingSize()) {
     		useIndex = true;
     	} else if (possibleIndex.getTupleBuffer().getRowCount() / this.joinNode.getBatchSize() < preferMemCutoff) {
     		useIndex = true;

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -233,7 +233,7 @@
 		        	//attempt to reserve more working memory if there are additional rows available before blocking
 		        	if (workingTuples.size() >= maxRows) {
 	        			int reserved = bufferManager.reserveBuffers(schemaSize, 
-	        					(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingKB())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+	        					(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingSize())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
 		        		totalReservedBuffers += reserved;
 	        			if (reserved != schemaSize) {
 		        			break;
@@ -298,7 +298,7 @@
             TupleBuffer merged = createTupleBuffer();
 
             int desiredSpace = activeTupleBuffers.size() * schemaSize;
-            int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingKB()));
+            int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
             bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
             if (desiredSpace > reserved) {
             	reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestLrfuEvictionQueue.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -33,9 +33,9 @@
 	
 	@Test public void testPrecision() {
 		LrfuEvictionQueue<?> q = new LrfuEvictionQueue<BaseCacheEntry>(new AtomicLong());
-		float value = 0;
+		double value = 0;
 		for (long i = Integer.MAX_VALUE; i < 10l + Integer.MAX_VALUE; i++) {
-			float valueNext = q.computeNextOrderingValue(i, (int)i-1, value);
+			double valueNext = q.computeNextOrderingValue(i, i-1, value);
 			assertTrue(valueNext > value);
 			value = valueNext;
 		}

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -54,7 +54,7 @@
     
     private void helpTestSort(List elements, List[] data, List sortElements, List sortTypes, List[] expected, Mode mode) throws TeiidComponentException, TeiidProcessingException {
         BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(10000, BATCH_SIZE, BATCH_SIZE);
-        int reserve = mgr.getReserveBatchKB();
+        long reserve = mgr.getReserveBatchBytes();
         CommandContext context = new CommandContext ("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
         
         BlockingFakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
@@ -89,7 +89,7 @@
         	}
         }
         assertEquals(expected.length, currentRow - 1);
-        assertEquals(reserve, mgr.getReserveBatchKB());
+        assertEquals(reserve, mgr.getReserveBatchBytes());
     }
 
     /*

Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-25 18:25:46 UTC (rev 3583)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-25 21:41:08 UTC (rev 3584)
@@ -94,7 +94,7 @@
         svc.start();
         
         BufferManager mgr = svc.getBufferManager();
-        assertEquals(3285, mgr.getSchemaSize(schema));
+        assertEquals(3364096, mgr.getSchemaSize(schema));
         assertEquals(128, mgr.getProcessorBatchSize(schema));
     }
     



More information about the teiid-commits mailing list