[teiid-commits] teiid SVN: r3555 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Oct 17 23:37:08 EDT 2011


Author: shawkins
Date: 2011-10-17 23:37:06 -0400 (Mon, 17 Oct 2011)
New Revision: 3555

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
Log:
TEIID-1750 correcting a threading issue with reads during an evict and switching to mostly non-blocking reservations  TEIID-1784 correcting inital sort incomplete reservations

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -52,15 +52,11 @@
 	
 	public enum BufferReserveMode {
 		/**
-		 * Claim unused buffers up to the amount requested, using a progressive decaying algorithm
-		 */
-		WAIT,
-		/**
 		 * Claim all of the buffers requested, even if they are not available, without waiting
 		 */
 		FORCE,
 		/**
-		 * Claim unused buffers up to the amount requested witout waiting
+		 * Claim unused buffers up to the amount requested without waiting
 		 */
 		NO_WAIT
 	}
@@ -135,5 +131,12 @@
 	 * Set the maxActivePlans as a hint at determining the maxProcessingKB
 	 * @param maxActivePlans
 	 */
-	void setMaxActivePlans(int maxActivePlans);		
+	void setMaxActivePlans(int maxActivePlans);
+
+	/**
+	 * Wait for additional buffers to become available.
+	 * @param additional
+	 * @return
+	 */
+	int reserveAdditionalBuffers(int additional);		
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -42,6 +42,10 @@
 		this.persistent = persistent;
 	}
 	
+	public void setObject(Object object) {
+		this.object = object;
+	}
+	
 	public int getSizeEstimate() {
 		return sizeEstimate;
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -34,10 +34,12 @@
 	int blockIndex;
 	ByteBuffer buf;
 	boolean done;
+	private final boolean threadSafe;
 
-	BlockInputStream(BlockManager manager, int blockCount) {
+	BlockInputStream(BlockManager manager, int blockCount, boolean threadSafe) {
 		this.manager = manager;
 		this.maxBlock = blockCount;
+		this.threadSafe = threadSafe;
 	}
 
 	@Override
@@ -56,6 +58,9 @@
 				return;
 			}
 			buf = manager.getBlock(blockIndex++);
+			if (threadSafe) {
+				buf = buf.duplicate();
+			}
 		}
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -241,8 +241,8 @@
 					memoryEvictionLock.readLock().unlock();
 				}
 			}
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_DQP, "Allocating", data?"data":"index", "block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Allocating", data?"data":"index", "block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
 			}
 			return next;
 		}
@@ -254,8 +254,8 @@
 		}
 
 		private void freeDataBlock(int dataBlock) {
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-				LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			blocksInuse.clear(dataBlock);
 		}
@@ -267,8 +267,8 @@
 					if (this.inode == -1) {
 						throw new AssertionError("Out of inodes"); //$NON-NLS-1$
 					}
-					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-						LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 					}
 					ByteBuffer bb = getInodeBlock();
 					bb.putInt(EMPTY_ADDRESS);
@@ -288,8 +288,8 @@
 			int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
 			int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
 			boolean freedAll = freeBlock(acquire?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(acquire?1:0), true);
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-				LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			inodesInuse.clear(inode);
 			if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
@@ -427,8 +427,8 @@
 	@SuppressWarnings("unchecked")
 	@Override
 	public boolean add(CacheEntry entry, Serializer s) {
-		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-			LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
+		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+			LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
 		}
 		boolean newEntry = false;
 		InodeBlockManager blockManager = null;
@@ -469,7 +469,7 @@
 			//proactively create freespace
 			if (!cleanerRunning.get()) {
 				if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
-					LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
+					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
 					asynchPool.execute(cleaningTask);
 					if (lowBlocks(true)) {
 						//do a non-blocking removal before we're forced to block
@@ -563,21 +563,21 @@
 		boolean inStorage = false;
 		try {
 			synchronized (info) {
-				await(info, true, false);
+				assert !info.pinned && info.loading; //load should be locked
+				await(info, true, false); //not necessary, but should make things safer
 				if (info.inode != EMPTY_ADDRESS) {
 					info.pinned = true;
 					memoryBufferEntries.touch(info, false); 
-					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
-					is = new BlockInputStream(manager, info.memoryBlockCount);
+					is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
 				} else if (info.block != EMPTY_ADDRESS) {
-					assert !info.pinned;
 					inStorage = true;
 					storageReads.incrementAndGet();
-					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
+					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockStore blockStore = sizeBasedStores[info.sizeIndex];
 					FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
@@ -595,19 +595,17 @@
 					ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
 		            ObjectConverterUtil.write(os, is, -1);
 		            synchronized (info) {
+		            	assert !info.pinned;
 			            info.inode = manager.getInode();
 			            info.pinned = true;
 						memoryBufferEntries.touch(info, false);
+						is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
 					}
-					is = new BlockInputStream(manager, info.memoryBlockCount);
 					success = true;
 				} finally {
 					this.memoryWritePermits.release();
 					if (!success && manager != null) {
 						manager.free(false);
-						synchronized (info) {
-							info.inode = EMPTY_ADDRESS;
-						}
 					}
 				}
 			}
@@ -707,10 +705,10 @@
 		synchronized (info) {
 			//if we're a demotion then the free flag was already checked and set 
 			if (!demote) {
-				//let a pending free finish - it would be nice if we could pre-empt
+				//let any pending finish - it would be nice if we could pre-empt
 				//since we can save some work, but this should be rare enough 
 				//to just block
-				await(info, false, true);
+				await(info, true, true);
 				info.evicting = true;
 			} else {
 				assert info.evicting;
@@ -729,11 +727,11 @@
 		try {
 			if (demote && block == EMPTY_ADDRESS) {
 				storageWrites.getAndIncrement();
-				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
+				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount, false); //we know this can always be single threaded
 				BlockStore blockStore = sizeBasedStores[sizeIndex];
 				block = getAndSetNextClearBit(blockStore);
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-					LogManager.logDetail(LogConstants.CTX_DQP, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-1$
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
 				}
 				FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
 				long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
@@ -748,7 +746,7 @@
 					//shouldn't happen, but we'll invalidate this write and continue
 					demote = false;
 					//just continue to free
-					LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+					LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
 				}
 			}
 		} finally {
@@ -756,6 +754,7 @@
 			synchronized (info) {
 				//it is possible for a read to happen while evicting.
 				//that's ok, we'll just wait for it to finish
+				assert info.evicting;
 				await(info, true, false);
 				info.evicting = false;
 				info.notifyAll();
@@ -770,8 +769,8 @@
 					} else {
 						BlockStore blockStore = sizeBasedStores[info.sizeIndex];
 						blockStore.blocksInUse.clear(info.block);
-						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
-							LogManager.logDetail(LogConstants.CTX_DQP, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
+						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+							LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
 						}
 						info.block = EMPTY_ADDRESS;
 					}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -84,6 +84,10 @@
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 
+	/**
+	 * Asynch cleaner attempts to age out old entries and to reduce the memory size when 
+	 * little is reserved.
+	 */
 	private static final class Cleaner extends TimerTask {
 		WeakReference<BufferManagerImpl> bufferRef;
 		
@@ -99,7 +103,8 @@
 					this.cancel();
 					return;
 				}
-				if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < (impl.maxReserveKB.get()>>3)) {
+				boolean agingOut = false;
+				if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < impl.maxReserveKB.get()*.7) {
 					CacheEntry entry = impl.evictionQueue.firstEntry(false);
 					if (entry == null) {
 						return;
@@ -114,8 +119,19 @@
 					if (currentTime - lastAccess < 1<<28) {
 						return;
 					}
+					agingOut = true;
 				}
+				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchKB.get(), impl.maxReserveKB.get(), impl.activeBatchKB.get()); //$NON-NLS-1$
+				}
 				impl.doEvictions(0, false);
+				if (!agingOut) {
+					try {
+						Thread.sleep(100); //we don't want to evict too fast, because the processing threads are more than capable of evicting
+					} catch (InterruptedException e) {
+						throw new TeiidRuntimeException(e);
+					}
+				}
 			}
 		}
 	}
@@ -335,6 +351,12 @@
     LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
     ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
     
+    private ThreadLocal<Integer> reservedByThread = new ThreadLocal<Integer>() {
+    	protected Integer initialValue() {
+    		return 0;
+    	}
+    };
+    
     //limited size reference caches based upon the memory settings
     private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
     private Map<Long, BatchSoftReference> softCache = Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f, false) {
@@ -343,13 +365,12 @@
 		protected boolean removeEldestEntry(Map.Entry<Long,BatchSoftReference> eldest) {
     		if (size() > maxSoftReferences) {
     			BatchSoftReference bsr = eldest.getValue();
-    			maxReserveKB.addAndGet(bsr.sizeEstimate);
-    			reserveBatchKB.addAndGet(bsr.sizeEstimate);
-    			bsr.clear();
+    			clearSoftReference(bsr);
     			return true;
     		}
     		return false;
-    	};
+    	}
+
     });
     
     private Cache cache;
@@ -366,9 +387,18 @@
 	private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
 	
 	public BufferManagerImpl() {
-		//timer.schedule(new Cleaner(this), 30000, 30000);
+		timer.schedule(new Cleaner(this), 15000, 15000);
 	}
 	
+	void clearSoftReference(BatchSoftReference bsr) {
+		synchronized (bsr) {
+			maxReserveKB.addAndGet(bsr.sizeEstimate);
+			reserveBatchKB.addAndGet(bsr.sizeEstimate);
+			bsr.sizeEstimate = 0;
+		}
+		bsr.clear();
+	}
+	
 	public long getBatchesAdded() {
 		return batchAdded.get();
 	}
@@ -389,6 +419,10 @@
     public int getMaxProcessingKB() {
 		return maxProcessingKB;
 	}
+	
+	public int getReserveBatchKB() {
+		return reserveBatchKB.get();
+	}
     
     /**
      * Get processor batch size
@@ -551,49 +585,88 @@
     	if (count < 1) {
     		return;
     	}
+    	reservedByThread.set(reservedByThread.get() - count);
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
     		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
     	}
+    	if (lock.tryLock()) {
+	    	try {
+		    	this.reserveBatchKB.addAndGet(count);
+		    	batchesFreed.signalAll();
+	    	} finally {
+	    		lock.unlock();
+	    	}
+    	} else {
+    		this.reserveBatchKB.addAndGet(count);
+    	}
+    }
+    
+    /**
+     * TODO: should consider other reservations by the current thread
+     */
+    @Override
+    public int reserveAdditionalBuffers(int additional) {
+    	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+    		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", additional, "WAIT"); //$NON-NLS-1$ //$NON-NLS-2$
+    	}
     	lock.lock();
     	try {
-	    	this.reserveBatchKB.addAndGet(count);
-	    	batchesFreed.signalAll();
+			//don't wait for more than is available
+			int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
+			int committed = 0;
+	    	while (waitCount > 0 && waitCount > this.reserveBatchKB.get() && committed < additional) {
+	    		int reserveBatchSample = this.reserveBatchKB.get();
+	    		try {
+					batchesFreed.await(100, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException e) {
+					throw new TeiidRuntimeException(e);
+				}
+				if (reserveBatchSample >= this.reserveBatchKB.get()) {
+					waitCount >>= 3;
+				} else {
+					waitCount >>= 1;
+				}
+		    	int result = noWaitReserve(additional - committed);
+		    	committed += result;
+	    	}	
+	    	return committed;
     	} finally {
     		lock.unlock();
+    		persistBatchReferences();
     	}
-    }	
+    }
     
     @Override
     public int reserveBuffers(int count, BufferReserveMode mode) {
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
     		LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
     	}
-    	lock.lock();
-	    try {
-	    	if (mode == BufferReserveMode.WAIT) {
-	    		//don't wait for more than is available
-	    		int waitCount = Math.min(count, this.getMaxReserveKB());
-		    	while (waitCount > 0 && waitCount > this.reserveBatchKB.get()) {
-		    		try {
-						batchesFreed.await(100, TimeUnit.MILLISECONDS);
-					} catch (InterruptedException e) {
-						throw new TeiidRuntimeException(e);
-					}
-					waitCount /= 2;
-		    	}	
-	    	}
-	    	if (this.reserveBatchKB.get() >= count || mode == BufferReserveMode.FORCE) {
-		    	this.reserveBatchKB.addAndGet(-count);
-	    		return count;
-	    	}
-	    	int result = Math.max(0, this.reserveBatchKB.get());
-	    	this.reserveBatchKB.addAndGet(-result);
-	    	return result;
-	    } finally {
-    		lock.unlock();
-    		persistBatchReferences();
+    	int result = count;
+    	if (mode == BufferReserveMode.FORCE) {
+    		this.reserveBatchKB.addAndGet(-count);
+    	} else {
+    		result = noWaitReserve(count);
     	}
+    	reservedByThread.set(reservedByThread.get() + result);
+		persistBatchReferences();
+    	return result;
     }
+
+	private int noWaitReserve(int count) {
+		for (int i = 0; i < 2; i++) {
+			int reserveBatch = this.reserveBatchKB.get();
+			count = Math.min(count, Math.max(0, reserveBatch));
+			if (count == 0) {
+				return 0;
+			}
+			if (this.reserveBatchKB.compareAndSet(reserveBatch, reserveBatch - count)) {
+				return count;
+			}
+		}
+		//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
+		this.reserveBatchKB.addAndGet(-count);
+		return count;
+	}
     
 	void persistBatchReferences() {
 		int activeBatch = activeBatchKB.get();
@@ -706,8 +779,7 @@
 			if (bsr != null) {
 				ce = bsr.get();
 				if (ce != null) {
-					maxReserveKB.addAndGet(bsr.sizeEstimate);
-					reserveBatchKB.addAndGet(bsr.sizeEstimate);
+					clearSoftReference(bsr);
 				}
 			}
 		} else if (useWeakReferences) {
@@ -776,9 +848,7 @@
 				break;
 			}
 			softCache.remove(ref.key);
-			maxReserveKB.addAndGet(ref.sizeEstimate);
-			reserveBatchKB.addAndGet(ref.sizeEstimate);
-			ref.clear();
+			clearSoftReference(ref);
 		}
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -234,10 +234,10 @@
 		        	if (workingTuples.size() >= maxRows) {
 	        			int reserved = bufferManager.reserveBuffers(schemaSize, 
 	        					(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingKB())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+		        		totalReservedBuffers += reserved;
 	        			if (reserved != schemaSize) {
 		        			break;
 		        		} 
-		        		totalReservedBuffers += reserved;
 		        		maxRows += this.batchSize;	
 		        	}
 		            try {
@@ -301,7 +301,7 @@
             int reserved = Math.min(desiredSpace, this.bufferManager.getMaxProcessingKB());
             bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
             if (desiredSpace > reserved) {
-            	reserved += bufferManager.reserveBuffers(desiredSpace - reserved, BufferReserveMode.WAIT);
+            	reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
             }
             int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
             //release any partial excess

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -131,7 +131,7 @@
 	private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
 			Serializer<Integer> s) throws TeiidComponentException {
 		PhysicalInfo o = cache.lockForLoad(oid, s);
-		CacheEntry ce = cache.get(o, oid, s);
+		CacheEntry ce = cache.get(o, oid, new WeakReference<Serializer<?>>(s));
 		cache.unlockForLoad(o);
 		return ce;
 	}

Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java	2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java	2011-10-18 03:37:06 UTC (rev 3555)
@@ -37,6 +37,7 @@
 import org.teiid.common.buffer.TupleBuffer;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
@@ -52,7 +53,8 @@
     public static final int BATCH_SIZE = 100;
     
     private void helpTestSort(List elements, List[] data, List sortElements, List sortTypes, List[] expected, Mode mode) throws TeiidComponentException, TeiidProcessingException {
-        BufferManager mgr = BufferManagerFactory.getTestBufferManager(100, BATCH_SIZE, BATCH_SIZE);
+        BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(10000, BATCH_SIZE, BATCH_SIZE);
+        int reserve = mgr.getReserveBatchKB();
         CommandContext context = new CommandContext ("pid", "test", null, null, 1);               //$NON-NLS-1$ //$NON-NLS-2$
         
         BlockingFakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
@@ -87,6 +89,7 @@
         	}
         }
         assertEquals(expected.length, currentRow - 1);
+        assertEquals(reserve, mgr.getReserveBatchKB());
     }
 
     /*



More information about the teiid-commits mailing list